Skip to content

Commit 59de059

Browse files
authored
fix: split bitswap messages (#507)
Split large bitswap messages into multiple smaller messages to keep them under the max message size other implementations will accept.
1 parent 338885f commit 59de059

12 files changed

+545
-201
lines changed

packages/bitswap/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@
156156
"@libp2p/utils": "^5.2.3",
157157
"@multiformats/multiaddr": "^12.1.14",
158158
"any-signal": "^4.1.1",
159-
"debug": "^4.3.4",
160159
"interface-blockstore": "^5.2.9",
161160
"interface-store": "^5.1.7",
162161
"it-all": "^3.0.4",
@@ -177,6 +176,7 @@
177176
"uint8arrays": "^5.0.1"
178177
},
179178
"devDependencies": {
179+
"@libp2p/crypto": "^4.0.6",
180180
"@libp2p/interface-compliance-tests": "^5.1.3",
181181
"@libp2p/peer-id-factory": "^4.0.5",
182182
"@types/sinon": "^17.0.3",

packages/bitswap/src/constants.ts

+2
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ export const DEFAULT_MESSAGE_SEND_CONCURRENCY = 50
99
export const DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS = false
1010
export const DEFAULT_SESSION_ROOT_PRIORITY = 1
1111
export const DEFAULT_MAX_PROVIDERS_PER_REQUEST = 3
12+
export const DEFAULT_MAX_OUTGOING_MESSAGE_SIZE = 1024 * 1024 * 2
13+
export const DEFAULT_MAX_INCOMING_MESSAGE_SIZE = DEFAULT_MAX_OUTGOING_MESSAGE_SIZE

packages/bitswap/src/index.ts

+22-8
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,6 @@ export interface BitswapOptions {
117117
*/
118118
protocol?: string
119119

120-
/**
121-
* When a new peer connects, sending our WantList should complete within this
122-
* many ms
123-
*
124-
* @default 5000
125-
*/
126-
messageSendTimeout?: number
127-
128120
/**
129121
* When sending want list updates to peers, how many messages to send at once
130122
*
@@ -167,6 +159,28 @@ export interface BitswapOptions {
167159
* @default 1024
168160
*/
169161
maxSizeReplaceHasWithBlock?: number
162+
163+
/**
164+
* The maximum size in bytes of a message that we will send. If a message is
165+
* larger than this (due to lots of blocks or wantlist entries) it will be
166+
* broken up into several smaller messages that are under this size.
167+
*
168+
* @see https://github.com/ipfs/boxo/blob/eeea414587350401b6b804f0574ed8436833331d/bitswap/client/internal/messagequeue/messagequeue.go#L33
169+
*
170+
* @default 2097152
171+
*/
172+
maxOutgoingMessageSize?: number
173+
174+
/**
175+
* The maximum size in bytes of an incoming message that we will process.
176+
*
177+
* Messages larger than this will cause the incoming stream to be reset.
178+
*
179+
* Defaults to `maxOutgoingMessageSize`
180+
*
181+
* @default 2097152
182+
*/
183+
maxIncomingMessageSize?: number
170184
}
171185

172186
export const createBitswap = (components: BitswapComponents, options: BitswapOptions = {}): Bitswap => {

packages/bitswap/src/network.ts

+60-150
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,25 @@
11
import { CodeError, TypedEventEmitter, setMaxListeners } from '@libp2p/interface'
22
import { PeerQueue, type PeerQueueJobOptions } from '@libp2p/utils/peer-queue'
3-
import { anySignal } from 'any-signal'
4-
import debug from 'debug'
53
import drain from 'it-drain'
64
import * as lp from 'it-length-prefixed'
7-
import { lpStream } from 'it-length-prefixed-stream'
85
import map from 'it-map'
96
import { pipe } from 'it-pipe'
107
import take from 'it-take'
11-
import { base64 } from 'multiformats/bases/base64'
12-
import { CID } from 'multiformats/cid'
138
import { CustomProgressEvent } from 'progress-events'
149
import { raceEvent } from 'race-event'
15-
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
16-
import { BITSWAP_120, DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS, DEFAULT_MAX_PROVIDERS_PER_REQUEST, DEFAULT_MESSAGE_RECEIVE_TIMEOUT, DEFAULT_MESSAGE_SEND_CONCURRENCY, DEFAULT_MESSAGE_SEND_TIMEOUT, DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS } from './constants.js'
10+
import { BITSWAP_120, DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_INCOMING_MESSAGE_SIZE, DEFAULT_MAX_OUTBOUND_STREAMS, DEFAULT_MAX_OUTGOING_MESSAGE_SIZE, DEFAULT_MAX_PROVIDERS_PER_REQUEST, DEFAULT_MESSAGE_RECEIVE_TIMEOUT, DEFAULT_MESSAGE_SEND_CONCURRENCY, DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS } from './constants.js'
1711
import { BitswapMessage } from './pb/message.js'
12+
import { mergeMessages } from './utils/merge-messages.js'
13+
import { splitMessage } from './utils/split-message.js'
1814
import type { WantOptions } from './bitswap.js'
1915
import type { MultihashHasherLoader } from './index.js'
20-
import type { Block, BlockPresence, WantlistEntry } from './pb/message.js'
16+
import type { Block } from './pb/message.js'
2117
import type { Provider, Routing } from '@helia/interface/routing'
2218
import type { Libp2p, AbortOptions, Connection, PeerId, IncomingStreamData, Topology, ComponentLogger, IdentifyResult, Counter } from '@libp2p/interface'
2319
import type { Logger } from '@libp2p/logger'
20+
import type { CID } from 'multiformats/cid'
2421
import type { ProgressEvent, ProgressOptions } from 'progress-events'
2522

26-
// Add a formatter for a bitswap message
27-
debug.formatters.B = (b?: BitswapMessage): string => {
28-
if (b == null) {
29-
return 'undefined'
30-
}
31-
32-
return JSON.stringify({
33-
blocks: b.blocks?.map(b => ({
34-
data: `${uint8ArrayToString(b.data, 'base64').substring(0, 10)}...`,
35-
prefix: uint8ArrayToString(b.prefix, 'base64')
36-
})),
37-
blockPresences: b.blockPresences?.map(p => ({
38-
...p,
39-
cid: CID.decode(p.cid).toString()
40-
})),
41-
wantlist: b.wantlist == null
42-
? undefined
43-
: {
44-
full: b.wantlist.full,
45-
entries: b.wantlist.entries.map(e => ({
46-
...e,
47-
cid: CID.decode(e.cid).toString()
48-
}))
49-
}
50-
}, null, 2)
51-
}
52-
5323
export type BitswapNetworkProgressEvents =
5424
ProgressEvent<'bitswap:network:dial', PeerId>
5525

@@ -68,10 +38,11 @@ export interface NetworkInit {
6838
maxInboundStreams?: number
6939
maxOutboundStreams?: number
7040
messageReceiveTimeout?: number
71-
messageSendTimeout?: number
7241
messageSendConcurrency?: number
7342
protocols?: string[]
7443
runOnTransientConnections?: boolean
44+
maxOutgoingMessageSize?: number
45+
maxIncomingMessageSize?: number
7546
}
7647

7748
export interface NetworkComponents {
@@ -107,8 +78,9 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
10778
private registrarIds: string[]
10879
private readonly metrics: { blocksSent?: Counter, dataSent?: Counter }
10980
private readonly sendQueue: PeerQueue<void, SendMessageJobOptions>
110-
private readonly messageSendTimeout: number
11181
private readonly runOnTransientConnections: boolean
82+
private readonly maxOutgoingMessageSize: number
83+
private readonly maxIncomingMessageSize: number
11284

11385
constructor (components: NetworkComponents, init: NetworkInit = {}) {
11486
super()
@@ -125,8 +97,9 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
12597
this.maxInboundStreams = init.maxInboundStreams ?? DEFAULT_MAX_INBOUND_STREAMS
12698
this.maxOutboundStreams = init.maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS
12799
this.messageReceiveTimeout = init.messageReceiveTimeout ?? DEFAULT_MESSAGE_RECEIVE_TIMEOUT
128-
this.messageSendTimeout = init.messageSendTimeout ?? DEFAULT_MESSAGE_SEND_TIMEOUT
129100
this.runOnTransientConnections = init.runOnTransientConnections ?? DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS
101+
this.maxIncomingMessageSize = init.maxIncomingMessageSize ?? DEFAULT_MAX_OUTGOING_MESSAGE_SIZE
102+
this.maxOutgoingMessageSize = init.maxOutgoingMessageSize ?? init.maxIncomingMessageSize ?? DEFAULT_MAX_INCOMING_MESSAGE_SIZE
130103
this.metrics = {
131104
blocksSent: components.libp2p.metrics?.registerCounter('helia_bitswap_sent_blocks_total'),
132105
dataSent: components.libp2p.metrics?.registerCounter('helia_bitswap_sent_data_bytes_total')
@@ -212,21 +185,29 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
212185
Promise.resolve().then(async () => {
213186
this.log('incoming new bitswap %s stream from %p', stream.protocol, connection.remotePeer)
214187
const abortListener = (): void => {
215-
stream.abort(new CodeError('Incoming Bitswap stream timed out', 'ERR_TIMEOUT'))
188+
if (stream.status === 'open') {
189+
stream.abort(new CodeError('Incoming Bitswap stream timed out', 'ERR_TIMEOUT'))
190+
} else {
191+
this.log('stream aborted with status', stream.status)
192+
}
216193
}
217194

218195
let signal = AbortSignal.timeout(this.messageReceiveTimeout)
219196
setMaxListeners(Infinity, signal)
220197
signal.addEventListener('abort', abortListener)
221198

199+
await stream.closeWrite()
200+
222201
await pipe(
223202
stream,
224-
(source) => lp.decode(source),
203+
(source) => lp.decode(source, {
204+
maxDataLength: this.maxIncomingMessageSize
205+
}),
225206
async (source) => {
226207
for await (const data of source) {
227208
try {
228209
const message = BitswapMessage.decode(data)
229-
this.log('incoming new bitswap %s message from %p %B', stream.protocol, connection.remotePeer, message)
210+
this.log('incoming new bitswap %s message from %p on stream', stream.protocol, connection.remotePeer, stream.id)
230211

231212
this.safeDispatchEvent('bitswap:message', {
232213
detail: {
@@ -241,7 +222,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
241222
setMaxListeners(Infinity, signal)
242223
signal.addEventListener('abort', abortListener)
243224
} catch (err: any) {
244-
this.log.error('error reading incoming bitswap message from %p', connection.remotePeer, err)
225+
this.log.error('error reading incoming bitswap message from %p on stream', connection.remotePeer, stream.id, err)
245226
stream.abort(err)
246227
break
247228
}
@@ -309,58 +290,55 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
309290
pendingBytes: msg.pendingBytes ?? 0
310291
}
311292

312-
const timeoutSignal = AbortSignal.timeout(this.messageSendTimeout)
313-
const signal = anySignal([timeoutSignal, options?.signal])
314-
setMaxListeners(Infinity, timeoutSignal, signal)
293+
const existingJob = this.sendQueue.queue.find(job => {
294+
return peerId.equals(job.options.peerId) && job.status === 'queued'
295+
})
315296

316-
try {
317-
const existingJob = this.sendQueue.queue.find(job => {
318-
return peerId.equals(job.options.peerId) && job.status === 'queued'
297+
if (existingJob != null) {
298+
// merge messages instead of adding new job
299+
existingJob.options.message = mergeMessages(existingJob.options.message, message)
300+
301+
await existingJob.join({
302+
signal: options?.signal
319303
})
320304

321-
if (existingJob != null) {
322-
// merge messages instead of adding new job
323-
existingJob.options.message = mergeMessages(existingJob.options.message, message)
305+
return
306+
}
324307

325-
await existingJob.join({
326-
signal
327-
})
308+
await this.sendQueue.add(async (options) => {
309+
const message = options?.message
328310

329-
return
311+
if (message == null) {
312+
throw new CodeError('No message to send', 'ERR_NO_MESSAGE')
330313
}
331314

332-
await this.sendQueue.add(async (options) => {
333-
const message = options?.message
315+
this.log('sendMessage to %p', peerId)
334316

335-
if (message == null) {
336-
throw new CodeError('No message to send', 'ERR_NO_MESSAGE')
337-
}
317+
options?.onProgress?.(new CustomProgressEvent<PeerId>('bitswap:network:send-wantlist', peerId))
338318

339-
this.log('sendMessage to %p %B', peerId, message)
319+
const stream = await this.libp2p.dialProtocol(peerId, BITSWAP_120, options)
320+
await stream.closeRead()
340321

341-
options?.onProgress?.(new CustomProgressEvent<PeerId>('bitswap:network:send-wantlist', peerId))
322+
try {
323+
await pipe(
324+
splitMessage(message, this.maxOutgoingMessageSize),
325+
(source) => lp.encode(source),
326+
stream
327+
)
342328

343-
const stream = await this.libp2p.dialProtocol(peerId, BITSWAP_120, options)
344-
345-
try {
346-
const lp = lpStream(stream)
347-
await lp.write(BitswapMessage.encode(message), options)
348-
await lp.unwrap().close(options)
349-
} catch (err: any) {
350-
options?.onProgress?.(new CustomProgressEvent<{ peer: PeerId, error: Error }>('bitswap:network:send-wantlist:error', { peer: peerId, error: err }))
351-
this.log.error('error sending message to %p', peerId, err)
352-
stream.abort(err)
353-
}
329+
await stream.close(options)
330+
} catch (err: any) {
331+
options?.onProgress?.(new CustomProgressEvent<{ peer: PeerId, error: Error }>('bitswap:network:send-wantlist:error', { peer: peerId, error: err }))
332+
this.log.error('error sending message to %p', peerId, err)
333+
stream.abort(err)
334+
}
354335

355-
this._updateSentStats(message.blocks)
356-
}, {
357-
peerId,
358-
signal,
359-
message
360-
})
361-
} finally {
362-
signal.clear()
363-
}
336+
this._updateSentStats(message.blocks)
337+
}, {
338+
peerId,
339+
signal: options?.signal,
340+
message
341+
})
364342
}
365343

366344
/**
@@ -409,71 +387,3 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
409387
this.metrics.blocksSent?.increment(blocks.length)
410388
}
411389
}
412-
413-
function mergeMessages (messageA: BitswapMessage, messageB: BitswapMessage): BitswapMessage {
414-
const wantListEntries = new Map<string, WantlistEntry>(
415-
(messageA.wantlist?.entries ?? []).map(entry => ([
416-
base64.encode(entry.cid),
417-
entry
418-
]))
419-
)
420-
421-
for (const entry of messageB.wantlist?.entries ?? []) {
422-
const key = base64.encode(entry.cid)
423-
const existingEntry = wantListEntries.get(key)
424-
425-
if (existingEntry != null) {
426-
// take highest priority
427-
if (existingEntry.priority > entry.priority) {
428-
entry.priority = existingEntry.priority
429-
}
430-
431-
// take later values if passed, otherwise use earlier ones
432-
entry.cancel = entry.cancel ?? existingEntry.cancel
433-
entry.wantType = entry.wantType ?? existingEntry.wantType
434-
entry.sendDontHave = entry.sendDontHave ?? existingEntry.sendDontHave
435-
}
436-
437-
wantListEntries.set(key, entry)
438-
}
439-
440-
const blockPresences = new Map<string, BlockPresence>(
441-
messageA.blockPresences.map(presence => ([
442-
base64.encode(presence.cid),
443-
presence
444-
]))
445-
)
446-
447-
for (const blockPresence of messageB.blockPresences) {
448-
const key = base64.encode(blockPresence.cid)
449-
450-
// override earlier block presence with later one as if duplicated it is
451-
// likely to be more accurate since it is more recent
452-
blockPresences.set(key, blockPresence)
453-
}
454-
455-
const blocks = new Map<string, Block>(
456-
messageA.blocks.map(block => ([
457-
base64.encode(block.data),
458-
block
459-
]))
460-
)
461-
462-
for (const block of messageB.blocks) {
463-
const key = base64.encode(block.data)
464-
465-
blocks.set(key, block)
466-
}
467-
468-
const output: BitswapMessage = {
469-
wantlist: {
470-
full: messageA.wantlist?.full ?? messageB.wantlist?.full ?? false,
471-
entries: [...wantListEntries.values()]
472-
},
473-
blockPresences: [...blockPresences.values()],
474-
blocks: [...blocks.values()],
475-
pendingBytes: messageA.pendingBytes + messageB.pendingBytes
476-
}
477-
478-
return output
479-
}

0 commit comments

Comments
 (0)