refactor: CRDT merge direction #2016

Merged
8 changes: 7 additions & 1 deletion Makefile
@@ -256,7 +256,11 @@ test\:coverage:
@$(MAKE) deps:lens
@$(MAKE) clean:coverage
mkdir $(COVERAGE_DIRECTORY)
ifeq ($(path),)
gotestsum --format testname -- ./... $(TEST_FLAGS) $(COVERAGE_FLAGS)
else
gotestsum --format testname -- $(path) $(TEST_FLAGS) $(COVERAGE_FLAGS)
endif
go tool covdata textfmt -i=$(COVERAGE_DIRECTORY) -o $(COVERAGE_FILE)

.PHONY: test\:coverage-func
@@ -266,8 +270,10 @@ test\:coverage-func:

.PHONY: test\:coverage-html
test\:coverage-html:
@$(MAKE) test:coverage
@$(MAKE) test:coverage path=$(path)
go tool cover -html=$(COVERAGE_FILE)
@$(MAKE) clean:coverage


.PHONY: test\:changes
test\:changes:
3 changes: 1 addition & 2 deletions core/clock.go
@@ -13,7 +13,6 @@ package core
import (
"context"

cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
)

@@ -24,5 +23,5 @@ type MerkleClock interface {
ctx context.Context,
delta Delta,
) (ipld.Node, error) // possibly change to AddDeltaNode?
ProcessNode(context.Context, NodeGetter, Delta, ipld.Node) ([]cid.Cid, error)
ProcessNode(context.Context, Delta, ipld.Node) error
}
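
For orientation only (not part of the diff), here is a minimal sketch of how a call site adapts to the new ProcessNode signature. The interface is as defined above; the package, helper name, and variable names are hypothetical, and the real call-site updates appear in db/fetcher/versioned.go and merkle/clock/clock.go below.

package example

import (
	"context"

	ipld "github.com/ipfs/go-ipld-format"

	"github.com/sourcenetwork/defradb/core"
)

// mergeNode merges a single delta/node pair into the CRDT state via the clock.
// Callers no longer supply a NodeGetter and no longer receive child CIDs to
// walk; ProcessNode handles its own link bookkeeping and only reports an error.
func mergeNode(ctx context.Context, clk core.MerkleClock, delta core.Delta, nd ipld.Node) error {
	// Before this PR: children, err := clk.ProcessNode(ctx, ng, delta, nd)
	return clk.ProcessNode(ctx, delta, nd)
}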
14 changes: 0 additions & 14 deletions core/crdt/composite.go
@@ -196,20 +196,6 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta) error {
}

func (c CompositeDAG) deleteWithPrefix(ctx context.Context, key core.DataStoreKey) error {
val, err := c.store.Get(ctx, key.ToDS())
if err != nil && !errors.Is(err, ds.ErrNotFound) {
return err
}
if !errors.Is(err, ds.ErrNotFound) {
err = c.store.Put(ctx, c.key.WithDeletedFlag().ToDS(), val)
if err != nil {
return err
}
err = c.store.Delete(ctx, key.ToDS())
if err != nil {
return err
}
}
q := query.Query{
Prefix: key.ToString(),
}
2 changes: 1 addition & 1 deletion db/fetcher/versioned.go
@@ -405,7 +405,7 @@ func (vf *VersionedFetcher) processNode(
return err
}

_, err = mcrdt.Clock().ProcessNode(vf.ctx, nil, delta, nd)
err = mcrdt.Clock().ProcessNode(vf.ctx, delta, nd)
return err
}

23 changes: 8 additions & 15 deletions merkle/clock/clock.go
@@ -103,10 +103,8 @@ func (mc *MerkleClock) AddDAGNode(
}

// apply the new node and merge the delta with state
// @todo Remove NodeGetter as a parameter, and move it to a MerkleClock field
_, err = mc.ProcessNode(
err = mc.ProcessNode(
ctx,
&CrdtNodeGetter{DeltaExtractor: mc.crdt.DeltaDecode},
delta,
nd,
)
@@ -117,17 +115,16 @@
// ProcessNode processes an already merged delta into a CRDT by adding it to the state.
func (mc *MerkleClock) ProcessNode(
ctx context.Context,
ng core.NodeGetter,
delta core.Delta,
node ipld.Node,
) ([]cid.Cid, error) {
) error {
nodeCid := node.Cid()
priority := delta.GetPriority()

log.Debug(ctx, "Running ProcessNode", logging.NewKV("CID", nodeCid))
err := mc.crdt.Merge(ctx, delta)
if err != nil {
return nil, NewErrMergingDelta(nodeCid, err)
return NewErrMergingDelta(nodeCid, err)
}

links := node.Links()
@@ -145,18 +142,16 @@ func (mc *MerkleClock) ProcessNode(
log.Debug(ctx, "No heads found")
err := mc.headset.Write(ctx, nodeCid, priority)
if err != nil {
return nil, NewErrAddingHead(nodeCid, err)
return NewErrAddingHead(nodeCid, err)
}
}

children := []cid.Cid{}

for _, l := range links {
linkCid := l.Cid
log.Debug(ctx, "Scanning for replacement heads", logging.NewKV("Child", linkCid))
isHead, err := mc.headset.IsHead(ctx, linkCid)
if err != nil {
return nil, NewErrCheckingHead(linkCid, err)
return NewErrCheckingHead(linkCid, err)
}

if isHead {
@@ -165,15 +160,15 @@
// of current branch
err = mc.headset.Replace(ctx, linkCid, nodeCid, priority)
if err != nil {
return nil, NewErrReplacingHead(linkCid, nodeCid, err)
return NewErrReplacingHead(linkCid, nodeCid, err)
}

continue
}

known, err := mc.dagstore.Has(ctx, linkCid)
if err != nil {
return nil, NewErrCouldNotFindBlock(linkCid, err)
return NewErrCouldNotFindBlock(linkCid, err)
}
if known {
// we reached a non-head node in the known tree.
@@ -192,11 +187,9 @@
}
continue
}

children = append(children, linkCid)
}

return children, nil
return nil
}

// Heads returns the current heads of the MerkleClock.
81 changes: 26 additions & 55 deletions net/dag.go
@@ -20,9 +20,6 @@ import (
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/logging"
)

@@ -50,17 +47,10 @@ type SessionDAGSyncer interface {
}

type dagJob struct {
session *sync.WaitGroup // A waitgroup to wait for all related jobs to conclude
nodeGetter ipld.NodeGetter // a node getter to use
node ipld.Node // the current ipld Node

collection client.Collection // collection our document belongs to
dsKey core.DataStoreKey // datastore key of our document
fieldName string // field of the subgraph our node belongs to

// Transaction common to a pushlog event. It is used to pass it along to processLog
// and handleChildBlocks within the dagWorker.
txn datastore.Txn
session *sync.WaitGroup // A waitgroup to wait for all related jobs to conclude
bp *blockProcessor // the block processor to use
cid cid.Cid // the cid of the block to fetch from the P2P network
isComposite bool // whether this is a composite block

// OLD FIELDS
// root cid.Cid // the root of the branch we are walking down
@@ -87,13 +77,13 @@ func (p *Peer) sendJobWorker() {
return

case newJob := <-p.sendJobs:
jobs, ok := docWorkerQueue[newJob.dsKey.DocKey]
jobs, ok := docWorkerQueue[newJob.bp.dsKey.DocKey]
if !ok {
jobs = make(chan *dagJob, numWorkers)
for i := 0; i < numWorkers; i++ {
go p.dagWorker(jobs)
}
docWorkerQueue[newJob.dsKey.DocKey] = jobs
docWorkerQueue[newJob.bp.dsKey.DocKey] = jobs
}
jobs <- newJob

@@ -113,8 +103,8 @@ func (p *Peer) dagWorker(jobs chan *dagJob) {
log.Debug(
p.ctx,
"Starting new job from DAG queue",
logging.NewKV("Datastore Key", job.dsKey),
logging.NewKV("CID", job.node.Cid()),
logging.NewKV("Datastore Key", job.bp.dsKey),
logging.NewKV("CID", job.cid),
)

select {
@@ -125,44 +115,25 @@
default:
}

children, err := p.processLog(
p.ctx,
job.txn,
job.collection,
job.dsKey,
job.fieldName,
job.node,
job.nodeGetter,
true,
)
if err != nil {
log.ErrorE(
p.ctx,
"Error processing log",
err,
logging.NewKV("Datastore key", job.dsKey),
logging.NewKV("CID", job.node.Cid()),
)
job.session.Done()
continue
}

if len(children) == 0 {
job.session.Done()
continue
}

go func(j *dagJob) {
p.handleChildBlocks(
j.session,
j.txn,
j.collection,
j.dsKey,
j.fieldName,
j.node,
children,
j.nodeGetter,
)
if j.bp.getter != nil && j.cid.Defined() {
cNode, err := j.bp.getter.Get(p.ctx, j.cid)
if err != nil {
log.ErrorE(p.ctx, "Failed to get node", err, logging.NewKV("CID", j.cid))
Review thread on this line:

Contributor: todo: Please define these errors (and others in other files) in an errors.go file

Collaborator (author): They are not returned errors. It's a log message. We don't predefine our log messages at all currently.

Contributor: Our log messages are public, just like errors. Most of the time the error messages will be exposed via logs (e.g. CLI/HTTP), the reasons for housing them in a dedicated and standardised file are exactly the same.

Collaborator (author): Although there is no value in defining them in the errors.go file. It's not an error that can be handled or compared. It's just a message in a log file.

Member: Time for a log.go file :,)

Collaborator (author): I'm ok with a solution like that. I'll leave it as is for now and create a separate PR to take care of log messages.

Contributor:
> It's just a message in a log file.

nitpick: It is an error message in a log file, and indistinguishable to our users from any other 'kind' of error.

> I'll leave it as is for now and create a separate PR to take care of log messages

I'm fine with that, but I do think they should live in an errors.go file, because they are errors.

j.session.Done()
return
}
err = j.bp.processRemoteBlock(
p.ctx,
j.session,
cNode,
j.isComposite,
)
if err != nil {
log.ErrorE(p.ctx, "Failed to process remote block", err, logging.NewKV("CID", j.cid))
}
}
p.queuedChildren.Remove(j.cid)
j.session.Done()
}(job)
}
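
For illustration only (not part of the diff), a minimal sketch of how a block fetch might be queued for the worker above, assuming it lives in the net package alongside dagJob and blockProcessor; the helper name and the session.Add call are assumptions, not code from this PR.

// Hypothetical helper assumed to sit next to the code above in net/dag.go.
// sendJobWorker routes the job to a per-document queue keyed by
// bp.dsKey.DocKey, and dagWorker fetches the block by CID and hands it to
// bp.processRemoteBlock.
func (p *Peer) enqueueBlock(
	session *sync.WaitGroup,
	bp *blockProcessor,
	c cid.Cid,
	isComposite bool,
) {
	session.Add(1) // balanced by job.session.Done() in dagWorker
	p.sendJobs <- &dagJob{
		session:     session,
		bp:          bp,
		cid:         c,
		isComposite: isComposite,
	}
}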
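Relating to the review thread above about predefining log messages: purely as an illustration of the reviewers' suggestion (not code from this PR), a sketch of what a dedicated errors.go or log.go entry could look like; the file, constant names, and package are hypothetical.

// Hypothetical log.go / errors.go in the net package, housing the log/error
// message strings used by dagWorker in one standardised place.
package net

const (
	errFailedToGetNode            = "Failed to get node"
	errFailedToProcessRemoteBlock = "Failed to process remote block"
)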