diff --git a/Makefile b/Makefile
index 7268834d5a..5dddc2872e 100644
--- a/Makefile
+++ b/Makefile
@@ -256,7 +256,11 @@ test\:coverage:
 	@$(MAKE) deps:lens
 	@$(MAKE) clean:coverage
 	mkdir $(COVERAGE_DIRECTORY)
+ifeq ($(path),)
 	gotestsum --format testname -- ./... $(TEST_FLAGS) $(COVERAGE_FLAGS)
+else
+	gotestsum --format testname -- $(path) $(TEST_FLAGS) $(COVERAGE_FLAGS)
+endif
 	go tool covdata textfmt -i=$(COVERAGE_DIRECTORY) -o $(COVERAGE_FILE)
 
 .PHONY: test\:coverage-func
@@ -266,8 +270,10 @@ test\:coverage-func:
 
 .PHONY: test\:coverage-html
 test\:coverage-html:
-	@$(MAKE) test:coverage
+	@$(MAKE) test:coverage path=$(path)
 	go tool cover -html=$(COVERAGE_FILE)
+	@$(MAKE) clean:coverage
+
 .PHONY: test\:changes
 test\:changes:
diff --git a/core/clock.go b/core/clock.go
index 622a36233c..e7b8c7f1f2 100644
--- a/core/clock.go
+++ b/core/clock.go
@@ -13,7 +13,6 @@ package core
 
 import (
 	"context"
 
-	cid "github.com/ipfs/go-cid"
 	ipld "github.com/ipfs/go-ipld-format"
 )
@@ -24,5 +23,5 @@ type MerkleClock interface {
 		ctx context.Context,
 		delta Delta,
 	) (ipld.Node, error) // possibly change to AddDeltaNode?
-	ProcessNode(context.Context, NodeGetter, Delta, ipld.Node) ([]cid.Cid, error)
+	ProcessNode(context.Context, Delta, ipld.Node) error
 }
diff --git a/core/crdt/composite.go b/core/crdt/composite.go
index c999f66666..68f7824329 100644
--- a/core/crdt/composite.go
+++ b/core/crdt/composite.go
@@ -196,20 +196,6 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta) error {
 }
 
 func (c CompositeDAG) deleteWithPrefix(ctx context.Context, key core.DataStoreKey) error {
-	val, err := c.store.Get(ctx, key.ToDS())
-	if err != nil && !errors.Is(err, ds.ErrNotFound) {
-		return err
-	}
-	if !errors.Is(err, ds.ErrNotFound) {
-		err = c.store.Put(ctx, c.key.WithDeletedFlag().ToDS(), val)
-		if err != nil {
-			return err
-		}
-		err = c.store.Delete(ctx, key.ToDS())
-		if err != nil {
-			return err
-		}
-	}
 	q := query.Query{
 		Prefix: key.ToString(),
 	}
diff --git a/db/fetcher/versioned.go b/db/fetcher/versioned.go
index 2b534c831b..4ab8ef54a7 100644
--- a/db/fetcher/versioned.go
+++ b/db/fetcher/versioned.go
@@ -405,7 +405,7 @@ func (vf *VersionedFetcher) processNode(
 		return err
 	}
 
-	_, err = mcrdt.Clock().ProcessNode(vf.ctx, nil, delta, nd)
+	err = mcrdt.Clock().ProcessNode(vf.ctx, delta, nd)
 	return err
 }
diff --git a/merkle/clock/clock.go b/merkle/clock/clock.go
index 95151e2e76..2bdc9fda93 100644
--- a/merkle/clock/clock.go
+++ b/merkle/clock/clock.go
@@ -103,10 +103,8 @@ func (mc *MerkleClock) AddDAGNode(
 	}
 
 	// apply the new node and merge the delta with state
-	// @todo Remove NodeGetter as a parameter, and move it to a MerkleClock field
-	_, err = mc.ProcessNode(
+	err = mc.ProcessNode(
 		ctx,
-		&CrdtNodeGetter{DeltaExtractor: mc.crdt.DeltaDecode},
 		delta,
 		nd,
 	)
@@ -117,17 +115,16 @@
 // ProcessNode processes an already merged delta into a CRDT by adding it to the state.
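// With this change the method takes only the delta and its already-built node:
// the NodeGetter parameter and the returned child CIDs are gone, since walking
// and fetching children now lives in the net layer's blockProcessor. A minimal
// caller sketch (illustrative only, assuming delta and node are already in hand):
//
//	if err := mc.ProcessNode(ctx, delta, node); err != nil {
//		return err
//	}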
func (mc *MerkleClock) ProcessNode( ctx context.Context, - ng core.NodeGetter, delta core.Delta, node ipld.Node, -) ([]cid.Cid, error) { +) error { nodeCid := node.Cid() priority := delta.GetPriority() log.Debug(ctx, "Running ProcessNode", logging.NewKV("CID", nodeCid)) err := mc.crdt.Merge(ctx, delta) if err != nil { - return nil, NewErrMergingDelta(nodeCid, err) + return NewErrMergingDelta(nodeCid, err) } links := node.Links() @@ -145,18 +142,16 @@ func (mc *MerkleClock) ProcessNode( log.Debug(ctx, "No heads found") err := mc.headset.Write(ctx, nodeCid, priority) if err != nil { - return nil, NewErrAddingHead(nodeCid, err) + return NewErrAddingHead(nodeCid, err) } } - children := []cid.Cid{} - for _, l := range links { linkCid := l.Cid log.Debug(ctx, "Scanning for replacement heads", logging.NewKV("Child", linkCid)) isHead, err := mc.headset.IsHead(ctx, linkCid) if err != nil { - return nil, NewErrCheckingHead(linkCid, err) + return NewErrCheckingHead(linkCid, err) } if isHead { @@ -165,7 +160,7 @@ func (mc *MerkleClock) ProcessNode( // of current branch err = mc.headset.Replace(ctx, linkCid, nodeCid, priority) if err != nil { - return nil, NewErrReplacingHead(linkCid, nodeCid, err) + return NewErrReplacingHead(linkCid, nodeCid, err) } continue @@ -173,7 +168,7 @@ func (mc *MerkleClock) ProcessNode( known, err := mc.dagstore.Has(ctx, linkCid) if err != nil { - return nil, NewErrCouldNotFindBlock(linkCid, err) + return NewErrCouldNotFindBlock(linkCid, err) } if known { // we reached a non-head node in the known tree. @@ -192,11 +187,9 @@ func (mc *MerkleClock) ProcessNode( } continue } - - children = append(children, linkCid) } - return children, nil + return nil } // Heads returns the current heads of the MerkleClock. diff --git a/net/dag.go b/net/dag.go index 2d49790f90..1760864db4 100644 --- a/net/dag.go +++ b/net/dag.go @@ -20,9 +20,6 @@ import ( "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" - "github.com/sourcenetwork/defradb/client" - "github.com/sourcenetwork/defradb/core" - "github.com/sourcenetwork/defradb/datastore" "github.com/sourcenetwork/defradb/logging" ) @@ -50,17 +47,10 @@ type SessionDAGSyncer interface { } type dagJob struct { - session *sync.WaitGroup // A waitgroup to wait for all related jobs to conclude - nodeGetter ipld.NodeGetter // a node getter to use - node ipld.Node // the current ipld Node - - collection client.Collection // collection our document belongs to - dsKey core.DataStoreKey // datastore key of our document - fieldName string // field of the subgraph our node belongs to - - // Transaction common to a pushlog event. It is used to pass it along to processLog - // and handleChildBlocks within the dagWorker. 
- txn datastore.Txn + session *sync.WaitGroup // A waitgroup to wait for all related jobs to conclude + bp *blockProcessor // the block processor to use + cid cid.Cid // the cid of the block to fetch from the P2P network + isComposite bool // whether this is a composite block // OLD FIELDS // root cid.Cid // the root of the branch we are walking down @@ -87,13 +77,13 @@ func (p *Peer) sendJobWorker() { return case newJob := <-p.sendJobs: - jobs, ok := docWorkerQueue[newJob.dsKey.DocKey] + jobs, ok := docWorkerQueue[newJob.bp.dsKey.DocKey] if !ok { jobs = make(chan *dagJob, numWorkers) for i := 0; i < numWorkers; i++ { go p.dagWorker(jobs) } - docWorkerQueue[newJob.dsKey.DocKey] = jobs + docWorkerQueue[newJob.bp.dsKey.DocKey] = jobs } jobs <- newJob @@ -113,8 +103,8 @@ func (p *Peer) dagWorker(jobs chan *dagJob) { log.Debug( p.ctx, "Starting new job from DAG queue", - logging.NewKV("Datastore Key", job.dsKey), - logging.NewKV("CID", job.node.Cid()), + logging.NewKV("Datastore Key", job.bp.dsKey), + logging.NewKV("CID", job.cid), ) select { @@ -125,44 +115,25 @@ func (p *Peer) dagWorker(jobs chan *dagJob) { default: } - children, err := p.processLog( - p.ctx, - job.txn, - job.collection, - job.dsKey, - job.fieldName, - job.node, - job.nodeGetter, - true, - ) - if err != nil { - log.ErrorE( - p.ctx, - "Error processing log", - err, - logging.NewKV("Datastore key", job.dsKey), - logging.NewKV("CID", job.node.Cid()), - ) - job.session.Done() - continue - } - - if len(children) == 0 { - job.session.Done() - continue - } - go func(j *dagJob) { - p.handleChildBlocks( - j.session, - j.txn, - j.collection, - j.dsKey, - j.fieldName, - j.node, - children, - j.nodeGetter, - ) + if j.bp.getter != nil && j.cid.Defined() { + cNode, err := j.bp.getter.Get(p.ctx, j.cid) + if err != nil { + log.ErrorE(p.ctx, "Failed to get node", err, logging.NewKV("CID", j.cid)) + j.session.Done() + return + } + err = j.bp.processRemoteBlock( + p.ctx, + j.session, + cNode, + j.isComposite, + ) + if err != nil { + log.ErrorE(p.ctx, "Failed to process remote block", err, logging.NewKV("CID", j.cid)) + } + } + p.queuedChildren.Remove(j.cid) j.session.Done() }(job) } diff --git a/net/dag_test.go b/net/dag_test.go index 124c464db4..6f0145b0ae 100644 --- a/net/dag_test.go +++ b/net/dag_test.go @@ -18,14 +18,13 @@ import ( dag "github.com/ipfs/boxo/ipld/merkledag" "github.com/ipfs/go-cid" - format "github.com/ipfs/go-ipld-format" ipld "github.com/ipfs/go-ipld-format" mh "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/core" - "github.com/sourcenetwork/defradb/core/crdt" + "github.com/sourcenetwork/defradb/merkle/clock" netutils "github.com/sourcenetwork/defradb/net/utils" ) @@ -47,49 +46,6 @@ func TestSendJobWorker_ExitOnContextClose_NoError(t *testing.T) { } } -func TestSendJobWorker_WithNewJobWithClosePriorToProcessing_NoError(t *testing.T) { - ctx := context.Background() - db, n := newTestNode(ctx, t) - done := make(chan struct{}) - go func() { - n.sendJobWorker() - close(done) - }() - _, err := db.AddSchema(ctx, `type User { - name: String - age: Int - }`) - require.NoError(t, err) - - col, err := db.GetCollectionByName(ctx, "User") - require.NoError(t, err) - - doc, err := client.NewDocFromJSON([]byte(`{"name": "John", "age": 30}`)) - require.NoError(t, err) - dsKey := core.DataStoreKeyFromDocKey(doc.Key()) - - txn, err := db.NewTxn(ctx, false) - require.NoError(t, err) - - wg := sync.WaitGroup{} - wg.Add(1) - - n.sendJobs 
<- &dagJob{ - session: &wg, - node: &EmptyNode{}, - collection: col, - dsKey: dsKey, - txn: txn, - } - - n.Close() - select { - case <-done: - case <-time.After(timeout): - t.Error("failed to close sendJobWorker") - } -} - func TestSendJobWorker_WithNewJob_NoError(t *testing.T) { ctx := context.Background() db, n := newTestNode(ctx, t) @@ -104,9 +60,6 @@ func TestSendJobWorker_WithNewJob_NoError(t *testing.T) { }`) require.NoError(t, err) - col, err := db.GetCollectionByName(ctx, "User") - require.NoError(t, err) - doc, err := client.NewDocFromJSON([]byte(`{"name": "John", "age": 30}`)) require.NoError(t, err) dsKey := core.DataStoreKeyFromDocKey(doc.Key()) @@ -118,11 +71,11 @@ func TestSendJobWorker_WithNewJob_NoError(t *testing.T) { wg.Add(1) n.sendJobs <- &dagJob{ - session: &wg, - node: &EmptyNode{}, - collection: col, - dsKey: dsKey, - txn: txn, + session: &wg, + bp: &blockProcessor{ + dsKey: dsKey, + txn: txn, + }, } // Give the jobworker time to process the job. time.Sleep(100 * time.Microsecond) @@ -148,9 +101,6 @@ func TestSendJobWorker_WithCloseJob_NoError(t *testing.T) { }`) require.NoError(t, err) - col, err := db.GetCollectionByName(ctx, "User") - require.NoError(t, err) - doc, err := client.NewDocFromJSON([]byte(`{"name": "John", "age": 30}`)) require.NoError(t, err) dsKey := core.DataStoreKeyFromDocKey(doc.Key()) @@ -162,11 +112,11 @@ func TestSendJobWorker_WithCloseJob_NoError(t *testing.T) { wg.Add(1) n.sendJobs <- &dagJob{ - session: &wg, - node: &EmptyNode{}, - collection: col, - dsKey: dsKey, - txn: txn, + session: &wg, + bp: &blockProcessor{ + dsKey: dsKey, + txn: txn, + }, } n.closeJob <- dsKey.DocKey @@ -179,7 +129,7 @@ func TestSendJobWorker_WithCloseJob_NoError(t *testing.T) { } } -func TestSendJobWorker_WithPeerAndNoChildren_NoError(t *testing.T) { +func TestSendJobWorker_WithPeer_NoError(t *testing.T) { ctx := context.Background() db1, n1 := newTestNode(ctx, t) db2, n2 := newTestNode(ctx, t) @@ -188,6 +138,10 @@ func TestSendJobWorker_WithPeerAndNoChildren_NoError(t *testing.T) { require.NoError(t, err) n2.Bootstrap(addrs) + err = n1.WaitForPeerConnectionEvent(n2.PeerID()) + require.NoError(t, err) + err = n2.WaitForPeerConnectionEvent(n1.PeerID()) + require.NoError(t, err) done := make(chan struct{}) go func() { n2.sendJobWorker() @@ -215,132 +169,45 @@ func TestSendJobWorker_WithPeerAndNoChildren_NoError(t *testing.T) { err = col.Create(ctx, doc) require.NoError(t, err) - txn, err := db2.NewTxn(ctx, false) + txn1, _ := db1.NewTxn(ctx, false) + heads, _, err := clock.NewHeadSet(txn1.Headstore(), dsKey.ToHeadStoreKey().WithFieldId(core.COMPOSITE_NAMESPACE)).List(ctx) + require.NoError(t, err) + txn1.Discard(ctx) + + txn2, err := db2.NewTxn(ctx, false) require.NoError(t, err) wg := sync.WaitGroup{} wg.Add(1) - delta := &crdt.CompositeDAGDelta{ - SchemaVersionID: col.Schema().VersionID, - Priority: 1, - DocKey: doc.Key().Bytes(), - } - - node, err := makeNode(delta, []cid.Cid{}) - require.NoError(t, err) - - var getter format.NodeGetter = n2.Peer.newDAGSyncerTxn(txn) + var getter ipld.NodeGetter = n2.Peer.newDAGSyncerTxn(txn2) if sessionMaker, ok := getter.(SessionDAGSyncer); ok { log.Debug(ctx, "Upgrading DAGSyncer with a session") getter = sessionMaker.Session(ctx) } n2.sendJobs <- &dagJob{ - session: &wg, - nodeGetter: getter, - node: node, - collection: col, - dsKey: dsKey, - txn: txn, + bp: newBlockProcessor(n2.Peer, txn2, col, dsKey, getter), + session: &wg, + cid: heads[0], + isComposite: true, } - // Give the jobworker time to process the job. 
- time.Sleep(100 * time.Microsecond) - n1.Close() - n2.Close() - select { - case <-done: - case <-time.After(timeout): - t.Error("failed to close sendJobWorker") - } -} - -func TestSendJobWorker_WithPeerAndChildren_NoError(t *testing.T) { - ctx := context.Background() - db1, n1 := newTestNode(ctx, t) - db2, n2 := newTestNode(ctx, t) - - addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()}) - require.NoError(t, err) - n2.Bootstrap(addrs) - - done := make(chan struct{}) - go func() { - n2.sendJobWorker() - close(done) - }() - - _, err = db1.AddSchema(ctx, `type User { - name: String - age: Int - }`) - require.NoError(t, err) - _, err = db2.AddSchema(ctx, `type User { - name: String - age: Int - }`) - require.NoError(t, err) + wg.Wait() - col, err := db1.GetCollectionByName(ctx, "User") + err = txn2.Commit(ctx) require.NoError(t, err) - doc, err := client.NewDocFromJSON([]byte(`{"name": "John", "age": 30}`)) - require.NoError(t, err) - dsKey := core.DataStoreKeyFromDocKey(doc.Key()) - - err = col.Create(ctx, doc) + block, err := n1.db.Blockstore().Get(ctx, heads[0]) require.NoError(t, err) - - txn, err := db2.NewTxn(ctx, false) + nd, err := dag.DecodeProtobufBlock(block) require.NoError(t, err) - wg := sync.WaitGroup{} - wg.Add(1) - - links := []core.DAGLink{} - for k := range doc.Fields() { - delta := &crdt.LWWRegDelta{ - SchemaVersionID: col.Schema().VersionID, - Priority: 1, - DocKey: doc.Key().Bytes(), - FieldName: k, - } - - node, err := makeNode(delta, []cid.Cid{}) + for _, link := range nd.Links() { + exists, err := n2.db.Blockstore().Has(ctx, link.Cid) require.NoError(t, err) - - links = append(links, core.DAGLink{ - Name: k, - Cid: node.Cid(), - }) + require.True(t, exists) } - delta := &crdt.CompositeDAGDelta{ - SchemaVersionID: col.Schema().VersionID, - Priority: 1, - DocKey: doc.Key().Bytes(), - SubDAGs: links, - } - - node, err := makeNode(delta, []cid.Cid{}) - require.NoError(t, err) - - var getter format.NodeGetter = n2.Peer.newDAGSyncerTxn(txn) - if sessionMaker, ok := getter.(SessionDAGSyncer); ok { - log.Debug(ctx, "Upgrading DAGSyncer with a session") - getter = sessionMaker.Session(ctx) - } - - n2.sendJobs <- &dagJob{ - session: &wg, - nodeGetter: getter, - node: node, - collection: col, - dsKey: dsKey, - txn: txn, - } - // Give the jobworker time to process the job. 
-	time.Sleep(100 * time.Microsecond)
 	n1.Close()
 	n2.Close()
 	select {
 	case <-done:
 	case <-time.After(timeout):
 		t.Error("failed to close sendJobWorker")
 	}
 }
diff --git a/net/process.go b/net/process.go
index e3d958c466..85748090ff 100644
--- a/net/process.go
+++ b/net/process.go
@@ -13,6 +13,7 @@
 package net
 
 import (
+	"container/list"
 	"context"
 	"fmt"
 	"sync"
@@ -29,57 +30,95 @@ import (
 	"github.com/sourcenetwork/defradb/errors"
 	"github.com/sourcenetwork/defradb/events"
 	"github.com/sourcenetwork/defradb/logging"
-	"github.com/sourcenetwork/defradb/merkle/clock"
 	"github.com/sourcenetwork/defradb/merkle/crdt"
 )
 
-// processNode is a general utility for processing various kinds
-// of CRDT blocks
-func (p *Peer) processLog(
-	ctx context.Context,
+type blockProcessor struct {
+	*Peer
+	txn    datastore.Txn
+	col    client.Collection
+	dsKey  core.DataStoreKey
+	getter ipld.NodeGetter
+	// List of composite blocks to eventually merge
+	composites *list.List
+}
+
+func newBlockProcessor(
+	p *Peer,
 	txn datastore.Txn,
 	col client.Collection,
 	dsKey core.DataStoreKey,
-	field string,
-	nd ipld.Node,
 	getter ipld.NodeGetter,
-	removeChildren bool,
-) ([]cid.Cid, error) {
-	log.Debug(ctx, "Running processLog")
+) *blockProcessor {
+	return &blockProcessor{
+		Peer:       p,
+		composites: list.New(),
+		txn:        txn,
+		col:        col,
+		dsKey:      dsKey,
+		getter:     getter,
+	}
+}
 
-	crdt, err := initCRDTForType(ctx, txn, col, dsKey, field)
-	if err != nil {
-		return nil, err
+// mergeBlocks runs through the list of composite blocks and sends them for processing.
+func (bp *blockProcessor) mergeBlocks(ctx context.Context) {
+	for e := bp.composites.Front(); e != nil; e = e.Next() {
+		nd := e.Value.(ipld.Node)
+		err := bp.processBlock(ctx, nd, "")
+		if err != nil {
+			log.ErrorE(
+				ctx,
+				"Failed to process block",
+				err,
+				logging.NewKV("DocKey", bp.dsKey.DocKey),
+				logging.NewKV("CID", nd.Cid()),
+			)
+		}
 	}
+}
 
-	delta, err := crdt.DeltaDecode(nd)
+// processBlock merges the block and its children into the datastore and sets the head accordingly.
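+// An empty field name identifies the composite (document-level) block; for every
+// non-HEAD link the child block is read from the local DAG store and merged
+// recursively under the link's own field name. Illustrative call only:
+//
+//	err := bp.processBlock(ctx, compositeNode, "") // recurses with link.Name for field blocks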
+func (bp *blockProcessor) processBlock(ctx context.Context, nd ipld.Node, field string) error { + crdt, err := initCRDTForType(ctx, bp.txn, bp.col, bp.dsKey, field) if err != nil { - return nil, errors.Wrap("failed to decode delta object", err) + return err } - - log.Debug( - ctx, - "Processing PushLog request", - logging.NewKV("Datastore key", dsKey), - logging.NewKV("CID", nd.Cid()), - ) - - if err := txn.DAGstore().Put(ctx, nd); err != nil { - return nil, err + delta, err := crdt.DeltaDecode(nd) + if err != nil { + return errors.Wrap("failed to decode delta object", err) } - ng := p.createNodeGetter(crdt, getter) - cids, err := crdt.Clock().ProcessNode(ctx, ng, delta, nd) + err = crdt.Clock().ProcessNode(ctx, delta, nd) if err != nil { - return nil, err + return err } - if removeChildren { - // mark this obj as done - p.queuedChildren.Remove(nd.Cid()) + for _, link := range nd.Links() { + if link.Name == core.HEAD { + continue + } + + block, err := bp.txn.DAGstore().Get(ctx, link.Cid) + if err != nil { + return err + } + nd, err := dag.DecodeProtobufBlock(block) + if err != nil { + return err + } + + if err := bp.processBlock(ctx, nd, link.Name); err != nil { + log.ErrorE( + ctx, + "Failed to process block", + err, + logging.NewKV("DocKey", bp.dsKey.DocKey), + logging.NewKV("CID", nd.Cid()), + ) + } } - return cids, nil + return nil } func initCRDTForType( @@ -129,88 +168,72 @@ func decodeBlockBuffer(buf []byte, cid cid.Cid) (ipld.Node, error) { return ipld.Decode(blk, dag.DecodeProtobufBlock) } -func (p *Peer) createNodeGetter( - crdt crdt.MerkleCRDT, - getter ipld.NodeGetter, -) *clock.CrdtNodeGetter { - return &clock.CrdtNodeGetter{ - NodeGetter: getter, - DeltaExtractor: crdt.DeltaDecode, +// processRemoteBlock stores the block in the DAG store and initiates a sync of the block's children. 
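+// Blocks flagged as composite are only queued on bp.composites here; the merge
+// itself is deferred to mergeBlocks so that the document's sub-DAG is fully
+// fetched first. For the pushed root block the call is, illustratively:
+//
+//	err := bp.processRemoteBlock(ctx, &session, nd, true)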
+func (bp *blockProcessor) processRemoteBlock( + ctx context.Context, + session *sync.WaitGroup, + nd ipld.Node, + isComposite bool, +) error { + log.Debug(ctx, "Running processLog") + + if err := bp.txn.DAGstore().Put(ctx, nd); err != nil { + return err + } + + if isComposite { + bp.composites.PushFront(nd) } + + bp.handleChildBlocks(ctx, session, nd, isComposite) + + return nil } -func (p *Peer) handleChildBlocks( +func (bp *blockProcessor) handleChildBlocks( + ctx context.Context, session *sync.WaitGroup, - txn datastore.Txn, - col client.Collection, - dsKey core.DataStoreKey, - field string, nd ipld.Node, - children []cid.Cid, - getter ipld.NodeGetter, + isComposite bool, ) { - if len(children) == 0 { + if len(nd.Links()) == 0 { return } - ctx, cancel := context.WithTimeout(p.ctx, DAGSyncTimeout) + ctx, cancel := context.WithTimeout(ctx, DAGSyncTimeout) defer cancel() - for _, c := range children { - if !p.queuedChildren.Visit(c) { // reserve for processing + for _, link := range nd.Links() { + if !bp.queuedChildren.Visit(link.Cid) { // reserve for processing continue } - var fieldName string - // loop over our children to get the corresponding field names from the DAG - for _, l := range nd.Links() { - if c == l.Cid { - if l.Name != core.HEAD { - fieldName = l.Name - } - } - } - - // heads of subfields are still subfields, not composites - if fieldName == "" && field != "" { - fieldName = field - } - - // get object - cNode, err := getter.Get(ctx, c) + exist, err := bp.txn.DAGstore().Has(ctx, link.Cid) if err != nil { - log.ErrorE(ctx, "Failed to get node", err, logging.NewKV("CID", c)) + log.Error( + ctx, + "Failed to check for existing block", + logging.NewKV("CID", link.Cid), + logging.NewKV("ERROR", err), + ) + } + if exist { + log.Debug(ctx, "Already have block locally, skipping.", logging.NewKV("CID", link.Cid)) continue } - log.Debug( - ctx, - "Submitting new job to DAG queue", - logging.NewKV("Collection", col.Name()), - logging.NewKV("Datastore key", dsKey), - logging.NewKV("Field", fieldName), - logging.NewKV("CID", cNode.Cid())) - session.Add(1) job := &dagJob{ - collection: col, - dsKey: dsKey, - fieldName: fieldName, - session: session, - nodeGetter: getter, - node: cNode, - txn: txn, + session: session, + cid: link.Cid, + isComposite: isComposite && link.Name == core.HEAD, + bp: bp, } select { - case p.sendJobs <- job: - case <-p.ctx.Done(): + case bp.sendJobs <- job: + case <-bp.ctx.Done(): return // jump out } } - - // Clear up any children we failed to get from queued children - // for _, child := range children { - // p.queuedChildren.Remove(child) - // } } diff --git a/net/server.go b/net/server.go index e3852c0291..bb1dcadd2f 100644 --- a/net/server.go +++ b/net/server.go @@ -247,7 +247,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL } schemaRoot := string(req.Body.SchemaRoot) - docKey := core.DataStoreKeyFromDocKey(dockey) + dsKey := core.DataStoreKeyFromDocKey(dockey) var txnErr error for retry := 0; retry < s.peer.db.MaxTxnRetries(); retry++ { @@ -284,33 +284,25 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL return nil, errors.Wrap("failed to decode block to ipld.Node", err) } - cids, err := s.peer.processLog(ctx, txn, col, docKey, "", nd, getter, false) + var session sync.WaitGroup + bp := newBlockProcessor(s.peer, txn, col, dsKey, getter) + err = bp.processRemoteBlock(ctx, &session, nd, true) if err != nil { log.ErrorE( ctx, - "Failed to process PushLog node", + "Failed to process 
remote block", err, - logging.NewKV("DocKey", docKey), + logging.NewKV("DocKey", dsKey.DocKey), logging.NewKV("CID", cid), ) } + session.Wait() + bp.mergeBlocks(ctx) - // handleChildren - if len(cids) > 0 { // we have child nodes to get - log.Debug( - ctx, - "Handling children for log", - logging.NewKV("NChildren", len(cids)), - logging.NewKV("CID", cid), - ) - var session sync.WaitGroup - s.peer.handleChildBlocks(&session, txn, col, docKey, "", nd, cids, getter) - session.Wait() - // dagWorkers specific to the dockey will have been spawned within handleChildBlocks. - // Once we are done with the dag syncing process, we can get rid of those workers. - s.peer.closeJob <- docKey.DocKey - } else { - log.Debug(ctx, "No more children to process for log", logging.NewKV("CID", cid)) + // dagWorkers specific to the dockey will have been spawned within handleChildBlocks. + // Once we are done with the dag syncing process, we can get rid of those workers. + if s.peer.closeJob != nil { + s.peer.closeJob <- dsKey.DocKey } if txnErr = txn.Commit(ctx); txnErr != nil { @@ -323,7 +315,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL // Once processed, subscribe to the dockey topic on the pubsub network unless we already // suscribe to the collection. if !s.hasPubSubTopic(col.SchemaRoot()) { - err = s.addPubSubTopic(docKey.DocKey, true) + err = s.addPubSubTopic(dsKey.DocKey, true) if err != nil { return nil, err } diff --git a/net/server_test.go b/net/server_test.go index 42ab8b9214..c2f7a2f82f 100644 --- a/net/server_test.go +++ b/net/server_test.go @@ -248,8 +248,10 @@ func TestDocQueue(t *testing.T) { func TestPushLog(t *testing.T) { ctx := context.Background() db, n := newTestNode(ctx, t) + err := n.Start() + require.NoError(t, err) - _, err := db.AddSchema(ctx, `type User { + _, err = db.AddSchema(ctx, `type User { name: String age: Int }`)