Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Ensure Index.Walk fetches matching foreign keys only #20097

Merged
merged 4 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ want to use the default.
1. [20053](https://github.com/influxdata/influxdb/pull/20053): Upgrade Flux to v0.95.0.
1. [20058](https://github.com/influxdata/influxdb/pull/20058): UI: Upgrade flux-lsp-browser to v0.5.23.
1. [20067](https://github.com/influxdata/influxdb/pull/20067): Add DBRP cli commands as `influxd v1 dbrp`.
1. [20097](https://github.com/influxdata/influxdb/pull/20097): Ensure Index.Walk fetches matching foreign keys only.

### Bug Fixes

Expand Down
20 changes: 20 additions & 0 deletions dbrp/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package dbrp

import (
"encoding/json"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
)

var (
ByOrgIDIndexMapping = kv.NewIndexMapping(bucket, byOrgIDIndexBucket, func(v []byte) ([]byte, error) {
var dbrp influxdb.DBRPMappingV2
if err := json.Unmarshal(v, &dbrp); err != nil {
return nil, err
}

id, _ := dbrp.OrganizationID.Encode()
return id, nil
})
)
54 changes: 40 additions & 14 deletions dbrp/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ import (
)

var (
bucket = []byte("dbrpv1")
indexBucket = []byte("dbrpbyorganddbindexv1")
defaultBucket = []byte("dbrpdefaultv1")
bucket = []byte("dbrpv1")
indexBucket = []byte("dbrpbyorganddbindexv1")
byOrgIDIndexBucket = []byte("dbrpbyorgv1")
defaultBucket = []byte("dbrpdefaultv1")
)

var _ influxdb.DBRPMappingServiceV2 = (*AuthorizedService)(nil)
Expand All @@ -48,6 +49,7 @@ type Service struct {

bucketSvc influxdb.BucketService
byOrgAndDatabase *kv.Index
byOrg *kv.Index
}

func indexForeignKey(dbrp influxdb.DBRPMappingV2) []byte {
Expand All @@ -74,6 +76,7 @@ func NewService(ctx context.Context, bucketSvc influxdb.BucketService, st kv.Sto
}
return indexForeignKey(dbrp), nil
}), kv.WithIndexReadPathEnabled),
byOrg: kv.NewIndex(ByOrgIDIndexMapping, kv.WithIndexReadPathEnabled),
}
}

