Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kv): add support for prefixed cursor search #16545

Merged
merged 3 commits into from
Jan 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

1. [16523](https://github.com/influxdata/influxdb/pull/16523): Change influx packages to be CRD compliant
1. [16547](https://github.com/influxdata/influxdb/pull/16547): Allow trailing newline in credentials file and CLI integration
1. [16545](https://github.com/influxdata/influxdb/pull/16545): Add support for prefixed cursor search to ForwardCursor types

### UI Improvements

Expand Down
20 changes: 15 additions & 5 deletions bolt/kv.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bolt

import (
"bytes"
"context"
"fmt"
"os"
Expand Down Expand Up @@ -200,13 +201,18 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
var (
cursor = b.bucket.Cursor()
key, value = cursor.Seek(seek)
config = kv.NewCursorConfig(opts...)
)

if config.Prefix != nil && !bytes.HasPrefix(seek, config.Prefix) {
return nil, fmt.Errorf("seek bytes %q not prefixed with %q: %w", string(seek), string(config.Prefix), kv.ErrSeekMissingPrefix)
}

return &Cursor{
cursor: cursor,
key: key,
value: value,
config: kv.NewCursorConfig(opts...),
config: config,
}, nil
}

Expand Down Expand Up @@ -275,7 +281,7 @@ func (c *Cursor) Last() ([]byte, []byte) {

// Next retrieves the next key in the bucket.
func (c *Cursor) Next() (k []byte, v []byte) {
if c.closed {
if c.closed || (c.key != nil && c.missingPrefix(c.key)) {
return nil, nil
}
// get and unset previously seeked values if they exist
Expand All @@ -290,15 +296,15 @@ func (c *Cursor) Next() (k []byte, v []byte) {
}

k, v = next()
if len(k) == 0 && len(v) == 0 {
if (len(k) == 0 && len(v) == 0) || c.missingPrefix(k) {
return nil, nil
}
return k, v
}

// Prev retrieves the previous key in the bucket.
func (c *Cursor) Prev() (k []byte, v []byte) {
if c.closed {
if c.closed || (c.key != nil && c.missingPrefix(c.key)) {
return nil, nil
}
// get and unset previously seeked values if they exist
Expand All @@ -313,12 +319,16 @@ func (c *Cursor) Prev() (k []byte, v []byte) {
}

k, v = prev()
if len(k) == 0 && len(v) == 0 {
if (len(k) == 0 && len(v) == 0) || c.missingPrefix(k) {
return nil, nil
}
return k, v
}

func (c *Cursor) missingPrefix(key []byte) bool {
return c.config.Prefix != nil && !bytes.HasPrefix(key, c.config.Prefix)
}

// Err always returns nil as nothing can go wrong™ during iteration
func (c *Cursor) Err() error {
return nil
Expand Down
28 changes: 20 additions & 8 deletions inmem/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ type pair struct {

// ForwardCursor returns a directional cursor which starts at the provided seeked key
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
config := kv.NewCursorConfig(opts...)
if config.Prefix != nil && !bytes.HasPrefix(seek, config.Prefix) {
return nil, fmt.Errorf("seek bytes %q not prefixed with %q: %w", string(seek), string(config.Prefix), kv.ErrSeekMissingPrefix)
}

var (
pairs = make(chan []pair)
stop = make(chan struct{})
Expand All @@ -262,20 +267,15 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward

var (
batch []pair
config = kv.NewCursorConfig(opts...)
fn = config.Hints.PredicateFn
iterate = func(it btree.ItemIterator) {
b.btree.AscendGreaterOrEqual(&item{key: seek}, it)
}
iterate = b.ascend
)

if config.Direction == kv.CursorDescending {
iterate = func(it btree.ItemIterator) {
b.btree.DescendLessOrEqual(&item{key: seek}, it)
}
iterate = b.descend
}

iterate(func(i btree.Item) bool {
iterate(seek, config, func(i btree.Item) bool {
select {
case <-stop:
// if signalled to stop then exit iteration
Expand All @@ -290,6 +290,10 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
return false
}

if config.Prefix != nil && !bytes.HasPrefix(j.key, config.Prefix) {
return false
}

if fn == nil || fn(j.key, j.value) {
batch = append(batch, pair{Pair: kv.Pair{Key: j.key, Value: j.value}})
}
Expand Down Expand Up @@ -317,6 +321,14 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
return &ForwardCursor{pairs: pairs, stop: stop}, nil
}

func (b *Bucket) ascend(seek []byte, config kv.CursorConfig, it btree.ItemIterator) {
b.btree.AscendGreaterOrEqual(&item{key: seek}, it)
}

func (b *Bucket) descend(seek []byte, config kv.CursorConfig, it btree.ItemIterator) {
b.btree.DescendLessOrEqual(&item{key: seek}, it)
}

// ForwardCursor is a kv.ForwardCursor which iterates over an in-memory btree
type ForwardCursor struct {
pairs <-chan []pair
Expand Down
16 changes: 16 additions & 0 deletions kv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ var (
// ErrTxNotWritable is the error returned when an mutable operation is called during
// a non-writable transaction.
ErrTxNotWritable = errors.New("transaction is not writable")
// ErrSeekMissingPrefix is returned when seek bytes is missing the prefix defined via
// WithCursorPrefix
ErrSeekMissingPrefix = errors.New("seek missing prefix bytes")
)

// IsNotFound returns a boolean indicating whether the error is known to report that a key or was not found.
Expand Down Expand Up @@ -141,6 +144,7 @@ const (
type CursorConfig struct {
Direction CursorDirection
Hints CursorHints
Prefix []byte
}

// NewCursorConfig constructs and configures a CursorConfig used to configure
Expand Down Expand Up @@ -171,3 +175,15 @@ func WithCursorHints(hints ...CursorHint) CursorOption {
}
}
}

// WithCursorPrefix configures the forward cursor to retrieve keys
// with a particular prefix. This implies the cursor will start and end
// at a specific location based on the prefix [prefix, prefix + 1).
//
// The value of the seek bytes must be prefixed with the provided
// prefix, otherwise an error will be returned.
func WithCursorPrefix(prefix []byte) CursorOption {
return func(c *CursorConfig) {
c.Prefix = prefix
}
}
Loading