Skip to content

Commit

Permalink
WIP - tired, reset this in morning
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSisley committed Jun 15, 2023
1 parent c74bbef commit fee3881
Show file tree
Hide file tree
Showing 12 changed files with 223 additions and 33 deletions.
2 changes: 2 additions & 0 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ type Store interface {

SetMigration(context.Context, LensConfig) error

LensRegistry() LensRegistry

// GetCollectionByName attempts to retrieve a collection matching the given name.
//
// If no matching collection is found an error will be returned.
Expand Down
8 changes: 8 additions & 0 deletions client/lens.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@ package client

import (
"github.com/lens-vm/lens/host-go/config/model"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/immutable/enumerable"
)

type LensConfig struct {
SourceSchema string
DestinationSchema string
model.Lens
}

type LensRegistry interface {
RegisterLens(datastore.Txn, LensConfig) error
MigrateUp(enumerable.Enumerable[string], string) (enumerable.Enumerable[string], error)
Config() []LensConfig
}
13 changes: 13 additions & 0 deletions core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,19 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta, id string) er
return c.deleteWithPrefix(ctx, c.key.WithValueFlag().WithFieldId(""))
}

versionKey := c.key.WithValueFlag().WithFieldId(core.DATASTORE_DOC_VERSION_FIELD_ID)
objectMarker, err := c.store.Get(ctx, c.key.ToPrimaryDataStoreKey().ToDS())
if err != nil && !errors.Is(err, ds.ErrNotFound) {
return err
}
if bytes.Equal(objectMarker, []byte{base.DeletedObjectMarker}) {
versionKey = versionKey.WithDeletedFlag()
}
err = c.store.Put(ctx, versionKey.ToDS(), []byte(c.schemaVersionKey.SchemaVersionId))
if err != nil {
return err
}

// ensure object marker exists
exists, err := c.store.Has(ctx, c.key.ToPrimaryDataStoreKey().ToDS())
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions core/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
SCHEMA_MIGRATION = "/schema/migration" // todo - confirm you are happy with this :)
SEQ = "/seq"
PRIMARY_KEY = "/pk"
DATASTORE_DOC_VERSION_FIELD_ID = "v"
REPLICATOR = "/replicator/id"
P2P_COLLECTION = "/p2p/collection"
)
Expand Down Expand Up @@ -238,6 +239,15 @@ func NewSchemaHistoryKey(schemaId string, previousSchemaVersionId string) Schema
}
}

func NewSchemaHistoryKeyFromString(keyString string) SchemaHistoryKey {
keyString = strings.TrimPrefix(keyString, COLLECTION_SCHEMA_VERSION_HISTORY+"/")
elements := strings.Split(keyString, "/")
return SchemaHistoryKey{
SchemaID: elements[0],
PreviousSchemaVersionId: elements[1],
}
}

