-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathclient.go
275 lines (243 loc) · 9.4 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
package responsemanager
import (
"context"
"errors"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue/peertask"
ipld "github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel/trace"
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/peerstate"
"github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/queryexecutor"
"github.com/ipfs/go-graphsync/responsemanager/responseassembler"
"github.com/ipfs/go-graphsync/taskqueue"
)
// The code in this file implements the public interface of the response manager.
// Functions in this file operate outside the internal thread and should
// NOT modify the internal state of the ResponseManager.

// log is the package-level logger, created under the name "graphsync".
var log = logging.Logger("graphsync")
// inProgressResponseStatus is the per-response record stored in
// ResponseManager.inProgressResponses, keyed by responseKey.
//
// NOTE(review): the code that fills in and reads these fields is not in this
// file section; the field notes below are derived from names and types only
// and should be confirmed against the code that uses them.
type inProgressResponseStatus struct {
// ctx is the context associated with this response.
ctx context.Context
// span is the OpenTelemetry trace span associated with this response.
span trace.Span
// cancelFn presumably cancels ctx — TODO confirm where it is assigned.
cancelFn func()
// request is the GraphSync request this record belongs to.
request gsmsg.GraphSyncRequest
// loader opens blocks for reading while responding.
loader ipld.BlockReadOpener
// traverser is the ipldutil.Traverser for this response.
traverser ipldutil.Traverser
// signals holds the queryexecutor.ResponseSignals for this response.
signals queryexecutor.ResponseSignals
// updates holds update requests received for this response; presumably
// these are what GetUpdates hands out and clears — TODO confirm.
updates []gsmsg.GraphSyncRequest
// state is the current graphsync.RequestState of the response.
state graphsync.RequestState
// subscriber is the notifications subscriber attached to this response.
subscriber *notifications.TopicDataSubscriber
// startTime presumably records when the response was started — TODO confirm.
startTime time.Time
}
// responseKey identifies a single response by the peer it belongs to and the
// ID of the request. It is the key type of ResponseManager.inProgressResponses.
// Field order matters: the key is built positionally as responseKey{p, requestID}.
type responseKey struct {
p peer.ID
requestID graphsync.RequestID
}
// RequestHooks is an interface for processing request hooks.
type RequestHooks interface {
// ProcessRequestHooks is given the peer and the request data and returns
// the resulting hooks.RequestResult.
ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult
}
// RequestQueuedHooks is an interface for processing request queued hooks.
type RequestQueuedHooks interface {
// ProcessRequestQueuedHooks is given the peer and the request data; it
// returns nothing.
ProcessRequestQueuedHooks(p peer.ID, request graphsync.RequestData)
}
// UpdateHooks is an interface for processing update hooks.
type UpdateHooks interface {
// ProcessUpdateHooks is given the peer, the request data, and the update
// data, and returns the resulting hooks.UpdateResult.
ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult
}
// CompletedListeners is an interface for notifying listeners that responses are complete.
type CompletedListeners interface {
// NotifyCompletedListeners is given the peer, the request data, and the
// response status code the response completed with.
NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode)
}
// CancelledListeners is an interface for notifying listeners that the requestor cancelled.
type CancelledListeners interface {
// NotifyCancelledListeners is given the peer and the data of the request.
NotifyCancelledListeners(p peer.ID, request graphsync.RequestData)
}
// BlockSentListeners is an interface for notifying listeners of a block send occurring over the wire.
type BlockSentListeners interface {
// NotifyBlockSentListeners is given the peer, the request data, and the
// data describing the block.
NotifyBlockSentListeners(p peer.ID, request graphsync.RequestData, block graphsync.BlockData)
}
// NetworkErrorListeners is an interface for notifying listeners that an error occurred sending data on the wire.
type NetworkErrorListeners interface {
// NotifyNetworkErrorListeners is given the peer, the request data, and the
// error.
NotifyNetworkErrorListeners(p peer.ID, request graphsync.RequestData, err error)
}
// ResponseAssembler is an interface that returns sender interfaces for peer responses.
// Every method is addressed by the peer and the request ID.
//
// NOTE(review): the implementation is not in this file; the per-method notes
// below restate the signatures and hedge anything inferred from names.
type ResponseAssembler interface {
// DedupKey associates the given key with the request; presumably used for
// block de-duplication — TODO confirm against the implementation.
DedupKey(p peer.ID, requestID graphsync.RequestID, key string)
// IgnoreBlocks takes a list of links for the request; presumably blocks for
// these links are not sent — TODO confirm.
IgnoreBlocks(p peer.ID, requestID graphsync.RequestID, links []ipld.Link)
// SkipFirstBlocks takes a block count for the request; presumably that many
// leading blocks are skipped — TODO confirm.
SkipFirstBlocks(p peer.ID, requestID graphsync.RequestID, skipCount int64)
// Transaction runs the given responseassembler.Transaction for the request
// and returns an error.
Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error
}
// responseManagerMessage is the type of value carried on
// ResponseManager.messages. The public methods in this file build a concrete
// message and enqueue it via send.
type responseManagerMessage interface {
// handle receives the ResponseManager the message was sent to.
// NOTE(review): presumably called from the manager's run loop (started by
// Startup); that loop is not in this file section — confirm.
handle(rm *ResponseManager)
}
// ResponseManager handles incoming requests from the network, initiates selector
// traversals, and transmits responses.
//
// The public methods in this file do not touch the manager's state directly;
// they enqueue a message on the messages channel and, where a result is
// needed, wait for a reply or for ctx to be done.
type ResponseManager struct {
// ctx is derived in New from the caller's context; cancelFn cancels it and
// is called by Shutdown.
ctx context.Context
cancelFn context.CancelFunc
// Collaborators supplied to New.
responseAssembler ResponseAssembler
requestHooks RequestHooks
linkSystem ipld.LinkSystem
requestQueuedHooks RequestQueuedHooks
updateHooks UpdateHooks
cancelledListeners CancelledListeners
completedListeners CompletedListeners
blockSentListeners BlockSentListeners
networkErrorListeners NetworkErrorListeners
// messages carries messages from the public methods to the manager; New
// creates it with a buffer of 16.
messages chan responseManagerMessage
// inProgressResponses holds one record per (peer, request ID) pair.
inProgressResponses map[responseKey]*inProgressResponseStatus
// maxInProcessRequests is supplied to New; presumably a cap on concurrently
// processed requests — TODO confirm where it is enforced.
maxInProcessRequests uint64
connManager network.ConnManager
// maximum number of links to traverse per request. A value of zero = infinity, or no limit
maxLinksPerRequest uint64
responseQueue taskqueue.TaskQueue
}
// New builds a ResponseManager from the supplied collaborators and limits.
//
// The manager gets its own cancellable context derived from ctx (cancelled by
// Shutdown), a message channel buffered to 16 entries, and an empty table of
// in-progress responses. New does not start any processing; call Startup for
// that.
func New(ctx context.Context,
	linkSystem ipld.LinkSystem,
	responseAssembler ResponseAssembler,
	requestQueuedHooks RequestQueuedHooks,
	requestHooks RequestHooks,
	updateHooks UpdateHooks,
	completedListeners CompletedListeners,
	cancelledListeners CancelledListeners,
	blockSentListeners BlockSentListeners,
	networkErrorListeners NetworkErrorListeners,
	maxInProcessRequests uint64,
	connManager network.ConnManager,
	maxLinksPerRequest uint64,
	responseQueue taskqueue.TaskQueue,
) *ResponseManager {
	managerCtx, cancel := context.WithCancel(ctx)

	return &ResponseManager{
		// Lifecycle.
		ctx:      managerCtx,
		cancelFn: cancel,

		// Internal plumbing and state.
		messages:            make(chan responseManagerMessage, 16),
		inProgressResponses: make(map[responseKey]*inProgressResponseStatus),

		// Collaborators.
		linkSystem:            linkSystem,
		responseAssembler:     responseAssembler,
		requestQueuedHooks:    requestQueuedHooks,
		requestHooks:          requestHooks,
		updateHooks:           updateHooks,
		completedListeners:    completedListeners,
		cancelledListeners:    cancelledListeners,
		blockSentListeners:    blockSentListeners,
		networkErrorListeners: networkErrorListeners,
		connManager:           connManager,
		responseQueue:         responseQueue,

		// Limits.
		maxInProcessRequests: maxInProcessRequests,
		maxLinksPerRequest:   maxLinksPerRequest,
	}
}
// ProcessRequests hands the incoming requests from peer p to the manager.
// The hand-off is abandoned if ctx (or the manager's own context) is done
// before the message can be enqueued. No result is reported to the caller.
func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, requests []gsmsg.GraphSyncRequest) {
	msg := &processRequestMessage{p, requests}
	rm.send(msg, ctx.Done())
}
// UnpauseResponse unpauses a previously paused response for the given peer
// and request, passing along any extensions.
//
// It blocks until the manager replies, returning that reply, or until the
// manager's context is done, in which case it returns a "context cancelled"
// error.
func (rm *ResponseManager) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
	result := make(chan error, 1)
	msg := &unpauseRequestMessage{p, requestID, result, extensions}
	rm.send(msg, nil)

	select {
	case err := <-result:
		return err
	case <-rm.ctx.Done():
		return errors.New("context cancelled")
	}
}
// PauseResponse pauses an in-progress response for the given peer and request
// (it may take one or more blocks before the pause takes effect).
//
// It blocks until the manager replies, returning that reply, or until the
// manager's context is done, in which case it returns a "context cancelled"
// error.
func (rm *ResponseManager) PauseResponse(p peer.ID, requestID graphsync.RequestID) error {
	result := make(chan error, 1)
	msg := &pauseRequestMessage{p, requestID, result}
	rm.send(msg, nil)

	select {
	case err := <-result:
		return err
	case <-rm.ctx.Done():
		return errors.New("context cancelled")
	}
}
// CancelResponse cancels an in-progress response for the given peer and
// request by sending the manager an error message carrying
// queryexecutor.ErrCancelledByCommand.
//
// It blocks until the manager replies, returning that reply, or until the
// manager's context is done, in which case it returns a "context cancelled"
// error.
func (rm *ResponseManager) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
	result := make(chan error, 1)
	msg := &errorRequestMessage{p, requestID, queryexecutor.ErrCancelledByCommand, result}
	rm.send(msg, nil)

	select {
	case err := <-result:
		return err
	case <-rm.ctx.Done():
		return errors.New("context cancelled")
	}
}
// synchronize is a utility method that enqueues a synchronizeMessage and then
// blocks until it is answered or the manager's context is done. Per the
// original contract, this is used to wait until all messages currently queued
// have been processed.
func (rm *ResponseManager) synchronize() {
	// Unbuffered on purpose: kept exactly as the handler expects it.
	synced := make(chan error)
	msg := &synchronizeMessage{synced}
	rm.send(msg, nil)

	select {
	case <-synced:
	case <-rm.ctx.Done():
	}
}
// StartTask asks the manager to start the given task from the peer task
// queue. responseTaskChan is passed along in the message so the manager can
// deliver a queryexecutor.ResponseTask on it; StartTask itself does not wait.
func (rm *ResponseManager) StartTask(task *peertask.Task, responseTaskChan chan<- queryexecutor.ResponseTask) {
	msg := &startTaskRequest{task, responseTaskChan}
	rm.send(msg, nil)
}
// GetUpdates is called to read the pending updates for a task and clear them.
// The request is identified by (p, requestID); updatesChan is passed along in
// the message so the updates can be delivered on it. GetUpdates itself does
// not wait.
func (rm *ResponseManager) GetUpdates(p peer.ID, requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) {
	key := responseKey{p, requestID}
	msg := &responseUpdateRequest{key, updatesChan}
	rm.send(msg, nil)
}
// FinishTask marks a task from the task queue as done, reporting err as its
// outcome. It blocks until the manager acknowledges the message or the
// manager's context is done.
func (rm *ResponseManager) FinishTask(task *peertask.Task, err error) {
	finished := make(chan struct{}, 1)
	msg := &finishTaskRequest{task, err, finished}
	rm.send(msg, nil)

	select {
	case <-finished:
	case <-rm.ctx.Done():
	}
}
// CloseWithNetworkError closes a request due to a network error by sending
// the manager an error message carrying queryexecutor.ErrNetworkError.
// It does not wait for, or report, the outcome.
func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync.RequestID) {
	// Nobody reads this channel; it has a buffer of one so a single reply
	// written to it would not block the writer.
	unread := make(chan error, 1)
	msg := &errorRequestMessage{p, requestID, queryexecutor.ErrNetworkError, unread}
	rm.send(msg, nil)
}
// TerminateRequest indicates that a request has finished sending data and
// should no longer be tracked. It blocks until the manager acknowledges the
// message or the manager's context is done.
func (rm *ResponseManager) TerminateRequest(p peer.ID, requestID graphsync.RequestID) {
	terminated := make(chan struct{}, 1)
	msg := &terminateRequestMessage{p, requestID, terminated}
	rm.send(msg, nil)

	select {
	case <-terminated:
	case <-rm.ctx.Done():
	}
}
// PeerState gets the current state of the outgoing responses for a given
// peer. It blocks until the manager replies; if the manager's context is done
// first, the zero peerstate.PeerState is returned.
func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
	stateCh := make(chan peerstate.PeerState)
	msg := &peerStateMessage{p, stateCh}
	rm.send(msg, nil)

	select {
	case state := <-stateCh:
		return state
	case <-rm.ctx.Done():
		return peerstate.PeerState{}
	}
}
// send enqueues message on the manager's message channel, giving up if the
// manager's context or the caller-supplied done channel fires first.
//
// done may be nil: a receive from a nil channel never proceeds, so in that
// case only the manager's context can abort the send. send does not report
// whether the message was actually enqueued.
func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) {
	select {
	case rm.messages <- message:
	case <-done:
	case <-rm.ctx.Done():
	}
}
// Startup starts processing for the ResponseManager by launching rm.run in
// its own goroutine. It returns immediately.
func (rm *ResponseManager) Startup() {
go rm.run()
}
// Shutdown ends processing for the ResponseManager by cancelling the context
// created in New. Public methods blocked on that context return once it is
// cancelled.
func (rm *ResponseManager) Shutdown() {
rm.cancelFn()
}