Expand Down Expand Up @@ -277,17 +280,17 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
err := s.store.View(ctx, func(tx kv.Tx) error {
// Optimized path, use index.
if orgID := filter.OrgID; orgID != nil {
// The index performs a prefix search.
// The foreign key is `orgID + db`.
// If you want to look by orgID only, just pass orgID as prefix.
db := ""
if filter.Database != nil {
var (
db = ""
compKey []byte
index *kv.Index
)
if filter.Database != nil && len(*filter.Database) > 0 {
db = *filter.Database
}
compKey := composeForeignKey(*orgID, db)
if len(db) > 0 {
// Even more optimized, looking for the default given an orgID and database.
// No walking index needed.
compKey = composeForeignKey(*orgID, db)
index = s.byOrgAndDatabase

// Filtering by Org, Database and Default == true
if def := filter.Default; def != nil && *def {
defID, err := s.getDefault(tx, compKey)
if kv.IsNotFound(err) {
Expand All @@ -307,8 +310,12 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
_, err = add(tx)(defID, v)
return err
}
} else {
compKey, _ = orgID.Encode()
index = s.byOrg
}
return s.byOrgAndDatabase.Walk(ctx, tx, compKey, add(tx))

return index.Walk(ctx, tx, compKey, add(tx))
}
bucket, err := tx.Bucket(bucket)
if err != nil {
Expand Down Expand Up @@ -359,15 +366,25 @@ func (s *Service) Create(ctx context.Context, dbrp *influxdb.DBRPMappingV2) erro
return ErrInvalidDBRPID
}

// OrganizationID has been validated by Validate
orgID, _ := dbrp.OrganizationID.Encode()

return s.store.Update(ctx, func(tx kv.Tx) error {
bucket, err := tx.Bucket(bucket)
if err != nil {
return ErrInternalService(err)
}

// populate indices
compKey := indexForeignKey(*dbrp)
if err := s.byOrgAndDatabase.Insert(tx, compKey, encodedID); err != nil {
return err
}

if err := s.byOrg.Insert(tx, orgID, encodedID); err != nil {
return err
}

defSet, err := s.isDefaultSet(tx, compKey)
if err != nil {
return err
Expand Down Expand Up @@ -463,6 +480,12 @@ func (s *Service) Delete(ctx context.Context, orgID, id influxdb.ID) error {
if err != nil {
return ErrInternalService(err)
}

encodedOrgID, err := id.Encode()
if err != nil {
return ErrInternalService(err)
}

return s.store.Update(ctx, func(tx kv.Tx) error {
bucket, err := tx.Bucket(bucket)
if err != nil {
Expand All @@ -475,6 +498,9 @@ func (s *Service) Delete(ctx context.Context, orgID, id influxdb.ID) error {
if err := s.byOrgAndDatabase.Delete(tx, compKey, encodedID); err != nil {
return ErrInternalService(err)
}
if err := s.byOrg.Delete(tx, encodedOrgID, encodedID); err != nil {
return ErrInternalService(err)
}
// If this was the default, we need to set a new default.
var derr error
if dbrp.Default {
Expand Down
42 changes: 36 additions & 6 deletions kv/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,23 @@ func (i *Index) sourceBucket(tx Tx) (Bucket, error) {
return tx.Bucket(i.SourceBucket())
}

func indexKey(foreignKey, primaryKey []byte) (newKey []byte) {
var (
// ErrKeyInvalidCharacters is returned when a foreignKey or primaryKey contains
//
ErrKeyInvalidCharacters = errors.New("key: contains invalid characters")
)

// IndexKey returns a value suitable for use as the key component
// when storing values in the index. IndexKey returns an
// ErrKeyInvalidCharacters error if either the foreignKey or primaryKey contains a /.
func IndexKey(foreignKey, primaryKey []byte) (newKey []byte, err error) {
if bytes.IndexByte(foreignKey, '/') != -1 {
return nil, ErrKeyInvalidCharacters
}
if bytes.IndexByte(primaryKey, '/') != -1 {
return nil, ErrKeyInvalidCharacters
}

newKey = make([]byte, len(primaryKey)+len(foreignKey)+1)
copy(newKey, foreignKey)
newKey[len(foreignKey)] = '/'
Expand Down Expand Up @@ -157,7 +173,12 @@ func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error {
return err
}

return bkt.Put(indexKey(foreignKey, primaryKey), primaryKey)
key, err := IndexKey(foreignKey, primaryKey)
if err != nil {
return err
}

return bkt.Put(key, primaryKey)
}

// Delete removes the foreignKey and primaryKey mapping from the underlying index.
Expand All @@ -167,7 +188,12 @@ func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error {
return err
}

return bkt.Delete(indexKey(foreignKey, primaryKey))
key, err := IndexKey(foreignKey, primaryKey)
if err != nil {
return err
}

return bkt.Delete(key)
}

// Walk walks the source bucket using keys found in the index using the provided foreign key
Expand Down Expand Up @@ -195,16 +221,20 @@ func (i *Index) Walk(ctx context.Context, tx Tx, foreignKey []byte, visitFn Visi
return err
}

return indexWalk(ctx, cursor, sourceBucket, visitFn)
return indexWalk(foreignKey, cursor, sourceBucket, visitFn)
}

// indexWalk consumes the indexKey and primaryKey pairs in the index bucket and looks up their
// associated primaryKey's value in the provided source bucket.
// When an item is located in the source, the provided visit function is called with primary key and associated value.
func indexWalk(ctx context.Context, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) {
func indexWalk(foreignKey []byte, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) {
var keys [][]byte
for ik, pk := indexCursor.Next(); ik != nil; ik, pk = indexCursor.Next() {
keys = append(keys, pk)
if fk, _, err := indexKeyParts(ik); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙌 awesome

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually @stuartcarnie you know what I've just thought of something better.
If we curse foreignKey + "/" then we push down the actual prefix to the db.
This does make me think the foreign key needs to be escaped for "/" as well though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is unclear what I should change here – should we add some code to validate that foreign keys do not have / for now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spoke with @GeorgeMac and I am going to reject foreign keys with a /. Clients of the Index will be expected to partition their foreign keys with characters other than a /

return err
} else if string(fk) == string(foreignKey) {
keys = append(keys, pk)
}
}

if err := indexCursor.Err(); err != nil {
Expand Down
13 changes: 11 additions & 2 deletions kv/index_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ func (i *IndexMigration) Populate(ctx context.Context, store Store) (n int, err

for fk, fkm := range diff.MissingFromIndex {
for pk := range fkm {
batch = append(batch, [2][]byte{indexKey([]byte(fk), []byte(pk)), []byte(pk)})
key, err := IndexKey([]byte(fk), []byte(pk))
if err != nil {
return n, err
}
batch = append(batch, [2][]byte{key, []byte(pk)})

if len(batch) >= i.operationBatchSize {
if err := flush(batch); err != nil {
Expand Down Expand Up @@ -183,7 +187,12 @@ func (i *IndexMigration) remove(ctx context.Context, store Store, mappings map[s

for fk, fkm := range mappings {
for pk := range fkm {
batch = append(batch, indexKey([]byte(fk), []byte(pk)))
key, err := IndexKey([]byte(fk), []byte(pk))
if err != nil {
return err
}

batch = append(batch, key)

if len(batch) >= i.operationBatchSize {
if err := flush(batch); err != nil {
Expand Down
Loading