func NewSchemaVersionMigrationKey(schemaVersionId string) SchemaVersionMigrationKey {
return SchemaVersionMigrationKey{SourceSchemaVersionId: schemaVersionId}
}
Expand Down
2 changes: 1 addition & 1 deletion db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type db struct {
events events.Events

parser core.Parser
lenseRegistry *lens.LensRegistry
lenseRegistry client.LensRegistry

// The maximum number of retries per transaction.
maxTxnRetries immutable.Option[int]
Expand Down
5 changes: 5 additions & 0 deletions db/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,11 @@ func (df *DocumentFetcher) processKV(kv *core.KeyValue) error {
df.doc.Reset([]byte(kv.Key.DocKey))
}

if kv.Key.FieldId == core.DATASTORE_DOC_VERSION_FIELD_ID {
df.doc.schemaVersionID = string(kv.Value)
return nil
}

// we have to skip the object marker
if bytes.Equal(df.kv.Value, []byte{base.ObjectMarker}) {
return nil
Expand Down
4 changes: 4 additions & 0 deletions db/lens.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"github.com/sourcenetwork/defradb/datastore"
)

func (db *db) LensRegistry() client.LensRegistry {
return db.lenseRegistry
}

func (db *db) setMigration(ctx context.Context, txn datastore.Txn, cfg client.LensConfig) error {
// todo - document that source schema version id may not exist locally!
key := core.NewSchemaVersionMigrationKey(cfg.SourceSchema)
Expand Down
26 changes: 18 additions & 8 deletions lens/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package lens
import (
"context"
"encoding/json"
"fmt"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/client/request"
Expand All @@ -24,16 +25,18 @@ import (
// todo - thought - the fetcher stuff would be much nicer as an enumerable I think if not now, open a ticket to change it

type lensedFetcher struct {
source fetcher.Fetcher
lense Lense
col *client.CollectionDescription
source fetcher.Fetcher
lense Lense
col *client.CollectionDescription
registry client.LensRegistry
}

var _ fetcher.Fetcher = (*lensedFetcher)(nil)

func NewFetcher(source fetcher.Fetcher) fetcher.Fetcher {
func NewFetcher(source fetcher.Fetcher, registry client.LensRegistry) fetcher.Fetcher {
return &lensedFetcher{
source: source,
source: source,
registry: registry,
//lense: New(),
}
}
Expand All @@ -44,12 +47,17 @@ func (df *lensedFetcher) Init(
reverse bool,
showDeleted bool,
) error {
df.lense = New(col.Schema.VersionID)
df.col = col
return df.source.Init(col, fields, reverse, showDeleted)
}

func (df *lensedFetcher) Start(ctx context.Context, txn datastore.Txn, spans core.Spans) error {
history, err := getTargetedHistory(ctx, txn, df.registry.Config(), df.col.Schema.SchemaID, df.col.Schema.VersionID)
if err != nil {
return err
}
df.lense = New(df.col.Schema.VersionID, history)

return df.source.Start(ctx, txn, spans)
}

Expand Down Expand Up @@ -80,7 +88,7 @@ func (df *lensedFetcher) FetchNextDoc(
if err != nil {
return nil, core.Doc{}, err
}

fmt.Println(doc.SchemaVersionID)
df.lense.Put(doc.SchemaVersionID, sourceJson)

hasNext, err := df.lense.Next()
Expand All @@ -106,7 +114,9 @@ func (df *lensedFetcher) FetchNextDoc(
}

func (df *lensedFetcher) Close() error {
df.lense.Reset()
if df.lense != nil {
df.lense.Reset()
}
return df.source.Close()
}

Expand Down
146 changes: 136 additions & 10 deletions lens/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,22 @@ import (
"context"

"github.com/ipfs/go-datastore/query"
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/immutable"
)

type historyItem struct {
schemaVersionID string
next *historyItem
next immutable.Option[*historyItem]
prior immutable.Option[*historyItem]
}

type targetedHistoryItem struct {
schemaVersionID string
next immutable.Option[*targetedHistoryItem]
prior immutable.Option[*targetedHistoryItem]
targetVector int
}

Expand All @@ -29,22 +38,120 @@ type historyPairing struct {
nextSchemaVersionID string
}

func (l *LensRegistry) getHistory(ctx context.Context, txn datastore.Txn, schemaID string) (map[schemaVersionID]historyItem, error) {
pairings := []historyPairing{}
func getTargetedHistory(
ctx context.Context,
txn datastore.Txn,
lenseConfigs []client.LensConfig,
schemaID string,
targetSchemaVersionID string,
) (map[schemaVersionID]*targetedHistoryItem, error) {
// todo - history should be cached (on registery?) (can be done later?)
history, err := getHistory(ctx, txn, lenseConfigs, schemaID)
if err != nil {
return nil, err
}

result := map[schemaVersionID]*targetedHistoryItem{}

for _, item := range history {
result[item.schemaVersionID] = &targetedHistoryItem{
schemaVersionID: item.schemaVersionID,
}
}

for _, item := range result {
historyItem := history[item.schemaVersionID]
nextHistoryItem := historyItem.next
if !nextHistoryItem.HasValue() {
break
}
nextItem := result[nextHistoryItem.Value().schemaVersionID]
item.next = immutable.Some(nextItem)
nextItem.prior = immutable.Some(item)
}

for _, item := range result {
if item.schemaVersionID == targetSchemaVersionID {
continue
}
if item.targetVector != 0 {
continue
}

distanceTravelled := 0
currentItem := item
wasFound := false
for {
if !currentItem.next.HasValue() {
break
}

currentItem = currentItem.next.Value()
distanceTravelled++
if currentItem.targetVector != 0 {
distanceTravelled += currentItem.targetVector
wasFound = true
break
}
if currentItem.schemaVersionID == targetSchemaVersionID {
wasFound = true
break
}
}

for _, config := range l.lenseConfigs {
pairings = append(pairings, historyPairing{
if !wasFound {
// todo - refactor
// if not found going up, try looking back
for {
if !currentItem.prior.HasValue() {
break
}

currentItem = currentItem.prior.Value()
distanceTravelled--
if currentItem.targetVector != 0 {
distanceTravelled += currentItem.targetVector
wasFound = true
break
}
if currentItem.schemaVersionID == targetSchemaVersionID {
wasFound = true
break
}
}
}

if !wasFound {
// todo - should never happen (if v history is actually linear)
// todo - document that empty migrations required for unknown schema versions with no migration steps
}

item.targetVector = distanceTravelled
}

return result, nil
}

func getHistory(
ctx context.Context,
txn datastore.Txn,
lenseConfigs []client.LensConfig,
schemaID string,
) (map[schemaVersionID]*historyItem, error) {
pairings := map[string]historyPairing{}

for _, config := range lenseConfigs {
pairings[config.SourceSchema] = historyPairing{
schemaVersionID: config.SourceSchema,
nextSchemaVersionID: config.DestinationSchema,
})
}
}

// todo - query local history

prefix := core.NewSchemaHistoryKey(schemaID, "")
q, err := txn.Systemstore().Query(ctx, query.Query{
Prefix: prefix.ToString(),
KeysOnly: true,
Prefix: prefix.ToString(),
})
if err != nil {
return nil, err
Expand All @@ -64,10 +171,29 @@ func (l *LensRegistry) getHistory(ctx context.Context, txn datastore.Txn, schema
return nil, res.Error
}

_ = res.Key // todo
key := core.NewSchemaHistoryKeyFromString(res.Key)
pairings[key.PreviousSchemaVersionId] = historyPairing{
schemaVersionID: key.PreviousSchemaVersionId,
nextSchemaVersionID: string(res.Value),
}
}

history := map[schemaVersionID]historyItem{}
history := map[schemaVersionID]*historyItem{}

for _, pairing := range pairings {
history[pairing.schemaVersionID] = &historyItem{
schemaVersionID: pairing.schemaVersionID,
}
}

for _, pairing := range pairings {
src := history[pairing.schemaVersionID]

if next, hasNext := history[pairing.nextSchemaVersionID]; hasNext {
src.next = immutable.Some(next)
next.prior = immutable.Some(src)
}
}

return history, nil
}
Loading

0 comments on commit fee3881

Please sign in to comment.