refactor: Simplify dag fetcher #913

Merged
merged 11 commits on Oct 25, 2022
138 changes: 19 additions & 119 deletions db/fetcher/dag.go
@@ -21,31 +21,15 @@ import (
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/errors"
)

// @todo: Generalize all Fetchers into a shared Fetcher utility

type BlockFetcher struct {
}

// HeadFetcher is a utility to incrementally fetch all the MerkleCRDT
// heads of a given doc/field
type HeadFetcher struct {

// Commented because this code is not used yet according to the linter.
// txn datastore.Txn

// key core.Key
// curSpanIndex int

spans core.Spans
cid *cid.Cid
fieldId client.Option[string]

kv *core.HeadKeyValue
kvIter dsq.Results
kvEnd bool
}

func (hf *HeadFetcher) Start(
@@ -54,10 +38,16 @@ func (hf *HeadFetcher) Start(
spans core.Spans,
fieldId client.Option[string],
) error {
numspans := len(spans.Value)
if numspans == 0 {
return errors.New("headFetcher must have at least one span")
} else if numspans > 1 {
if len(spans.Value) == 0 {
spans = core.NewSpans(
core.NewSpan(
core.DataStoreKey{},
core.DataStoreKey{}.PrefixEnd(),
),
)
}

if len(spans.Value) > 1 {
// if we have multiple spans, we need to sort them by their start position
// so we can do a single iterative sweep
sort.Slice(spans.Value, func(i, j int) bool {
@@ -86,74 +76,29 @@
return err
}

_, err = hf.nextKey()
return err
return nil
}

func (hf *HeadFetcher) nextKey() (bool, error) {
var err error
var done bool
done, hf.kv, err = hf.nextKV()
if err != nil {
return false, err
}

hf.kvEnd = done
if hf.kvEnd {
return true, nil
}
return false, nil
}

func (hf *HeadFetcher) nextKV() (iterDone bool, kv *core.HeadKeyValue, err error) {
func (hf *HeadFetcher) FetchNext() (*cid.Cid, error) {
res, available := hf.kvIter.NextSync()
if !available {
return true, nil, nil
}
if res.Error != nil {
return true, nil, err
}

headStoreKey, err := core.NewHeadStoreKey(res.Key)
if err != nil {
return true, nil, err
return nil, res.Error
}
kv = &core.HeadKeyValue{
Key: headStoreKey,
Value: res.Value,
}
return false, kv, nil
}

func (hf *HeadFetcher) processKV(kv *core.HeadKeyValue) {
hf.cid = &kv.Key.Cid
}

func (hf *HeadFetcher) FetchNext() (*cid.Cid, error) {
if hf.kvEnd {
if !available {
return nil, nil
}

if hf.kv == nil {
return nil, errors.New("failed to get head, fetcher hasn't been initialized or started")
headStoreKey, err := core.NewHeadStoreKey(res.Key)
if err != nil {
return nil, err
}

if hf.fieldId.HasValue() && hf.fieldId.Value() != hf.kv.Key.FieldId {
if hf.fieldId.HasValue() && hf.fieldId.Value() != headStoreKey.FieldId {
// FieldIds do not match, continue to next row
_, err := hf.nextKey()
if err != nil {
return nil, err
}
return hf.FetchNext()
}

hf.processKV(hf.kv)

_, err := hf.nextKey()
if err != nil {
return nil, err
}
return hf.cid, nil
return &headStoreKey.Cid, nil
}

func (hf *HeadFetcher) Close() error {
@@ -163,48 +108,3 @@ func (hf *HeadFetcher) Close() error {

return hf.kvIter.Close()
}

/*
// List returns the list of current heads plus the max height.
// @todo Document Heads.List function
func (hh *heads) List() ([]cid.Cid, uint64, error) {
q := query.Query{
Prefix: hh.namespace.String(),
KeysOnly: false,
}

results, err := hh.store.Query(q)
if err != nil {
return nil, 0, err
}
defer results.Close()

heads := make([]cid.Cid, 0)
var maxHeight uint64
for r := range results.Next() {
if r.Error != nil {
return nil, 0, errors.Wrap("failed to get next query result ", err)
}
headKey := ds.NewKey(strings.TrimPrefix(r.Key, hh.namespace.String()))
headCid, err := dshelp.DsKeyToCid(headKey)
if err != nil {
return nil, 0, errors.Wrap("failed to get CID from key ", err)
}
height, n := binary.Uvarint(r.Value)
if n <= 0 {
return nil, 0, errors.New("error decoding height")
}
heads = append(heads, headCid)
if height > maxHeight {
maxHeight = height
}
}
sort.Slice(heads, func(i, j int) bool {
ci := heads[i].Bytes()
cj := heads[j].Bytes()
return bytes.Compare(ci, cj) < 0
})

return heads, maxHeight, nil
}
*/
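
A side note on the new FetchNext: it recurses whenever a row's FieldId does not match the requested field, so a long run of non-matching keys deepens the call stack. An equivalent iterative form, shown here purely as a sketch (it is not part of this PR), keeps the same behaviour with a flat loop:

// Sketch only: an iterative equivalent of the recursive field filter above.
func (hf *HeadFetcher) FetchNext() (*cid.Cid, error) {
	for {
		res, available := hf.kvIter.NextSync()
		if !available {
			// Iterator exhausted: signal completion with a nil CID and nil error.
			return nil, nil
		}
		if res.Error != nil {
			return nil, res.Error
		}

		headStoreKey, err := core.NewHeadStoreKey(res.Key)
		if err != nil {
			return nil, err
		}

		if hf.fieldId.HasValue() && hf.fieldId.Value() != headStoreKey.FieldId {
			// FieldIds do not match; skip to the next row instead of recursing.
			continue
		}

		return &headStoreKey.Cid, nil
	}
}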
6 changes: 3 additions & 3 deletions planner/commit.go
@@ -61,16 +61,16 @@ func (n *dagScanNode) Kind() string {

func (n *dagScanNode) Init() error {
if len(n.spans.Value) == 0 {
key := core.DataStoreKey{}
if n.parsed.DocKey.HasValue() {
key = key.WithDocKey(n.parsed.DocKey.Value())
key := core.DataStoreKey{}.WithDocKey(n.parsed.DocKey.Value())

if n.parsed.FieldName.HasValue() {
field := n.parsed.FieldName.Value()
key = key.WithFieldId(field)
}

n.spans = core.NewSpans(core.NewSpan(key, key.PrefixEnd()))
}
n.spans = core.NewSpans(core.NewSpan(key, key.PrefixEnd()))
}

return n.fetcher.Start(n.p.ctx, n.p.txn, n.spans, n.parsed.FieldName)
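
Taken together, the two files now interact as a plain Start / FetchNext / Close cycle: the planner builds a span (or leaves it empty and lets Start default to the whole keyspace), then pulls head CIDs until FetchNext returns nil. The snippet below is a usage sketch under assumed surroundings: the fetcher package alias, the caller-supplied ctx, txn, spans, and fieldId, and the omitted imports are not part of this PR.

// Usage sketch only; assumes the HeadFetcher from db/fetcher and caller-supplied inputs.
func fetchHeads(
	ctx context.Context,
	txn datastore.Txn,
	spans core.Spans,
	fieldId client.Option[string],
) ([]cid.Cid, error) {
	var hf fetcher.HeadFetcher
	if err := hf.Start(ctx, txn, spans, fieldId); err != nil {
		return nil, err
	}
	defer hf.Close()

	var heads []cid.Cid
	for {
		c, err := hf.FetchNext()
		if err != nil {
			return nil, err
		}
		if c == nil {
			// A nil CID with a nil error means the span is exhausted.
			return heads, nil
		}
		heads = append(heads, *c)
	}
}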