Skip to content

Commit

Permalink
refactor: Simplify dag fetcher (#913)
Browse files Browse the repository at this point in the history
* Remove commented out code from commitFetcher

* Remove cid from state

There is no need for this complication.

* Remove unused type

* Unfactor func called once

Now we can see what it does, allowing for future improvements

* Remove unused return param

* Remove kvEnd

The stateful prop doesn't really do anything.

* Call nextKey at the start of loop

Instead of at the end, plus init. I find this much easier to read.

* Unfactor single use function

This is easier to read IMO, and allows for future refactorings.

* Remove iteration state

Simpler and easier to read

* Remove potential spans error

There is no reason for us to have to worry about this.
  • Loading branch information
AndrewSisley authored Oct 25, 2022
1 parent 6ccae7f commit 3b53193
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 122 deletions.
138 changes: 19 additions & 119 deletions db/fetcher/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,15 @@ import (
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/errors"
)

// @todo: Generalize all Fetchers into a shared Fetcher utility

type BlockFetcher struct {
}

// HeadFetcher is a utility to incrementally fetch all the MerkleCRDT
// heads of a given doc/field
type HeadFetcher struct {

// Commented because this code is not used yet according to the linter.
// txn datastore.Txn

// key core.Key
// curSpanIndex int

spans core.Spans
cid *cid.Cid
fieldId client.Option[string]

kv *core.HeadKeyValue
kvIter dsq.Results
kvEnd bool
}

func (hf *HeadFetcher) Start(
Expand All @@ -54,10 +38,16 @@ func (hf *HeadFetcher) Start(
spans core.Spans,
fieldId client.Option[string],
) error {
numspans := len(spans.Value)
if numspans == 0 {
return errors.New("headFetcher must have at least one span")
} else if numspans > 1 {
if len(spans.Value) == 0 {
spans = core.NewSpans(
core.NewSpan(
core.DataStoreKey{},
core.DataStoreKey{}.PrefixEnd(),
),
)
}

if len(spans.Value) > 1 {
// if we have multiple spans, we need to sort them by their start position
// so we can do a single iterative sweep
sort.Slice(spans.Value, func(i, j int) bool {
Expand Down Expand Up @@ -86,74 +76,29 @@ func (hf *HeadFetcher) Start(
return err
}

_, err = hf.nextKey()
return err
return nil
}

func (hf *HeadFetcher) nextKey() (bool, error) {
var err error
var done bool
done, hf.kv, err = hf.nextKV()
if err != nil {
return false, err
}

hf.kvEnd = done
if hf.kvEnd {
return true, nil
}
return false, nil
}

func (hf *HeadFetcher) nextKV() (iterDone bool, kv *core.HeadKeyValue, err error) {
func (hf *HeadFetcher) FetchNext() (*cid.Cid, error) {
res, available := hf.kvIter.NextSync()
if !available {
return true, nil, nil
}
if res.Error != nil {
return true, nil, err
}

headStoreKey, err := core.NewHeadStoreKey(res.Key)
if err != nil {
return true, nil, err
return nil, res.Error
}
kv = &core.HeadKeyValue{
Key: headStoreKey,
Value: res.Value,
}
return false, kv, nil
}

func (hf *HeadFetcher) processKV(kv *core.HeadKeyValue) {
hf.cid = &kv.Key.Cid
}

func (hf *HeadFetcher) FetchNext() (*cid.Cid, error) {
if hf.kvEnd {
if !available {
return nil, nil
}

if hf.kv == nil {
return nil, errors.New("failed to get head, fetcher hasn't been initialized or started")
headStoreKey, err := core.NewHeadStoreKey(res.Key)
if err != nil {
return nil, err
}

if hf.fieldId.HasValue() && hf.fieldId.Value() != hf.kv.Key.FieldId {
if hf.fieldId.HasValue() && hf.fieldId.Value() != headStoreKey.FieldId {
// FieldIds do not match, continue to next row
_, err := hf.nextKey()
if err != nil {
return nil, err
}
return hf.FetchNext()
}

hf.processKV(hf.kv)

_, err := hf.nextKey()
if err != nil {
return nil, err
}
return hf.cid, nil
return &headStoreKey.Cid, nil
}

func (hf *HeadFetcher) Close() error {
Expand All @@ -163,48 +108,3 @@ func (hf *HeadFetcher) Close() error {

return hf.kvIter.Close()
}

/*
// List returns the list of current heads plus the max height.
// @todo Document Heads.List function
func (hh *heads) List() ([]cid.Cid, uint64, error) {
q := query.Query{
Prefix: hh.namespace.String(),
KeysOnly: false,
}
results, err := hh.store.Query(q)
if err != nil {
return nil, 0, err
}
defer results.Close()
heads := make([]cid.Cid, 0)
var maxHeight uint64
for r := range results.Next() {
if r.Error != nil {
return nil, 0, errors.Wrap("failed to get next query result ", err)
}
headKey := ds.NewKey(strings.TrimPrefix(r.Key, hh.namespace.String()))
headCid, err := dshelp.DsKeyToCid(headKey)
if err != nil {
return nil, 0, errors.Wrap("failed to get CID from key ", err)
}
height, n := binary.Uvarint(r.Value)
if n <= 0 {
return nil, 0, errors.New("error decoding height")
}
heads = append(heads, headCid)
if height > maxHeight {
maxHeight = height
}
}
sort.Slice(heads, func(i, j int) bool {
ci := heads[i].Bytes()
cj := heads[j].Bytes()
return bytes.Compare(ci, cj) < 0
})
return heads, maxHeight, nil
}
*/
6 changes: 3 additions & 3 deletions planner/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ func (n *dagScanNode) Kind() string {

func (n *dagScanNode) Init() error {
if len(n.spans.Value) == 0 {
key := core.DataStoreKey{}
if n.parsed.DocKey.HasValue() {
key = key.WithDocKey(n.parsed.DocKey.Value())
key := core.DataStoreKey{}.WithDocKey(n.parsed.DocKey.Value())

if n.parsed.FieldName.HasValue() {
field := n.parsed.FieldName.Value()
key = key.WithFieldId(field)
}

n.spans = core.NewSpans(core.NewSpan(key, key.PrefixEnd()))
}
n.spans = core.NewSpans(core.NewSpan(key, key.PrefixEnd()))
}

return n.fetcher.Start(n.p.ctx, n.p.txn, n.spans, n.parsed.FieldName)
Expand Down

0 comments on commit 3b53193

Please sign in to comment.