Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kv): support for migrations #17145

Merged
merged 7 commits into from
Mar 18, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
1. [17138](https://github.com/influxdata/influxdb/pull/17138): Extend pkger export all capabilities to support filtering by lable name and resource type
1. [17049](https://github.com/influxdata/influxdb/pull/17049): Added new login and sign-up screen that for cloud users that allows direct login from their region
1. [17170](https://github.com/influxdata/influxdb/pull/17170): Added new cli multiple profiles management tool
1. [17145](https://github.com/influxdata/influxdb/pull/17145): Update kv.Store to define schema changes via new kv.Migrator types

### Bug Fixes

Expand Down
8 changes: 8 additions & 0 deletions bolt/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
// check that *KVStore implement kv.Store interface.
var _ kv.Store = (*KVStore)(nil)

// ensure *KVStore implements kv.AutoMigrationStore.
var _ kv.AutoMigrationStore = (*KVStore)(nil)

// KVStore is a kv.Store backed by boltdb.
type KVStore struct {
path string
Expand All @@ -34,6 +37,11 @@ func NewKVStore(log *zap.Logger, path string) *KVStore {
}
}

// AutoMigrate returns itself as it is safe to automatically apply migrations on initialization.
func (s *KVStore) AutoMigrate() kv.Store {
return s
}

// Open creates boltDB file it doesn't exists and opens it otherwise.
func (s *KVStore) Open(ctx context.Context) error {
span, _ := tracing.StartSpanFromContext(ctx)
Expand Down
8 changes: 8 additions & 0 deletions inmem/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
// ensure *KVStore implement kv.Store interface
var _ kv.Store = (*KVStore)(nil)

// ensure *KVStore implements kv.AutoMigrationStore
var _ kv.AutoMigrationStore = (*KVStore)(nil)

// cursorBatchSize is the size of a batch sent by a forward cursors
// tree iterator
const cursorBatchSize = 1000
Expand Down Expand Up @@ -61,6 +64,11 @@ func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
panic("not implemented")
}

// AutoMigrate returns itlsef as *KVStore is safe to migrate automically on initialize.
func (s *KVStore) AutoMigrate() kv.Store {
return s
}

// Flush removes all data from the buckets. Used for testing.
func (s *KVStore) Flush(ctx context.Context) {
s.mu.Lock()
Expand Down
50 changes: 48 additions & 2 deletions kv/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ func NewIndex(mapping IndexMapping, opts ...IndexOption) *Index {
return index
}

// Initialize creates the bucket if it does not already exist.
func (i *Index) Initialize(ctx context.Context, store Store) error {
func (i *Index) initialize(ctx context.Context, store Store) error {
return store.Update(ctx, func(tx Tx) error {
// create bucket if not exist
_, err := tx.Bucket(i.IndexBucket())
Expand Down Expand Up @@ -180,6 +179,53 @@ func indexKeyParts(indexKey []byte) (fk, pk []byte, err error) {
return
}

// IndexMigration is a migration for adding and removing an index.
// These are constructed via the Index.Migration function.
type IndexMigration struct {
*Index
opts []PopulateOption
}

// Name returns a readable name for the index migration.
func (i *IndexMigration) Name() string {
return fmt.Sprintf("add index %q", string(i.IndexBucket()))
}

// Up initializes the index bucket and populates the index.
func (i *IndexMigration) Up(ctx context.Context, store Store) (err error) {
wrapErr := func(err error) error {
if err == nil {
return nil
}

return fmt.Errorf("migration (up) %s: %w", i.Name(), err)
}

if err = i.initialize(ctx, store); err != nil {
return wrapErr(err)
}

_, err = i.Populate(ctx, store, i.opts...)
return wrapErr(err)
}

// Down deletes all entries from the index.
func (i *IndexMigration) Down(ctx context.Context, store Store) error {
if err := i.DeleteAll(ctx, store); err != nil {
return fmt.Errorf("migration (down) %s: %w", i.Name(), err)
}

return nil
}

// Migration creates an IndexMigration for the underlying Index.
func (i *Index) Migration(opts ...PopulateOption) *IndexMigration {
return &IndexMigration{
Index: i,
opts: opts,
}
}

// Insert creates a single index entry for the provided primary key on the foreign key.
func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error {
bkt, err := i.indexBucket(tx)
Expand Down
Loading