From f4f473e00d94cd315f852e66ed27a86939bdc38c Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Mon, 23 Nov 2020 09:52:45 +1100 Subject: [PATCH 1/4] fix: Ensure Index.Walk fetches matching foreign keys only This commit modifies the behaviour of the indexWalk function to ensure it parses the key parts and matches the foreign key exactly. Closes #20096 --- kv/index.go | 18 +++-- kv/index_migration.go | 4 +- kv/index_test.go | 101 ++++++++++++++++++++++++++++ kv/mock/bucket.go | 135 ++++++++++++++++++++++++++++++++++++++ kv/mock/forward_cursor.go | 76 +++++++++++++++++++++ kv/mock/tx.go | 76 +++++++++++++++++++++ 6 files changed, 402 insertions(+), 8 deletions(-) create mode 100644 kv/mock/bucket.go create mode 100644 kv/mock/forward_cursor.go create mode 100644 kv/mock/tx.go diff --git a/kv/index.go b/kv/index.go index ce2d0d899e2..33e28d35be3 100644 --- a/kv/index.go +++ b/kv/index.go @@ -128,7 +128,9 @@ func (i *Index) sourceBucket(tx Tx) (Bucket, error) { return tx.Bucket(i.SourceBucket()) } -func indexKey(foreignKey, primaryKey []byte) (newKey []byte) { +// IndexKey returns a value suitable for use as the key component +// when storing values in the index. +func IndexKey(foreignKey, primaryKey []byte) (newKey []byte) { newKey = make([]byte, len(primaryKey)+len(foreignKey)+1) copy(newKey, foreignKey) newKey[len(foreignKey)] = '/' @@ -157,7 +159,7 @@ func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error { return err } - return bkt.Put(indexKey(foreignKey, primaryKey), primaryKey) + return bkt.Put(IndexKey(foreignKey, primaryKey), primaryKey) } // Delete removes the foreignKey and primaryKey mapping from the underlying index. @@ -167,7 +169,7 @@ func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error { return err } - return bkt.Delete(indexKey(foreignKey, primaryKey)) + return bkt.Delete(IndexKey(foreignKey, primaryKey)) } // Walk walks the source bucket using keys found in the index using the provided foreign key @@ -195,16 +197,20 @@ func (i *Index) Walk(ctx context.Context, tx Tx, foreignKey []byte, visitFn Visi return err } - return indexWalk(ctx, cursor, sourceBucket, visitFn) + return indexWalk(ctx, foreignKey, cursor, sourceBucket, visitFn) } // indexWalk consumes the indexKey and primaryKey pairs in the index bucket and looks up their // associated primaryKey's value in the provided source bucket. // When an item is located in the source, the provided visit function is called with primary key and associated value. -func indexWalk(ctx context.Context, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) { +func indexWalk(ctx context.Context, foreignKey []byte, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) { var keys [][]byte for ik, pk := indexCursor.Next(); ik != nil; ik, pk = indexCursor.Next() { - keys = append(keys, pk) + if fk, _, err := indexKeyParts(ik); err != nil { + return err + } else if string(fk) == string(foreignKey) { + keys = append(keys, pk) + } } if err := indexCursor.Err(); err != nil { diff --git a/kv/index_migration.go b/kv/index_migration.go index b6ea2dfdd65..7b33f336712 100644 --- a/kv/index_migration.go +++ b/kv/index_migration.go @@ -128,7 +128,7 @@ func (i *IndexMigration) Populate(ctx context.Context, store Store) (n int, err for fk, fkm := range diff.MissingFromIndex { for pk := range fkm { - batch = append(batch, [2][]byte{indexKey([]byte(fk), []byte(pk)), []byte(pk)}) + batch = append(batch, [2][]byte{IndexKey([]byte(fk), []byte(pk)), []byte(pk)}) if len(batch) >= i.operationBatchSize { if err := flush(batch); err != nil { @@ -183,7 +183,7 @@ func (i *IndexMigration) remove(ctx context.Context, store Store, mappings map[s for fk, fkm := range mappings { for pk := range fkm { - batch = append(batch, indexKey([]byte(fk), []byte(pk))) + batch = append(batch, IndexKey([]byte(fk), []byte(pk))) if len(batch) >= i.operationBatchSize { if err := flush(batch); err != nil { diff --git a/kv/index_test.go b/kv/index_test.go index 5b4726eaf25..090280935a6 100644 --- a/kv/index_test.go +++ b/kv/index_test.go @@ -7,9 +7,14 @@ import ( "os" "testing" + "github.com/golang/mock/gomock" "github.com/influxdata/influxdb/v2/bolt" "github.com/influxdata/influxdb/v2/inmem" + "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/kv/mock" influxdbtesting "github.com/influxdata/influxdb/v2/testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -33,6 +38,102 @@ func Test_Bolt_Index(t *testing.T) { influxdbtesting.TestIndex(t, s) } +func TestIndex_Walk(t *testing.T) { + t.Run("only selects exact keys", func(t *testing.T) { + ctrl := gomock.NewController(t) + t.Cleanup(ctrl.Finish) + + type keyValue struct{ key, val []byte } + makeIndexKV := func(fk, pk string) keyValue { + return keyValue{ + key: kv.IndexKey([]byte(fk), []byte(pk)), + val: []byte(pk), + } + } + + makeKV := func(key, val string) keyValue { + return keyValue{[]byte(key), []byte(val)} + } + + var ( + sourceBucket = []byte("source") + indexBucket = []byte("index") + foreignKey = []byte("jenkins") + idxkeyvals = []keyValue{ + makeIndexKV("jenkins-aws", "pk1"), + makeIndexKV("jenkins-aws", "pk2"), + makeIndexKV("jenkins-aws", "pk3"), + makeIndexKV("jenkins", "pk4"), + makeIndexKV("jenkins", "pk5"), + } + srckeyvals = []struct{ key, val []byte }{ + makeKV("pk4", "val4"), + makeKV("pk5", "val5"), + } + ) + + mapping := kv.NewIndexMapping(sourceBucket, indexBucket, func(data []byte) ([]byte, error) { + return nil, nil + }) + + tx := mock.NewMockTx(ctrl) + + src := mock.NewMockBucket(ctrl) + src.EXPECT(). + GetBatch(srckeyvals[0].key, srckeyvals[1].key). + Return([][]byte{srckeyvals[0].val, srckeyvals[1].val}, nil) + + tx.EXPECT(). + Bucket(sourceBucket). + Return(src, nil) + + idx := mock.NewMockBucket(ctrl) + tx.EXPECT(). + Bucket(indexBucket). + Return(idx, nil) + + cur := mock.NewMockForwardCursor(ctrl) + + i := 0 + cur.EXPECT(). + Next(). + DoAndReturn(func() ([]byte, []byte) { + var k, v []byte + if i < len(idxkeyvals) { + elem := idxkeyvals[i] + i++ + k, v = elem.key, elem.val + } + + return k, v + }). + Times(len(idxkeyvals) + 1) + cur.EXPECT(). + Err(). + Return(nil) + cur.EXPECT(). + Close(). + Return(nil) + idx.EXPECT(). + ForwardCursor(foreignKey, gomock.Any()). + Return(cur, nil) + + ctx := context.Background() + index := kv.NewIndex(mapping, kv.WithIndexReadPathEnabled) + + j := 0 + err := index.Walk(ctx, tx, foreignKey, func(k, v []byte) (bool, error) { + require.Less(t, j, len(srckeyvals)) + assert.Equal(t, srckeyvals[j].key, k) + assert.Equal(t, srckeyvals[j].val, v) + j++ + return true, nil + }) + + assert.NoError(t, err) + }) +} + func Benchmark_Inmem_Index_Walk(b *testing.B) { influxdbtesting.BenchmarkIndexWalk(b, inmem.NewKVStore(), 1000, 200) } diff --git a/kv/mock/bucket.go b/kv/mock/bucket.go new file mode 100644 index 00000000000..7a9fd8b8262 --- /dev/null +++ b/kv/mock/bucket.go @@ -0,0 +1,135 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/influxdata/influxdb/v2/kv (interfaces: Bucket) + +// Package mock is a generated GoMock package. +package mock + +import ( + gomock "github.com/golang/mock/gomock" + kv "github.com/influxdata/influxdb/v2/kv" + reflect "reflect" +) + +// MockBucket is a mock of Bucket interface +type MockBucket struct { + ctrl *gomock.Controller + recorder *MockBucketMockRecorder +} + +// MockBucketMockRecorder is the mock recorder for MockBucket +type MockBucketMockRecorder struct { + mock *MockBucket +} + +// NewMockBucket creates a new mock instance +func NewMockBucket(ctrl *gomock.Controller) *MockBucket { + mock := &MockBucket{ctrl: ctrl} + mock.recorder = &MockBucketMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockBucket) EXPECT() *MockBucketMockRecorder { + return m.recorder +} + +// Cursor mocks base method +func (m *MockBucket) Cursor(arg0 ...kv.CursorHint) (kv.Cursor, error) { + m.ctrl.T.Helper() + varargs := []interface{}{} + for _, a := range arg0 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Cursor", varargs...) + ret0, _ := ret[0].(kv.Cursor) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Cursor indicates an expected call of Cursor +func (mr *MockBucketMockRecorder) Cursor(arg0 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cursor", reflect.TypeOf((*MockBucket)(nil).Cursor), arg0...) +} + +// Delete mocks base method +func (m *MockBucket) Delete(arg0 []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete +func (mr *MockBucketMockRecorder) Delete(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockBucket)(nil).Delete), arg0) +} + +// ForwardCursor mocks base method +func (m *MockBucket) ForwardCursor(arg0 []byte, arg1 ...kv.CursorOption) (kv.ForwardCursor, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ForwardCursor", varargs...) + ret0, _ := ret[0].(kv.ForwardCursor) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ForwardCursor indicates an expected call of ForwardCursor +func (mr *MockBucketMockRecorder) ForwardCursor(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForwardCursor", reflect.TypeOf((*MockBucket)(nil).ForwardCursor), varargs...) +} + +// Get mocks base method +func (m *MockBucket) Get(arg0 []byte) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", arg0) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get +func (mr *MockBucketMockRecorder) Get(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockBucket)(nil).Get), arg0) +} + +// GetBatch mocks base method +func (m *MockBucket) GetBatch(arg0 ...[]byte) ([][]byte, error) { + m.ctrl.T.Helper() + varargs := []interface{}{} + for _, a := range arg0 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetBatch", varargs...) + ret0, _ := ret[0].([][]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetBatch indicates an expected call of GetBatch +func (mr *MockBucketMockRecorder) GetBatch(arg0 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBatch", reflect.TypeOf((*MockBucket)(nil).GetBatch), arg0...) +} + +// Put mocks base method +func (m *MockBucket) Put(arg0, arg1 []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Put", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Put indicates an expected call of Put +func (mr *MockBucketMockRecorder) Put(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockBucket)(nil).Put), arg0, arg1) +} diff --git a/kv/mock/forward_cursor.go b/kv/mock/forward_cursor.go new file mode 100644 index 00000000000..cc066398065 --- /dev/null +++ b/kv/mock/forward_cursor.go @@ -0,0 +1,76 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/influxdata/influxdb/v2/kv (interfaces: ForwardCursor) + +// Package mock is a generated GoMock package. +package mock + +import ( + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockForwardCursor is a mock of ForwardCursor interface +type MockForwardCursor struct { + ctrl *gomock.Controller + recorder *MockForwardCursorMockRecorder +} + +// MockForwardCursorMockRecorder is the mock recorder for MockForwardCursor +type MockForwardCursorMockRecorder struct { + mock *MockForwardCursor +} + +// NewMockForwardCursor creates a new mock instance +func NewMockForwardCursor(ctrl *gomock.Controller) *MockForwardCursor { + mock := &MockForwardCursor{ctrl: ctrl} + mock.recorder = &MockForwardCursorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockForwardCursor) EXPECT() *MockForwardCursorMockRecorder { + return m.recorder +} + +// Close mocks base method +func (m *MockForwardCursor) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close +func (mr *MockForwardCursorMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockForwardCursor)(nil).Close)) +} + +// Err mocks base method +func (m *MockForwardCursor) Err() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Err") + ret0, _ := ret[0].(error) + return ret0 +} + +// Err indicates an expected call of Err +func (mr *MockForwardCursorMockRecorder) Err() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockForwardCursor)(nil).Err)) +} + +// Next mocks base method +func (m *MockForwardCursor) Next() ([]byte, []byte) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Next") + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].([]byte) + return ret0, ret1 +} + +// Next indicates an expected call of Next +func (mr *MockForwardCursorMockRecorder) Next() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockForwardCursor)(nil).Next)) +} diff --git a/kv/mock/tx.go b/kv/mock/tx.go new file mode 100644 index 00000000000..3a326fa12d6 --- /dev/null +++ b/kv/mock/tx.go @@ -0,0 +1,76 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/influxdata/influxdb/v2/kv (interfaces: Tx) + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + gomock "github.com/golang/mock/gomock" + kv "github.com/influxdata/influxdb/v2/kv" + reflect "reflect" +) + +// MockTx is a mock of Tx interface +type MockTx struct { + ctrl *gomock.Controller + recorder *MockTxMockRecorder +} + +// MockTxMockRecorder is the mock recorder for MockTx +type MockTxMockRecorder struct { + mock *MockTx +} + +// NewMockTx creates a new mock instance +func NewMockTx(ctrl *gomock.Controller) *MockTx { + mock := &MockTx{ctrl: ctrl} + mock.recorder = &MockTxMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockTx) EXPECT() *MockTxMockRecorder { + return m.recorder +} + +// Bucket mocks base method +func (m *MockTx) Bucket(arg0 []byte) (kv.Bucket, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Bucket", arg0) + ret0, _ := ret[0].(kv.Bucket) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Bucket indicates an expected call of Bucket +func (mr *MockTxMockRecorder) Bucket(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bucket", reflect.TypeOf((*MockTx)(nil).Bucket), arg0) +} + +// Context mocks base method +func (m *MockTx) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context +func (mr *MockTxMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockTx)(nil).Context)) +} + +// WithContext mocks base method +func (m *MockTx) WithContext(arg0 context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "WithContext", arg0) +} + +// WithContext indicates an expected call of WithContext +func (mr *MockTxMockRecorder) WithContext(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WithContext", reflect.TypeOf((*MockTx)(nil).WithContext), arg0) +} From 8ab63b44982504a61653b2812b65d39ff52f3093 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Mon, 23 Nov 2020 09:56:34 +1100 Subject: [PATCH 2/4] fix: Add a ByOrgID index to DBRP This commit adds a new index and migration to the DBRP service for retrieving all database and retention policy mappings for a single organization. This change was required to resolve an invalid assumption of the DBRP service, which relied on a prefix match of the byOrgAndDatabase kv.Index when performing search operations by organization ID only. Closes #20096 --- dbrp/index.go | 20 ++++++++ dbrp/service.go | 54 ++++++++++++++++------ kv/migration/all/0012_dbrp_by_org_index.go | 8 ++++ kv/migration/all/all.go | 2 + testing/dbrp_mapping_v2.go | 8 ++-- 5 files changed, 74 insertions(+), 18 deletions(-) create mode 100644 dbrp/index.go create mode 100644 kv/migration/all/0012_dbrp_by_org_index.go diff --git a/dbrp/index.go b/dbrp/index.go new file mode 100644 index 00000000000..8d688fb54b0 --- /dev/null +++ b/dbrp/index.go @@ -0,0 +1,20 @@ +package dbrp + +import ( + "encoding/json" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kv" +) + +var ( + ByOrgIDIndexMapping = kv.NewIndexMapping(bucket, byOrgIDIndexBucket, func(v []byte) ([]byte, error) { + var dbrp influxdb.DBRPMappingV2 + if err := json.Unmarshal(v, &dbrp); err != nil { + return nil, err + } + + id, _ := dbrp.OrganizationID.Encode() + return id, nil + }) +) diff --git a/dbrp/service.go b/dbrp/service.go index 86ecf5c3d3e..2f0983d11aa 100644 --- a/dbrp/service.go +++ b/dbrp/service.go @@ -35,9 +35,10 @@ import ( ) var ( - bucket = []byte("dbrpv1") - indexBucket = []byte("dbrpbyorganddbindexv1") - defaultBucket = []byte("dbrpdefaultv1") + bucket = []byte("dbrpv1") + indexBucket = []byte("dbrpbyorganddbindexv1") + byOrgIDIndexBucket = []byte("dbrpbyorgv1") + defaultBucket = []byte("dbrpdefaultv1") ) var _ influxdb.DBRPMappingServiceV2 = (*AuthorizedService)(nil) @@ -48,6 +49,7 @@ type Service struct { bucketSvc influxdb.BucketService byOrgAndDatabase *kv.Index + byOrg *kv.Index } func indexForeignKey(dbrp influxdb.DBRPMappingV2) []byte { @@ -74,6 +76,7 @@ func NewService(ctx context.Context, bucketSvc influxdb.BucketService, st kv.Sto } return indexForeignKey(dbrp), nil }), kv.WithIndexReadPathEnabled), + byOrg: kv.NewIndex(ByOrgIDIndexMapping, kv.WithIndexReadPathEnabled), } } @@ -277,17 +280,17 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte err := s.store.View(ctx, func(tx kv.Tx) error { // Optimized path, use index. if orgID := filter.OrgID; orgID != nil { - // The index performs a prefix search. - // The foreign key is `orgID + db`. - // If you want to look by orgID only, just pass orgID as prefix. - db := "" - if filter.Database != nil { + var ( + db = "" + compKey []byte + index *kv.Index + ) + if filter.Database != nil && len(*filter.Database) > 0 { db = *filter.Database - } - compKey := composeForeignKey(*orgID, db) - if len(db) > 0 { - // Even more optimized, looking for the default given an orgID and database. - // No walking index needed. + compKey = composeForeignKey(*orgID, db) + index = s.byOrgAndDatabase + + // Filtering by Org, Database and Default == true if def := filter.Default; def != nil && *def { defID, err := s.getDefault(tx, compKey) if kv.IsNotFound(err) { @@ -307,8 +310,12 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte _, err = add(tx)(defID, v) return err } + } else { + compKey, _ = orgID.Encode() + index = s.byOrg } - return s.byOrgAndDatabase.Walk(ctx, tx, compKey, add(tx)) + + return index.Walk(ctx, tx, compKey, add(tx)) } bucket, err := tx.Bucket(bucket) if err != nil { @@ -359,15 +366,25 @@ func (s *Service) Create(ctx context.Context, dbrp *influxdb.DBRPMappingV2) erro return ErrInvalidDBRPID } + // OrganizationID has been validated by Validate + orgID, _ := dbrp.OrganizationID.Encode() + return s.store.Update(ctx, func(tx kv.Tx) error { bucket, err := tx.Bucket(bucket) if err != nil { return ErrInternalService(err) } + + // populate indices compKey := indexForeignKey(*dbrp) if err := s.byOrgAndDatabase.Insert(tx, compKey, encodedID); err != nil { return err } + + if err := s.byOrg.Insert(tx, orgID, encodedID); err != nil { + return err + } + defSet, err := s.isDefaultSet(tx, compKey) if err != nil { return err @@ -463,6 +480,12 @@ func (s *Service) Delete(ctx context.Context, orgID, id influxdb.ID) error { if err != nil { return ErrInternalService(err) } + + encodedOrgID, err := id.Encode() + if err != nil { + return ErrInternalService(err) + } + return s.store.Update(ctx, func(tx kv.Tx) error { bucket, err := tx.Bucket(bucket) if err != nil { @@ -475,6 +498,9 @@ func (s *Service) Delete(ctx context.Context, orgID, id influxdb.ID) error { if err := s.byOrgAndDatabase.Delete(tx, compKey, encodedID); err != nil { return ErrInternalService(err) } + if err := s.byOrg.Delete(tx, encodedOrgID, encodedID); err != nil { + return ErrInternalService(err) + } // If this was the default, we need to set a new default. var derr error if dbrp.Default { diff --git a/kv/migration/all/0012_dbrp_by_org_index.go b/kv/migration/all/0012_dbrp_by_org_index.go new file mode 100644 index 00000000000..421c3bfceb9 --- /dev/null +++ b/kv/migration/all/0012_dbrp_by_org_index.go @@ -0,0 +1,8 @@ +package all + +import ( + "github.com/influxdata/influxdb/v2/dbrp" + "github.com/influxdata/influxdb/v2/kv" +) + +var Migration0012_DBRPByOrgIndex = kv.NewIndexMigration(dbrp.ByOrgIDIndexMapping, kv.WithIndexMigrationCleanup) diff --git a/kv/migration/all/all.go b/kv/migration/all/all.go index 706856bdf45..e51baf22cbd 100644 --- a/kv/migration/all/all.go +++ b/kv/migration/all/all.go @@ -29,5 +29,7 @@ var Migrations = [...]migration.Spec{ Migration0010_AddIndexTelegrafByOrg, // populate dashboards owner id Migration0011_PopulateDashboardsOwnerId, + // Populate the DBRP service ByOrg index + Migration0012_DBRPByOrgIndex, // {{ do_not_edit . }} } diff --git a/testing/dbrp_mapping_v2.go b/testing/dbrp_mapping_v2.go index ff4f3311bc6..dd0c2b2be82 100644 --- a/testing/dbrp_mapping_v2.go +++ b/testing/dbrp_mapping_v2.go @@ -735,7 +735,7 @@ func FindManyDBRPMappingsV2( fields: DBRPMappingFieldsV2{ DBRPMappingsV2: []*influxdb.DBRPMappingV2{ { - ID: 100, + ID: MustIDBase16("0000000000000100"), Database: "database", RetentionPolicy: "retention_policyA", Default: false, @@ -743,7 +743,7 @@ func FindManyDBRPMappingsV2( BucketID: MustIDBase16(dbrpBucketAID), }, { - ID: 200, + ID: MustIDBase16("0000000000000200"), Database: "database", RetentionPolicy: "retention_policyB", Default: true, @@ -751,7 +751,7 @@ func FindManyDBRPMappingsV2( BucketID: MustIDBase16(dbrpBucketBID), }, { - ID: 300, + ID: MustIDBase16("0000000000000300"), Database: "database", RetentionPolicy: "retention_policyB", Default: true, @@ -770,7 +770,7 @@ func FindManyDBRPMappingsV2( wants: wants{ dbrpMappings: []*influxdb.DBRPMappingV2{ { - ID: 200, + ID: MustIDBase16("0000000000000200"), Database: "database", RetentionPolicy: "retention_policyB", Default: true, From 30599fec61d86bb8423340a98c7f097ba31fc9cf Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Mon, 23 Nov 2020 10:30:58 +1100 Subject: [PATCH 3/4] chore: Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf94b402436..96b453b412b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ want to use the default. 1. [20053](https://github.com/influxdata/influxdb/pull/20053): Upgrade Flux to v0.95.0. 1. [20058](https://github.com/influxdata/influxdb/pull/20058): UI: Upgrade flux-lsp-browser to v0.5.23. 1. [20067](https://github.com/influxdata/influxdb/pull/20067): Add DBRP cli commands as `influxd v1 dbrp`. +1. [20097](https://github.com/influxdata/influxdb/pull/20097): Ensure Index.Walk fetches matching foreign keys only. ### Bug Fixes From 1921d3e55446144898099637bff2c8e007a73a6e Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Tue, 24 Nov 2020 09:58:01 +1100 Subject: [PATCH 4/4] =?UTF-8?q?fix:=20PR=20Feedback=20=E2=80=93=20ensure?= =?UTF-8?q?=20keys=20cannot=20contain=20/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kv/index.go | 36 ++++++++++++++++++++++++++------ kv/index_migration.go | 13 ++++++++++-- kv/index_test.go | 48 ++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 88 insertions(+), 9 deletions(-) diff --git a/kv/index.go b/kv/index.go index 33e28d35be3..3470cf7286d 100644 --- a/kv/index.go +++ b/kv/index.go @@ -128,9 +128,23 @@ func (i *Index) sourceBucket(tx Tx) (Bucket, error) { return tx.Bucket(i.SourceBucket()) } +var ( + // ErrKeyInvalidCharacters is returned when a foreignKey or primaryKey contains + // + ErrKeyInvalidCharacters = errors.New("key: contains invalid characters") +) + // IndexKey returns a value suitable for use as the key component -// when storing values in the index. -func IndexKey(foreignKey, primaryKey []byte) (newKey []byte) { +// when storing values in the index. IndexKey returns an +// ErrKeyInvalidCharacters error if either the foreignKey or primaryKey contains a /. +func IndexKey(foreignKey, primaryKey []byte) (newKey []byte, err error) { + if bytes.IndexByte(foreignKey, '/') != -1 { + return nil, ErrKeyInvalidCharacters + } + if bytes.IndexByte(primaryKey, '/') != -1 { + return nil, ErrKeyInvalidCharacters + } + newKey = make([]byte, len(primaryKey)+len(foreignKey)+1) copy(newKey, foreignKey) newKey[len(foreignKey)] = '/' @@ -159,7 +173,12 @@ func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error { return err } - return bkt.Put(IndexKey(foreignKey, primaryKey), primaryKey) + key, err := IndexKey(foreignKey, primaryKey) + if err != nil { + return err + } + + return bkt.Put(key, primaryKey) } // Delete removes the foreignKey and primaryKey mapping from the underlying index. @@ -169,7 +188,12 @@ func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error { return err } - return bkt.Delete(IndexKey(foreignKey, primaryKey)) + key, err := IndexKey(foreignKey, primaryKey) + if err != nil { + return err + } + + return bkt.Delete(key) } // Walk walks the source bucket using keys found in the index using the provided foreign key @@ -197,13 +221,13 @@ func (i *Index) Walk(ctx context.Context, tx Tx, foreignKey []byte, visitFn Visi return err } - return indexWalk(ctx, foreignKey, cursor, sourceBucket, visitFn) + return indexWalk(foreignKey, cursor, sourceBucket, visitFn) } // indexWalk consumes the indexKey and primaryKey pairs in the index bucket and looks up their // associated primaryKey's value in the provided source bucket. // When an item is located in the source, the provided visit function is called with primary key and associated value. -func indexWalk(ctx context.Context, foreignKey []byte, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) { +func indexWalk(foreignKey []byte, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) { var keys [][]byte for ik, pk := indexCursor.Next(); ik != nil; ik, pk = indexCursor.Next() { if fk, _, err := indexKeyParts(ik); err != nil { diff --git a/kv/index_migration.go b/kv/index_migration.go index 7b33f336712..363c8981aa5 100644 --- a/kv/index_migration.go +++ b/kv/index_migration.go @@ -128,7 +128,11 @@ func (i *IndexMigration) Populate(ctx context.Context, store Store) (n int, err for fk, fkm := range diff.MissingFromIndex { for pk := range fkm { - batch = append(batch, [2][]byte{IndexKey([]byte(fk), []byte(pk)), []byte(pk)}) + key, err := IndexKey([]byte(fk), []byte(pk)) + if err != nil { + return n, err + } + batch = append(batch, [2][]byte{key, []byte(pk)}) if len(batch) >= i.operationBatchSize { if err := flush(batch); err != nil { @@ -183,7 +187,12 @@ func (i *IndexMigration) remove(ctx context.Context, store Store, mappings map[s for fk, fkm := range mappings { for pk := range fkm { - batch = append(batch, IndexKey([]byte(fk), []byte(pk))) + key, err := IndexKey([]byte(fk), []byte(pk)) + if err != nil { + return err + } + + batch = append(batch, key) if len(batch) >= i.operationBatchSize { if err := flush(batch); err != nil { diff --git a/kv/index_test.go b/kv/index_test.go index 090280935a6..017bf0fe1d9 100644 --- a/kv/index_test.go +++ b/kv/index_test.go @@ -38,6 +38,48 @@ func Test_Bolt_Index(t *testing.T) { influxdbtesting.TestIndex(t, s) } +func TestIndexKey(t *testing.T) { + tests := []struct { + name string + fk string + pk string + expKey string + expErr error + }{ + { + name: "returns key", + fk: "fk_part", + pk: "pk_part", + expKey: "fk_part/pk_part", + }, + { + name: "returns error for invalid foreign key", + fk: "fk/part", + pk: "pk_part", + expErr: kv.ErrKeyInvalidCharacters, + }, + { + name: "returns error for invalid primary key", + fk: "fk_part", + pk: "pk/part", + expErr: kv.ErrKeyInvalidCharacters, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + gotKey, gotErr := kv.IndexKey([]byte(test.fk), []byte(test.pk)) + if test.expErr != nil { + require.Error(t, gotErr) + assert.EqualError(t, test.expErr, gotErr.Error()) + assert.Nil(t, gotKey) + } else { + assert.NoError(t, gotErr) + assert.Equal(t, test.expKey, string(gotKey)) + } + }) + } +} + func TestIndex_Walk(t *testing.T) { t.Run("only selects exact keys", func(t *testing.T) { ctrl := gomock.NewController(t) @@ -45,8 +87,12 @@ func TestIndex_Walk(t *testing.T) { type keyValue struct{ key, val []byte } makeIndexKV := func(fk, pk string) keyValue { + key, err := kv.IndexKey([]byte(fk), []byte(pk)) + if err != nil { + panic(err) + } return keyValue{ - key: kv.IndexKey([]byte(fk), []byte(pk)), + key: key, val: []byte(pk), } }