Skip to content

Commit 98dd895

Browse files
authoredJan 14, 2022
Use do not send blocks for pause/resume & prevent processing of blocks on cancelled requests (#333)
* feat(executor): run block hooks ahead of advancement * feat(requestmanager): shutdown online lines once cancelled * refactor(executor): switch to sending doNotSendFirstBlocks for pause/resume
1 parent 795beb9 commit 98dd895

File tree

5 files changed

+101
-104
lines changed

5 files changed

+101
-104
lines changed
 

‎requestmanager/client.go

+17-18
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99

1010
"github.com/hannahhoward/go-pubsub"
1111
blocks "github.com/ipfs/go-block-format"
12-
"github.com/ipfs/go-cid"
1312
logging "github.com/ipfs/go-log/v2"
1413
"github.com/ipfs/go-peertaskqueue/peertask"
1514
"github.com/ipld/go-ipld-prime"
@@ -46,23 +45,23 @@ const (
4645
)
4746

4847
type inProgressRequestStatus struct {
49-
ctx context.Context
50-
span trace.Span
51-
startTime time.Time
52-
cancelFn func()
53-
p peer.ID
54-
terminalError error
55-
pauseMessages chan struct{}
56-
state graphsync.RequestState
57-
lastResponse atomic.Value
58-
onTerminated []chan<- error
59-
request gsmsg.GraphSyncRequest
60-
doNotSendCids *cid.Set
61-
nodeStyleChooser traversal.LinkTargetNodePrototypeChooser
62-
inProgressChan chan graphsync.ResponseProgress
63-
inProgressErr chan error
64-
traverser ipldutil.Traverser
65-
traverserCancel context.CancelFunc
48+
ctx context.Context
49+
span trace.Span
50+
startTime time.Time
51+
cancelFn func()
52+
p peer.ID
53+
terminalError error
54+
pauseMessages chan struct{}
55+
state graphsync.RequestState
56+
lastResponse atomic.Value
57+
onTerminated []chan<- error
58+
request gsmsg.GraphSyncRequest
59+
doNotSendFirstBlocks int64
60+
nodeStyleChooser traversal.LinkTargetNodePrototypeChooser
61+
inProgressChan chan graphsync.ResponseProgress
62+
inProgressErr chan error
63+
traverser ipldutil.Traverser
64+
traverserCancel context.CancelFunc
6665
}
6766

6867
// PeerHandler is an interface that can send requests to peers

‎requestmanager/executor/executor.go

+19-18
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,17 @@ import (
55
"context"
66
"sync/atomic"
77

8-
"github.com/ipfs/go-cid"
98
logging "github.com/ipfs/go-log/v2"
109
"github.com/ipfs/go-peertaskqueue/peertask"
1110
"github.com/ipld/go-ipld-prime"
12-
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
1311
"github.com/ipld/go-ipld-prime/traversal"
1412
"github.com/libp2p/go-libp2p-core/peer"
1513
"go.opentelemetry.io/otel"
1614
"go.opentelemetry.io/otel/codes"
1715
"go.opentelemetry.io/otel/trace"
1816

1917
"github.com/ipfs/go-graphsync"
20-
"github.com/ipfs/go-graphsync/cidset"
18+
"github.com/ipfs/go-graphsync/donotsendfirstblocks"
2119
"github.com/ipfs/go-graphsync/ipldutil"
2220
gsmsg "github.com/ipfs/go-graphsync/message"
2321
"github.com/ipfs/go-graphsync/requestmanager/hooks"
@@ -102,17 +100,17 @@ func (e *Executor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask.
102100

103101
// RequestTask are parameters for a single request execution
104102
type RequestTask struct {
105-
Ctx context.Context
106-
Span trace.Span
107-
Request gsmsg.GraphSyncRequest
108-
LastResponse *atomic.Value
109-
DoNotSendCids *cid.Set
110-
PauseMessages <-chan struct{}
111-
Traverser ipldutil.Traverser
112-
P peer.ID
113-
InProgressErr chan error
114-
Empty bool
115-
InitialRequest bool
103+
Ctx context.Context
104+
Span trace.Span
105+
Request gsmsg.GraphSyncRequest
106+
LastResponse *atomic.Value
107+
DoNotSendFirstBlocks int64
108+
PauseMessages <-chan struct{}
109+
Traverser ipldutil.Traverser
110+
P peer.ID
111+
InProgressErr chan error
112+
Empty bool
113+
InitialRequest bool
116114
}
117115

118116
func (e *Executor) traverse(rt RequestTask) error {
@@ -177,7 +175,6 @@ func (e *Executor) processBlockHooks(p peer.ID, response graphsync.ResponseData,
177175
}
178176

179177
func (e *Executor) onNewBlock(rt RequestTask, block graphsync.BlockData) error {
180-
rt.DoNotSendCids.Add(block.Link().(cidlink.Link).Cid)
181178
response := rt.LastResponse.Load().(gsmsg.GraphSyncResponse)
182179
return e.processBlockHooks(rt.P, response, block)
183180
}
@@ -218,12 +215,16 @@ func (e *Executor) processResult(rt RequestTask, link ipld.Link, result types.As
218215

219216
func (e *Executor) startRemoteRequest(rt RequestTask) error {
220217
request := rt.Request
221-
if rt.DoNotSendCids.Len() > 0 {
222-
cidsData, err := cidset.EncodeCidSet(rt.DoNotSendCids)
218+
doNotSendFirstBlocks := rt.DoNotSendFirstBlocks
219+
if doNotSendFirstBlocks < int64(rt.Traverser.NBlocksTraversed()) {
220+
doNotSendFirstBlocks = int64(rt.Traverser.NBlocksTraversed())
221+
}
222+
if doNotSendFirstBlocks > 0 {
223+
doNotSendFirstBlocksData, err := donotsendfirstblocks.EncodeDoNotSendFirstBlocks(doNotSendFirstBlocks)
223224
if err != nil {
224225
return err
225226
}
226-
request = rt.Request.ReplaceExtensions([]graphsync.ExtensionData{{Name: graphsync.ExtensionDoNotSendCIDs, Data: cidsData}})
227+
request = rt.Request.ReplaceExtensions([]graphsync.ExtensionData{{Name: graphsync.ExtensionsDoNotSendFirstBlocks, Data: doNotSendFirstBlocksData}})
227228
}
228229
log.Debugw("starting remote request", "id", rt.Request.ID(), "peer", rt.P.String(), "root_cid", rt.Request.Root().String())
229230
e.manager.SendRequest(rt.P, request)

‎requestmanager/executor/executor_test.go

+30-31
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"testing"
99
"time"
1010

11-
"github.com/ipfs/go-cid"
1211
"github.com/ipfs/go-peertaskqueue/peertask"
1312
"github.com/ipld/go-ipld-prime"
1413
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
@@ -17,7 +16,7 @@ import (
1716
"github.com/stretchr/testify/require"
1817

1918
"github.com/ipfs/go-graphsync"
20-
"github.com/ipfs/go-graphsync/cidset"
19+
"github.com/ipfs/go-graphsync/donotsendfirstblocks"
2120
"github.com/ipfs/go-graphsync/ipldutil"
2221
gsmsg "github.com/ipfs/go-graphsync/message"
2322
"github.com/ipfs/go-graphsync/requestmanager/executor"
@@ -102,21 +101,21 @@ func TestRequestExecutionBlockChain(t *testing.T) {
102101
require.EqualError(t, ree.terminalError, hooks.ErrPaused{}.Error())
103102
},
104103
},
105-
"preexisting do not send cids": {
104+
"preexisting do not send firstBlocks": {
106105
configureRequestExecution: func(p peer.ID, requestID graphsync.RequestID, tbc *testutil.TestBlockChain, ree *requestExecutionEnv) {
107-
ree.doNotSendCids.Add(tbc.GenisisLink.(cidlink.Link).Cid)
106+
ree.doNotSendFirstBlocks = 1
108107
},
109108
verifyResults: func(t *testing.T, tbc *testutil.TestBlockChain, ree *requestExecutionEnv, responses []graphsync.ResponseProgress, receivedErrors []error) {
110109
tbc.VerifyWholeChainSync(responses)
111110
require.Empty(t, receivedErrors)
112111
require.Equal(t, ree.request.ID(), ree.requestsSent[0].request.ID())
113112
require.Equal(t, ree.request.Root(), ree.requestsSent[0].request.Root())
114113
require.Equal(t, ree.request.Selector(), ree.requestsSent[0].request.Selector())
115-
doNotSendCidsExt, has := ree.requestsSent[0].request.Extension(graphsync.ExtensionDoNotSendCIDs)
114+
doNotSendFirstBlocksData, has := ree.requestsSent[0].request.Extension(graphsync.ExtensionsDoNotSendFirstBlocks)
116115
require.True(t, has)
117-
cidSet, err := cidset.DecodeCidSet(doNotSendCidsExt)
116+
doNotSendFirstBlocks, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData)
118117
require.NoError(t, err)
119-
require.Equal(t, 1, cidSet.Len())
118+
require.Equal(t, int64(1), doNotSendFirstBlocks)
120119
require.Len(t, ree.blookHooksCalled, 10)
121120
require.NoError(t, ree.terminalError)
122121
},
@@ -145,11 +144,11 @@ func TestRequestExecutionBlockChain(t *testing.T) {
145144
require.Equal(t, ree.request.ID(), ree.requestsSent[0].request.ID())
146145
require.Equal(t, ree.request.Root(), ree.requestsSent[0].request.Root())
147146
require.Equal(t, ree.request.Selector(), ree.requestsSent[0].request.Selector())
148-
doNotSendCidsExt, has := ree.requestsSent[0].request.Extension(graphsync.ExtensionDoNotSendCIDs)
147+
doNotSendFirstBlocksData, has := ree.requestsSent[0].request.Extension(graphsync.ExtensionsDoNotSendFirstBlocks)
149148
require.True(t, has)
150-
cidSet, err := cidset.DecodeCidSet(doNotSendCidsExt)
149+
doNotSendFirstBlocks, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData)
151150
require.NoError(t, err)
152-
require.Equal(t, 6, cidSet.Len())
151+
require.Equal(t, int64(6), doNotSendFirstBlocks)
153152
require.Len(t, ree.blookHooksCalled, 10)
154153
require.NoError(t, ree.terminalError)
155154
},
@@ -202,16 +201,16 @@ func TestRequestExecutionBlockChain(t *testing.T) {
202201
defer requestCancel()
203202
var responsesReceived []graphsync.ResponseProgress
204203
ree := &requestExecutionEnv{
205-
ctx: requestCtx,
206-
p: p,
207-
pauseMessages: make(chan struct{}, 1),
208-
blockHookResults: make(map[blockHookKey]hooks.UpdateResult),
209-
doNotSendCids: cid.NewSet(),
210-
request: gsmsg.NewRequest(requestID, tbc.TipLink.(cidlink.Link).Cid, tbc.Selector(), graphsync.Priority(rand.Int31())),
211-
fal: fal,
212-
tbc: tbc,
213-
initialRequest: true,
214-
inProgressErr: make(chan error, 1),
204+
ctx: requestCtx,
205+
p: p,
206+
pauseMessages: make(chan struct{}, 1),
207+
blockHookResults: make(map[blockHookKey]hooks.UpdateResult),
208+
doNotSendFirstBlocks: 0,
209+
request: gsmsg.NewRequest(requestID, tbc.TipLink.(cidlink.Link).Cid, tbc.Selector(), graphsync.Priority(rand.Int31())),
210+
fal: fal,
211+
tbc: tbc,
212+
initialRequest: true,
213+
inProgressErr: make(chan error, 1),
215214
traverser: ipldutil.TraversalBuilder{
216215
Root: tbc.TipLink,
217216
Selector: tbc.Selector(),
@@ -276,7 +275,7 @@ type requestExecutionEnv struct {
276275
request gsmsg.GraphSyncRequest
277276
p peer.ID
278277
blockHookResults map[blockHookKey]hooks.UpdateResult
279-
doNotSendCids *cid.Set
278+
doNotSendFirstBlocks int64
280279
pauseMessages chan struct{}
281280
externalPause pauseKey
282281
loadLocallyUntil int
@@ -304,16 +303,16 @@ func (ree *requestExecutionEnv) GetRequestTask(_ peer.ID, _ *peertask.Task, requ
304303
lastResponse.Store(gsmsg.NewResponse(ree.request.ID(), graphsync.RequestAcknowledged))
305304

306305
requestExecution := executor.RequestTask{
307-
Ctx: ree.ctx,
308-
Request: ree.request,
309-
LastResponse: &lastResponse,
310-
DoNotSendCids: ree.doNotSendCids,
311-
PauseMessages: ree.pauseMessages,
312-
Traverser: ree.traverser,
313-
P: ree.p,
314-
InProgressErr: ree.inProgressErr,
315-
Empty: false,
316-
InitialRequest: ree.initialRequest,
306+
Ctx: ree.ctx,
307+
Request: ree.request,
308+
LastResponse: &lastResponse,
309+
DoNotSendFirstBlocks: ree.doNotSendFirstBlocks,
310+
PauseMessages: ree.pauseMessages,
311+
Traverser: ree.traverser,
312+
P: ree.p,
313+
InProgressErr: ree.inProgressErr,
314+
Empty: false,
315+
InitialRequest: ree.initialRequest,
317316
}
318317
go func() {
319318
select {

‎requestmanager/requestmanager_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ import (
1515
"github.com/stretchr/testify/require"
1616

1717
"github.com/ipfs/go-graphsync"
18-
"github.com/ipfs/go-graphsync/cidset"
1918
"github.com/ipfs/go-graphsync/dedupkey"
19+
"github.com/ipfs/go-graphsync/donotsendfirstblocks"
2020
"github.com/ipfs/go-graphsync/listeners"
2121
gsmsg "github.com/ipfs/go-graphsync/message"
2222
"github.com/ipfs/go-graphsync/messagequeue"
@@ -877,10 +877,10 @@ func TestPauseResume(t *testing.T) {
877877

878878
// verify the correct new request with Do-no-send-cids & other extensions
879879
resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
880-
doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
881-
doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
880+
doNotSendFirstBlocksData, has := resumedRequest.gsr.Extension(graphsync.ExtensionsDoNotSendFirstBlocks)
881+
doNotSendFirstBlocks, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData)
882882
require.NoError(t, err)
883-
require.Equal(t, pauseAt, doNotSendCids.Len())
883+
require.Equal(t, pauseAt, int(doNotSendFirstBlocks))
884884
require.True(t, has)
885885
ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
886886
require.True(t, has)
@@ -957,10 +957,10 @@ func TestPauseResumeExternal(t *testing.T) {
957957

958958
// verify the correct new request with Do-no-send-cids & other extensions
959959
resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
960-
doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
961-
doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
960+
doNotSendFirstBlocksData, has := resumedRequest.gsr.Extension(graphsync.ExtensionsDoNotSendFirstBlocks)
961+
doNotSendFirstBlocks, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData)
962962
require.NoError(t, err)
963-
require.Equal(t, pauseAt, doNotSendCids.Len())
963+
require.Equal(t, pauseAt, int(doNotSendFirstBlocks))
964964
require.True(t, has)
965965
ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
966966
require.True(t, has)

‎requestmanager/server.go

+28-30
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"time"
99

1010
blocks "github.com/ipfs/go-block-format"
11-
"github.com/ipfs/go-cid"
1211
"github.com/ipfs/go-peertaskqueue/peertask"
1312
"github.com/ipfs/go-peertaskqueue/peertracker"
1413
"github.com/ipld/go-ipld-prime"
@@ -22,8 +21,8 @@ import (
2221
"go.opentelemetry.io/otel/trace"
2322

2423
"github.com/ipfs/go-graphsync"
25-
"github.com/ipfs/go-graphsync/cidset"
2624
"github.com/ipfs/go-graphsync/dedupkey"
25+
"github.com/ipfs/go-graphsync/donotsendfirstblocks"
2726
"github.com/ipfs/go-graphsync/ipldutil"
2827
gsmsg "github.com/ipfs/go-graphsync/message"
2928
"github.com/ipfs/go-graphsync/peerstate"
@@ -73,34 +72,32 @@ func (rm *RequestManager) newRequest(parentSpan trace.Span, p peer.ID, root ipld
7372
rp, err := rm.singleErrorResponse(err)
7473
return request, rp, err
7574
}
76-
doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
77-
var doNotSendCids *cid.Set
75+
doNotSendFirstBlocksData, has := request.Extension(graphsync.ExtensionsDoNotSendFirstBlocks)
76+
var doNotSendFirstBlocks int64
7877
if has {
79-
doNotSendCids, err = cidset.DecodeCidSet(doNotSendCidsData)
78+
doNotSendFirstBlocks, err = donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData)
8079
if err != nil {
8180
span.RecordError(err)
8281
span.SetStatus(codes.Error, err.Error())
8382
defer parentSpan.End()
8483
rp, err := rm.singleErrorResponse(err)
8584
return request, rp, err
8685
}
87-
} else {
88-
doNotSendCids = cid.NewSet()
8986
}
9087
ctx, cancel := context.WithCancel(ctx)
9188
requestStatus := &inProgressRequestStatus{
92-
ctx: ctx,
93-
span: parentSpan,
94-
startTime: time.Now(),
95-
cancelFn: cancel,
96-
p: p,
97-
pauseMessages: make(chan struct{}, 1),
98-
doNotSendCids: doNotSendCids,
99-
request: request,
100-
state: graphsync.Queued,
101-
nodeStyleChooser: hooksResult.CustomChooser,
102-
inProgressChan: make(chan graphsync.ResponseProgress),
103-
inProgressErr: make(chan error),
89+
ctx: ctx,
90+
span: parentSpan,
91+
startTime: time.Now(),
92+
cancelFn: cancel,
93+
p: p,
94+
pauseMessages: make(chan struct{}, 1),
95+
doNotSendFirstBlocks: doNotSendFirstBlocks,
96+
request: request,
97+
state: graphsync.Queued,
98+
nodeStyleChooser: hooksResult.CustomChooser,
99+
inProgressChan: make(chan graphsync.ResponseProgress),
100+
inProgressErr: make(chan error),
104101
}
105102
requestStatus.lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged))
106103
rm.inProgressRequestStatuses[request.ID()] = requestStatus
@@ -157,17 +154,17 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
157154

158155
ipr.state = graphsync.Running
159156
return executor.RequestTask{
160-
Ctx: ipr.ctx,
161-
Span: ipr.span,
162-
Request: ipr.request,
163-
LastResponse: &ipr.lastResponse,
164-
DoNotSendCids: ipr.doNotSendCids,
165-
PauseMessages: ipr.pauseMessages,
166-
Traverser: ipr.traverser,
167-
P: ipr.p,
168-
InProgressErr: ipr.inProgressErr,
169-
InitialRequest: initialRequest,
170-
Empty: false,
157+
Ctx: ipr.ctx,
158+
Span: ipr.span,
159+
Request: ipr.request,
160+
LastResponse: &ipr.lastResponse,
161+
DoNotSendFirstBlocks: ipr.doNotSendFirstBlocks,
162+
PauseMessages: ipr.pauseMessages,
163+
Traverser: ipr.traverser,
164+
P: ipr.p,
165+
InProgressErr: ipr.inProgressErr,
166+
InitialRequest: initialRequest,
167+
Empty: false,
171168
}
172169
}
173170

@@ -259,6 +256,7 @@ func (rm *RequestManager) cancelOnError(requestID graphsync.RequestID, ipr *inPr
259256
rm.terminateRequest(requestID, ipr)
260257
} else {
261258
ipr.cancelFn()
259+
rm.asyncLoader.CompleteResponsesFor(requestID)
262260
}
263261
}
264262

0 commit comments

Comments
 (0)