diff --git a/CHANGELOG.md b/CHANGELOG.md index e6fdc6a99d5..dc91b48431e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ 1. [16523](https://github.com/influxdata/influxdb/pull/16523): Change influx packages to be CRD compliant 1. [16547](https://github.com/influxdata/influxdb/pull/16547): Allow trailing newline in credentials file and CLI integration +1. [16545](https://github.com/influxdata/influxdb/pull/16545): Add support for prefixed cursor search to ForwardCursor types ### UI Improvements diff --git a/bolt/kv.go b/bolt/kv.go index fbc4b519ade..2961dd931b4 100644 --- a/bolt/kv.go +++ b/bolt/kv.go @@ -1,6 +1,7 @@ package bolt import ( + "bytes" "context" "fmt" "os" @@ -200,13 +201,18 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward var ( cursor = b.bucket.Cursor() key, value = cursor.Seek(seek) + config = kv.NewCursorConfig(opts...) ) + if config.Prefix != nil && !bytes.HasPrefix(seek, config.Prefix) { + return nil, fmt.Errorf("seek bytes %q not prefixed with %q: %w", string(seek), string(config.Prefix), kv.ErrSeekMissingPrefix) + } + return &Cursor{ cursor: cursor, key: key, value: value, - config: kv.NewCursorConfig(opts...), + config: config, }, nil } @@ -275,7 +281,7 @@ func (c *Cursor) Last() ([]byte, []byte) { // Next retrieves the next key in the bucket. func (c *Cursor) Next() (k []byte, v []byte) { - if c.closed { + if c.closed || (c.key != nil && c.missingPrefix(c.key)) { return nil, nil } // get and unset previously seeked values if they exist @@ -290,7 +296,7 @@ func (c *Cursor) Next() (k []byte, v []byte) { } k, v = next() - if len(k) == 0 && len(v) == 0 { + if (len(k) == 0 && len(v) == 0) || c.missingPrefix(k) { return nil, nil } return k, v @@ -298,7 +304,7 @@ func (c *Cursor) Next() (k []byte, v []byte) { // Prev retrieves the previous key in the bucket. func (c *Cursor) Prev() (k []byte, v []byte) { - if c.closed { + if c.closed || (c.key != nil && c.missingPrefix(c.key)) { return nil, nil } // get and unset previously seeked values if they exist @@ -313,12 +319,16 @@ func (c *Cursor) Prev() (k []byte, v []byte) { } k, v = prev() - if len(k) == 0 && len(v) == 0 { + if (len(k) == 0 && len(v) == 0) || c.missingPrefix(k) { return nil, nil } return k, v } +func (c *Cursor) missingPrefix(key []byte) bool { + return c.config.Prefix != nil && !bytes.HasPrefix(key, c.config.Prefix) +} + // Err always returns nil as nothing can go wrong™ during iteration func (c *Cursor) Err() error { return nil diff --git a/inmem/kv.go b/inmem/kv.go index 17ead78c337..f2866401a30 100644 --- a/inmem/kv.go +++ b/inmem/kv.go @@ -240,6 +240,11 @@ type pair struct { // ForwardCursor returns a directional cursor which starts at the provided seeked key func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) { + config := kv.NewCursorConfig(opts...) + if config.Prefix != nil && !bytes.HasPrefix(seek, config.Prefix) { + return nil, fmt.Errorf("seek bytes %q not prefixed with %q: %w", string(seek), string(config.Prefix), kv.ErrSeekMissingPrefix) + } + var ( pairs = make(chan []pair) stop = make(chan struct{}) @@ -262,20 +267,15 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward var ( batch []pair - config = kv.NewCursorConfig(opts...) fn = config.Hints.PredicateFn - iterate = func(it btree.ItemIterator) { - b.btree.AscendGreaterOrEqual(&item{key: seek}, it) - } + iterate = b.ascend ) if config.Direction == kv.CursorDescending { - iterate = func(it btree.ItemIterator) { - b.btree.DescendLessOrEqual(&item{key: seek}, it) - } + iterate = b.descend } - iterate(func(i btree.Item) bool { + iterate(seek, config, func(i btree.Item) bool { select { case <-stop: // if signalled to stop then exit iteration @@ -290,6 +290,10 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward return false } + if config.Prefix != nil && !bytes.HasPrefix(j.key, config.Prefix) { + return false + } + if fn == nil || fn(j.key, j.value) { batch = append(batch, pair{Pair: kv.Pair{Key: j.key, Value: j.value}}) } @@ -317,6 +321,14 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward return &ForwardCursor{pairs: pairs, stop: stop}, nil } +func (b *Bucket) ascend(seek []byte, config kv.CursorConfig, it btree.ItemIterator) { + b.btree.AscendGreaterOrEqual(&item{key: seek}, it) +} + +func (b *Bucket) descend(seek []byte, config kv.CursorConfig, it btree.ItemIterator) { + b.btree.DescendLessOrEqual(&item{key: seek}, it) +} + // ForwardCursor is a kv.ForwardCursor which iterates over an in-memory btree type ForwardCursor struct { pairs <-chan []pair diff --git a/kv/store.go b/kv/store.go index 24504565a3a..fa889a6e032 100644 --- a/kv/store.go +++ b/kv/store.go @@ -11,6 +11,9 @@ var ( // ErrTxNotWritable is the error returned when an mutable operation is called during // a non-writable transaction. ErrTxNotWritable = errors.New("transaction is not writable") + // ErrSeekMissingPrefix is returned when seek bytes is missing the prefix defined via + // WithCursorPrefix + ErrSeekMissingPrefix = errors.New("seek missing prefix bytes") ) // IsNotFound returns a boolean indicating whether the error is known to report that a key or was not found. @@ -141,6 +144,7 @@ const ( type CursorConfig struct { Direction CursorDirection Hints CursorHints + Prefix []byte } // NewCursorConfig constructs and configures a CursorConfig used to configure @@ -171,3 +175,15 @@ func WithCursorHints(hints ...CursorHint) CursorOption { } } } + +// WithCursorPrefix configures the forward cursor to retrieve keys +// with a particular prefix. This implies the cursor will start and end +// at a specific location based on the prefix [prefix, prefix + 1). +// +// The value of the seek bytes must be prefixed with the provided +// prefix, otherwise an error will be returned. +func WithCursorPrefix(prefix []byte) CursorOption { + return func(c *CursorConfig) { + c.Prefix = prefix + } +} diff --git a/testing/kv.go b/testing/kv.go index 8a24203837d..704dde7fd70 100644 --- a/testing/kv.go +++ b/testing/kv.go @@ -3,6 +3,7 @@ package testing import ( "bytes" "context" + "errors" "fmt" "testing" "time" @@ -682,10 +683,9 @@ func KVForwardCursor( t *testing.T, ) { type args struct { - seek string - direction kv.CursorDirection - until string - hints []kv.CursorHint + seek string + until string + opts []kv.CursorOption } pairs := func(keys ...string) []kv.Pair { @@ -719,6 +719,42 @@ func KVForwardCursor( }, exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"}, }, + { + name: "prefix - no hints", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "aaa/00", + until: "bbb/02", + opts: []kv.CursorOption{ + kv.WithCursorPrefix([]byte("aaa")), + }, + }, + exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"}, + }, + { + name: "prefix - does not prefix seek", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "aaa/00", + until: "bbb/02", + opts: []kv.CursorOption{ + kv.WithCursorPrefix([]byte("aab")), + }, + }, + expErr: kv.ErrSeekMissingPrefix, + }, { name: "prefix hint", fields: KVStoreFields{ @@ -731,7 +767,9 @@ func KVForwardCursor( args: args{ seek: "aaa", until: "aaa/03", - hints: []kv.CursorHint{kv.WithCursorHintPrefix("aaa/")}, + opts: []kv.CursorOption{ + kv.WithCursorHints(kv.WithCursorHintPrefix("aaa/")), + }, }, exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"}, }, @@ -747,7 +785,9 @@ func KVForwardCursor( args: args{ seek: "aaa", until: "bbb/00", - hints: []kv.CursorHint{kv.WithCursorHintKeyStart("aaa/")}, + opts: []kv.CursorOption{ + kv.WithCursorHints(kv.WithCursorHintKeyStart("aaa/")), + }, }, exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"}, }, @@ -763,10 +803,11 @@ func KVForwardCursor( args: args{ seek: "aaa", until: "aaa/03", - hints: []kv.CursorHint{ - kv.WithCursorHintPredicate(func(key, _ []byte) bool { + opts: []kv.CursorOption{ + kv.WithCursorHints(kv.WithCursorHintPredicate(func(key, _ []byte) bool { return len(key) < 3 || string(key[:3]) == "aaa" - })}, + })), + }, }, exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"}, }, @@ -782,10 +823,11 @@ func KVForwardCursor( args: args{ seek: "", until: "aa/01", - hints: []kv.CursorHint{ - kv.WithCursorHintPredicate(func(_, val []byte) bool { + opts: []kv.CursorOption{ + kv.WithCursorHints(kv.WithCursorHintPredicate(func(_, val []byte) bool { return len(val) < 7 || string(val[:7]) == "val:aa/" - })}, + })), + }, }, exp: []string{"aa/00", "aa/01"}, }, @@ -799,12 +841,31 @@ func KVForwardCursor( "bbb/00", "bbb/01", "bbb/02"), }, args: args{ - seek: "bbb/00", - until: "aaa/00", - direction: kv.CursorDescending, + seek: "bbb/00", + until: "aaa/00", + opts: []kv.CursorOption{kv.WithCursorDirection(kv.CursorDescending)}, }, exp: []string{"bbb/00", "aaa/03", "aaa/02", "aaa/01", "aaa/00"}, }, + { + name: "prefixed - no hints - descending", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "aaa/02", + until: "aa/", + opts: []kv.CursorOption{ + kv.WithCursorPrefix([]byte("aaa/")), + kv.WithCursorDirection(kv.CursorDescending), + }, + }, + exp: []string{"aaa/02", "aaa/01", "aaa/00"}, + }, { name: "start hint - descending", fields: KVStoreFields{ @@ -815,10 +876,12 @@ func KVForwardCursor( "bbb/00", "bbb/01", "bbb/02"), }, args: args{ - seek: "bbb/00", - until: "aaa/00", - direction: kv.CursorDescending, - hints: []kv.CursorHint{kv.WithCursorHintKeyStart("aaa/")}, + seek: "bbb/00", + until: "aaa/00", + opts: []kv.CursorOption{ + kv.WithCursorDirection(kv.CursorDescending), + kv.WithCursorHints(kv.WithCursorHintKeyStart("aaa/")), + }, }, exp: []string{"bbb/00", "aaa/03", "aaa/02", "aaa/01", "aaa/00"}, }, @@ -832,13 +895,14 @@ func KVForwardCursor( "bbb/00", "bbb/01", "bbb/02"), }, args: args{ - seek: "aaa/03", - until: "aaa/00", - direction: kv.CursorDescending, - hints: []kv.CursorHint{ - kv.WithCursorHintPredicate(func(key, _ []byte) bool { + seek: "aaa/03", + until: "aaa/00", + opts: []kv.CursorOption{ + kv.WithCursorDirection(kv.CursorDescending), + kv.WithCursorHints(kv.WithCursorHintPredicate(func(key, _ []byte) bool { return len(key) < 3 || string(key[:3]) == "aaa" - })}, + })), + }, }, exp: []string{"aaa/03", "aaa/02", "aaa/01", "aaa/00"}, }, @@ -852,13 +916,14 @@ func KVForwardCursor( "bbb/00", "bbb/01", "bbb/02"), }, args: args{ - seek: "aa/01", - until: "aa/00", - direction: kv.CursorDescending, - hints: []kv.CursorHint{ - kv.WithCursorHintPredicate(func(_, val []byte) bool { + seek: "aa/01", + until: "aa/00", + opts: []kv.CursorOption{ + kv.WithCursorDirection(kv.CursorDescending), + kv.WithCursorHints(kv.WithCursorHintPredicate(func(_, val []byte) bool { return len(val) >= 7 && string(val[:7]) == "val:aa/" - })}, + })), + }, }, exp: []string{"aa/01", "aa/00"}, }, @@ -876,10 +941,13 @@ func KVForwardCursor( return err } - cur, err := b.ForwardCursor([]byte(tt.args.seek), - kv.WithCursorDirection(tt.args.direction), - kv.WithCursorHints(tt.args.hints...)) + cur, err := b.ForwardCursor([]byte(tt.args.seek), tt.args.opts...) if err != nil { + if tt.expErr != nil && errors.Is(err, tt.expErr) { + // successfully returned expected error + return nil + } + t.Errorf("unexpected error: %v", err) return err }