Skip to content

Commit

Permalink
Migrate to it-length-prefixed 8.0.2
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Aug 3, 2022
1 parent f6ab12d commit d0ad868
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 17 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,12 @@
"abortable-iterator": "^4.0.2",
"denque": "^1.5.0",
"err-code": "^3.0.1",
"it-length-prefixed": "^7.0.1",
"it-length-prefixed": "^8.0.2",
"it-pipe": "^2.0.3",
"it-pushable": "^3.0.0",
"multiformats": "^9.6.4",
"protobufjs": "^6.11.2",
"uint8arraylist": "^2.0.0",
"uint8arrays": "^3.0.0"
},
"devDependencies": {
Expand Down
18 changes: 3 additions & 15 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,7 @@ import type { IncomingStreamData } from '@libp2p/interface-registrar'
import { removeFirstNItemsFromSet, removeItemsFromSet } from './utils/set.js'
import { pushable } from 'it-pushable'
import { InboundStream, OutboundStream } from './stream.js'

// From 'bl' library
interface BufferList {
slice: () => Buffer
}
import { Uint8ArrayList } from 'uint8arraylist'

type ConnectionDirection = 'inbound' | 'outbound'

Expand Down Expand Up @@ -866,21 +862,13 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/**
* Responsible for processing each RPC message received by other peers.
*/
private async pipePeerReadStream(peerId: PeerId, stream: AsyncIterable<Uint8Array | BufferList>): Promise<void> {
private async pipePeerReadStream(peerId: PeerId, stream: AsyncIterable<Uint8ArrayList>): Promise<void> {
try {
await pipe(stream, async (source) => {
for await (const data of source) {
try {
// TODO: Check max gossip message size, before decodeRpc()

// Note: `stream` maybe a BufferList which requires calling .slice to concat all the chunks into
// a single Buffer instance that protobuf js can deal with.
// Otherwise it will throw:
// ```
// Error: illegal buffer
// at create_typed_array (js-libp2p-gossipsub/node_modules/protobufjs/src/reader.js:47:15)
const rpcBytes = data instanceof Uint8Array ? data : data.slice()

const rpcBytes = data.subarray()
// Note: This function may throw, it must be wrapped in a try {} catch {} to prevent closing the stream.
// TODO: What should we do if the entire RPC is invalid?
const rpc = RPC.decode(rpcBytes)
Expand Down
3 changes: 2 additions & 1 deletion src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { abortableSource } from 'abortable-iterator'
import { pipe } from 'it-pipe'
import { pushable, Pushable } from 'it-pushable'
import { encode, decode } from 'it-length-prefixed'
import { Uint8ArrayList } from 'uint8arraylist'

export class OutboundStream {
private readonly rawStream: Stream
Expand Down Expand Up @@ -37,7 +38,7 @@ export class OutboundStream {
}

export class InboundStream {
public readonly source: AsyncIterable<Uint8Array>
public readonly source: AsyncIterable<Uint8ArrayList>

private readonly rawStream: Stream
private readonly closeController: AbortController
Expand Down

0 comments on commit d0ad868

Please sign in to comment.