Skip to content

Commit 280cb28

Browse files
committed
Added multi-shard search
1 parent d6d05cb commit 280cb28

File tree

9 files changed

+1744
-0
lines changed

9 files changed

+1744
-0
lines changed

db/db.go

+9
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,15 @@ type Recommendation struct {
339339
RecommendedVal int64
340340
}
341341

342+
// BadInputError Special error indicating bad user input as opposed to a database error
343+
type BadInputError struct {
344+
Details error
345+
}
346+
347+
func (bi BadInputError) Error() string {
348+
return bi.Details.Error()
349+
}
350+
342351
// DBType - database type
343352
type DBType struct {
344353
Driver DialectName // driver name (used in the code)

db/helpers.go

+4
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,10 @@ func ParseScheme(s string) (scheme string, uri string, err error) {
277277
return parts[0], parts[1], nil
278278
}
279279

280+
func FormatTimeStamp(timestamp time.Time) string {
281+
return fmt.Sprintf("%vns", timestamp.UTC().UnixNano())
282+
}
283+
280284
// Cond represents a condition
281285
type Cond struct {
282286
Col string

db/search/compare.go

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package search
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/acronis/perfkit/db"
7+
)
8+
9+
type comparator[T Searchable[T]] func(a, b T) bool
10+
type comparators[T Searchable[T]] map[string]map[string]comparator[T]
11+
12+
func makeComparator[T Searchable[T]](values []string, comparable comparators[T]) (comparator[T], error) {
13+
var less func(a, b T) bool
14+
if len(values) == 0 {
15+
return less, nil
16+
}
17+
18+
var finalLess func(a, b T) bool
19+
20+
for i := len(values) - 1; i >= 0; i-- {
21+
value := values[i]
22+
23+
fnc, field, err := db.ParseFunc(value)
24+
if err != nil {
25+
return nil, err
26+
}
27+
28+
if fnc == "" {
29+
return nil, fmt.Errorf("empty order function")
30+
}
31+
32+
if field == "" {
33+
return nil, fmt.Errorf("empty order field")
34+
}
35+
36+
fieldComparators, ok := comparable[field]
37+
if !ok {
38+
return nil, fmt.Errorf("bad order field '%v'", field)
39+
}
40+
41+
less, ok := fieldComparators[fnc]
42+
if !ok {
43+
return nil, fmt.Errorf("bad order function '%v'", fnc)
44+
}
45+
46+
if finalLess == nil {
47+
finalLess = less
48+
} else {
49+
var deepLess = finalLess
50+
51+
finalLess = func(a, b T) bool {
52+
if less(a, b) {
53+
return true
54+
} else if less(b, a) {
55+
return false
56+
}
57+
58+
return deepLess(a, b)
59+
}
60+
}
61+
}
62+
63+
return finalLess, nil
64+
}

db/search/cursor.go

+247
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
package search
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/acronis/perfkit/db"
7+
)
8+
9+
// CursorIterable is an interface for entities that can be iterated by cursor
10+
type CursorIterable[T any] interface {
11+
Unique(field string) bool
12+
Nullable(field string) bool
13+
Cursor(field string) (string, error)
14+
}
15+
16+
type sorting struct {
17+
Field string
18+
Func string
19+
}
20+
21+
func uniqueSort[T CursorIterable[T]](encodedSorts []string, cursors map[string]string, instance T) ([]string, []sorting, error) {
22+
var hasUniqueSorting = false
23+
var uniqueOrderDirection int
24+
25+
var encoded []string
26+
var sorts []sorting
27+
28+
for _, v := range encodedSorts {
29+
var fnc, field, err = db.ParseFunc(v)
30+
if err != nil {
31+
return nil, nil, err
32+
}
33+
34+
var unique = instance.Unique(field)
35+
var nullable = instance.Nullable(field)
36+
hasUniqueSorting = unique && !nullable
37+
38+
encoded = append(encoded, v)
39+
sorts = append(sorts, sorting{
40+
Field: field,
41+
Func: fnc,
42+
})
43+
44+
switch fnc {
45+
case "asc":
46+
uniqueOrderDirection++
47+
case "desc":
48+
uniqueOrderDirection--
49+
}
50+
51+
if unique {
52+
if !nullable {
53+
break
54+
} else if cursors != nil {
55+
if val, ok := cursors[field]; ok && val != db.SpecialConditionIsNull {
56+
if fnc != "desc" {
57+
break
58+
}
59+
}
60+
}
61+
}
62+
}
63+
64+
if !hasUniqueSorting {
65+
if uniqueOrderDirection >= 0 {
66+
encoded = append(encoded, "asc(id)")
67+
sorts = append(sorts, sorting{Field: "id", Func: "asc"})
68+
} else {
69+
encoded = append(encoded, "desc(id)")
70+
sorts = append(sorts, sorting{Field: "id", Func: "desc"})
71+
}
72+
}
73+
74+
return encoded, sorts, nil
75+
}
76+
77+
func orderCondition(val, fnc string) (expr string, flag bool, err error) {
78+
var direction string
79+
switch fnc {
80+
case "asc":
81+
switch val {
82+
case db.SpecialConditionIsNull:
83+
return db.SpecialConditionIsNotNull, false, nil
84+
case db.SpecialConditionIsNotNull:
85+
return "", true, nil
86+
default:
87+
direction = "gt"
88+
}
89+
case "desc":
90+
switch val {
91+
case db.SpecialConditionIsNotNull:
92+
return db.SpecialConditionIsNull, false, nil
93+
case db.SpecialConditionIsNull:
94+
return "", true, nil
95+
default:
96+
direction = "lt"
97+
}
98+
default:
99+
return "", false, fmt.Errorf("missing ordering for cursor")
100+
}
101+
102+
return fmt.Sprintf("%s(%v)", direction, val), false, nil
103+
}
104+
105+
func splitQueryOnLightWeightQueries[T CursorIterable[T]](pt PageToken, instance T) ([]PageToken, error) {
106+
var tokens []PageToken
107+
108+
if len(pt.Fields) == 0 {
109+
tokens = append(tokens, pt)
110+
return tokens, nil
111+
}
112+
113+
// check for unique sorting
114+
var encodedSorts, sorts, err = uniqueSort(pt.Order, pt.Cursor, instance)
115+
if err != nil {
116+
return nil, err
117+
}
118+
119+
if len(pt.Cursor) == 0 {
120+
pt.Order = encodedSorts
121+
tokens = append(tokens, pt)
122+
return tokens, nil
123+
}
124+
125+
// construct sort map for fast access
126+
var orderFunctions = map[string]string{}
127+
for _, sort := range sorts {
128+
orderFunctions[sort.Field] = sort.Func
129+
}
130+
131+
// add condition based on cursor
132+
var whereFromCursor = func(fld, val string, pt *PageToken) (bool, error) {
133+
var filter, empty, filterErr = orderCondition(val, orderFunctions[fld])
134+
if filterErr != nil {
135+
return false, filterErr
136+
}
137+
138+
if empty {
139+
return true, nil
140+
}
141+
142+
pt.Filter[fld] = append(pt.Filter[fld], filter)
143+
return false, nil
144+
}
145+
146+
for cursor := range pt.Cursor {
147+
if _, ok := orderFunctions[cursor]; !ok {
148+
return nil, fmt.Errorf("prohibited cursor, not mentioned it order: %v", cursor)
149+
}
150+
}
151+
152+
// split to x page tokens
153+
for i := range sorts {
154+
var cpt = pt
155+
var last = len(sorts) - 1 - i
156+
157+
// copy filters
158+
cpt.Filter = make(map[string][]string, len(sorts)-1-i)
159+
for k, v := range pt.Filter {
160+
cpt.Filter[k] = v
161+
}
162+
163+
// add equal condition on all fields except last in sorts
164+
for j := 0; j <= last-1; j++ {
165+
var fld = sorts[j].Field
166+
var val = pt.Cursor[fld]
167+
168+
cpt.Filter[fld] = append(cpt.Filter[fld], val)
169+
}
170+
171+
// add gt / lt condition for last sorting
172+
var empty bool
173+
if val, ok := cpt.Cursor[sorts[last].Field]; ok {
174+
if empty, err = whereFromCursor(sorts[last].Field, val, &cpt); err != nil {
175+
return nil, err
176+
}
177+
} else {
178+
continue
179+
}
180+
181+
if empty {
182+
continue
183+
}
184+
185+
// Add only needed sort to cpt
186+
cpt.Order = []string{}
187+
for j := last; j <= len(sorts)-1; j++ {
188+
cpt.Order = append(cpt.Order, encodedSorts[j])
189+
190+
var sortField = sorts[j].Field
191+
192+
if instance.Unique(sortField) {
193+
if !instance.Nullable(sortField) {
194+
break
195+
}
196+
197+
var becomeUnique = false
198+
// for ASC if we have a value, that means we already select all null rows
199+
// for DESC Nulls can start at any row
200+
if sorts[j].Func == "asc" {
201+
for _, val := range cpt.Filter[sortField] {
202+
if val != db.SpecialConditionIsNull {
203+
becomeUnique = true
204+
break
205+
}
206+
}
207+
}
208+
if becomeUnique {
209+
break
210+
}
211+
}
212+
}
213+
214+
cpt.Cursor = nil
215+
216+
tokens = append(tokens, cpt)
217+
}
218+
219+
return tokens, nil
220+
}
221+
222+
func createNextCursorBasedPageToken[T CursorIterable[T]](previousPageToken PageToken, items []T, limit int64, instance T) (*PageToken, error) {
223+
if int64(len(items)) < limit {
224+
return nil, nil
225+
}
226+
227+
var pt PageToken
228+
pt.Cursor = make(map[string]string)
229+
pt.Fields = previousPageToken.Fields
230+
231+
var encoded, sorts, err = uniqueSort(previousPageToken.Order, previousPageToken.Cursor, instance)
232+
if err != nil {
233+
return nil, err
234+
}
235+
pt.Order = encoded
236+
237+
var last = items[len(items)-1]
238+
for _, sort := range sorts {
239+
var value string
240+
if value, err = last.Cursor(sort.Field); err != nil {
241+
return nil, err
242+
}
243+
pt.Cursor[sort.Field] = value
244+
}
245+
246+
return &pt, nil
247+
}

0 commit comments

Comments
 (0)