Skip to content

Commit

Permalink
feat(kv): migration types for managing kv buckets and indexes over time
Browse files Browse the repository at this point in the history
chore(kv): fixup comments in migrator types

fix(kv): initialize migrator bucket on kv service initialize

chore(kv): remove currently unused auth index

chore(kv): remove currently unused urm index
  • Loading branch information
GeorgeMac committed Mar 11, 2020
1 parent e25b8e5 commit f6a1d7d
Show file tree
Hide file tree
Showing 8 changed files with 810 additions and 20 deletions.
5 changes: 5 additions & 0 deletions bolt/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ func NewKVStore(log *zap.Logger, path string) *KVStore {
}
}

// AutoMigrate returns true as the bolt KVStore is safe to migrate on initialize.
func (s *KVStore) AutoMigrate() bool {
return true
}

// Open creates boltDB file it doesn't exists and opens it otherwise.
func (s *KVStore) Open(ctx context.Context) error {
span, _ := tracing.StartSpanFromContext(ctx)
Expand Down
5 changes: 5 additions & 0 deletions inmem/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
panic("not implemented")
}

// AutoMigrate returns true as inmem KVStore is safe to migrate on initialize.
func (s *KVStore) AutoMigrate() bool {
return true
}

// Flush removes all data from the buckets. Used for testing.
func (s *KVStore) Flush(ctx context.Context) {
s.mu.Lock()
Expand Down
48 changes: 46 additions & 2 deletions kv/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ func NewIndex(mapping IndexMapping, opts ...IndexOption) *Index {
return index
}

// Initialize creates the bucket if it does not already exist.
func (i *Index) Initialize(ctx context.Context, store Store) error {
func (i *Index) initialize(ctx context.Context, store Store) error {
return store.Update(ctx, func(tx Tx) error {
// create bucket if not exist
_, err := tx.Bucket(i.IndexBucket())
Expand Down Expand Up @@ -180,6 +179,51 @@ func indexKeyParts(indexKey []byte) (fk, pk []byte, err error) {
return
}

// IndexMigration is a migration for adding and removing an index.
// These are constructed via the Index.Migration function.
type IndexMigration struct {
*Index
opts []PopulateOption
}

// Name returns a readable name for the index migration.
func (i *IndexMigration) Name() string {
return fmt.Sprintf("add index %q", string(i.IndexBucket()))
}

// Up initializes the index bucket and populates the index.
func (i *IndexMigration) Up(ctx context.Context, store Store) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("migration (up) %s: %w", i.Name(), err)
}
}()

if err = i.initialize(ctx, store); err != nil {
return err
}

_, err = i.Populate(ctx, store, i.opts...)
return err
}

// Down deletes all entries from the index.
func (i *IndexMigration) Down(ctx context.Context, store Store) error {
if err := i.DeleteAll(ctx, store); err != nil {
return fmt.Errorf("migration (down) %s: %w", i.Name(), err)
}

return nil
}

// Migration creates an IndexMigration for the underlying Index.
func (i *Index) Migration(opts ...PopulateOption) *IndexMigration {
return &IndexMigration{
Index: i,
opts: opts,
}
}

// Insert creates a single index entry for the provided primary key on the foreign key.
func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error {
bkt, err := i.indexBucket(tx)
Expand Down
Loading

0 comments on commit f6a1d7d

Please sign in to comment.