Skip to content

Commit b4f8216

Browse files
feat: pre-emptive stream creation for protocols (#1516)
* pass log as an arg to baseprotocol * optimistically create and use streams for light protocols * refactor BaseProtocol for readability * use optimistic stream selection in protocols * use a new stream for every request instead of reusing * replenish streams correctly * create StreamManager * refactor for a single stream * fix: listener binds * declare streamManager as a class var isntead of extending * remove stream destruction as it happens by default * simplify logic & address comments * fix: bind typo * refactor for improvements * fix typedoc * rm: lock * restructure StreamManager for readbility * remove log as an arg * use newStream as a facade in BaseProtoocl
1 parent 1c09092 commit b4f8216

File tree

7 files changed

+89
-20
lines changed

7 files changed

+89
-20
lines changed

packages/core/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,4 @@ export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";
3030
export { ConnectionManager } from "./lib/connection_manager.js";
3131

3232
export { KeepAliveManager } from "./lib/keep_alive_manager.js";
33+
export { StreamManager } from "./lib/stream_manager.js";

packages/core/src/lib/base_protocol.ts

+15-16
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,9 @@ import type { Stream } from "@libp2p/interface/connection";
33
import type { PeerId } from "@libp2p/interface/peer-id";
44
import { Peer, PeerStore } from "@libp2p/interface/peer-store";
55
import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces";
6-
import {
7-
getPeersForProtocol,
8-
selectConnection,
9-
selectPeerForProtocol
10-
} from "@waku/utils/libp2p";
6+
import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p";
7+
8+
import { StreamManager } from "./stream_manager.js";
119

1210
/**
1311
* A class with predefined helpers, to be used as a base to implement Waku
@@ -16,6 +14,7 @@ import {
1614
export class BaseProtocol implements IBaseProtocol {
1715
public readonly addLibp2pEventListener: Libp2p["addEventListener"];
1816
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];
17+
protected streamManager: StreamManager;
1918

2019
constructor(
2120
public multicodec: string,
@@ -27,6 +26,17 @@ export class BaseProtocol implements IBaseProtocol {
2726
this.removeLibp2pEventListener = components.events.removeEventListener.bind(
2827
components.events
2928
);
29+
30+
this.streamManager = new StreamManager(
31+
multicodec,
32+
components.connectionManager.getConnections.bind(
33+
components.connectionManager
34+
),
35+
this.addLibp2pEventListener
36+
);
37+
}
38+
protected async getStream(peer: Peer): Promise<Stream> {
39+
return this.streamManager.getStream(peer);
3040
}
3141

3242
public get peerStore(): PeerStore {
@@ -50,15 +60,4 @@ export class BaseProtocol implements IBaseProtocol {
5060
);
5161
return peer;
5262
}
53-
protected async newStream(peer: Peer): Promise<Stream> {
54-
const connections = this.components.connectionManager.getConnections(
55-
peer.id
56-
);
57-
const connection = selectConnection(connections);
58-
if (!connection) {
59-
throw new Error("Failed to get a connection to the peer");
60-
}
61-
62-
return connection.newStream(this.multicodec);
63-
}
6463
}

packages/core/src/lib/filter/index.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ class Filter extends BaseProtocol implements IReceiver {
271271
this.setActiveSubscription(
272272
_pubSubTopic,
273273
peer.id.toString(),
274-
new Subscription(_pubSubTopic, peer, this.newStream.bind(this, peer))
274+
new Subscription(_pubSubTopic, peer, this.getStream.bind(this, peer))
275275
);
276276

277277
return subscription;

packages/core/src/lib/light_push/index.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class LightPush extends BaseProtocol implements ILightPush {
103103

104104
let error: undefined | SendError = undefined;
105105
const peer = await this.getPeer(opts?.peerId);
106-
const stream = await this.newStream(peer);
106+
const stream = await this.getStream(peer);
107107

108108
try {
109109
const res = await pipe(

packages/core/src/lib/store/index.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ class Store extends BaseProtocol implements IStore {
254254
const peer = await this.getPeer(options?.peerId);
255255

256256
for await (const messages of paginate<T>(
257-
this.newStream.bind(this, peer),
257+
this.getStream.bind(this, peer),
258258
queryOpts,
259259
decodersAsMap,
260260
options?.cursor
+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import type { PeerUpdate } from "@libp2p/interface";
2+
import type { Stream } from "@libp2p/interface/connection";
3+
import { Peer } from "@libp2p/interface/peer-store";
4+
import { Libp2p } from "@waku/interfaces";
5+
import { selectConnection } from "@waku/utils/libp2p";
6+
import debug from "debug";
7+
8+
export class StreamManager {
9+
private streamPool: Map<string, Promise<Stream>>;
10+
private log: debug.Debugger;
11+
12+
constructor(
13+
public multicodec: string,
14+
public getConnections: Libp2p["getConnections"],
15+
public addEventListener: Libp2p["addEventListener"]
16+
) {
17+
this.log = debug(`waku:stream-manager:${multicodec}`);
18+
this.addEventListener(
19+
"peer:update",
20+
this.handlePeerUpdateStreamPool.bind(this)
21+
);
22+
this.getStream = this.getStream.bind(this);
23+
this.streamPool = new Map();
24+
}
25+
26+
public async getStream(peer: Peer): Promise<Stream> {
27+
const peerIdStr = peer.id.toString();
28+
const streamPromise = this.streamPool.get(peerIdStr);
29+
30+
if (!streamPromise) {
31+
return this.newStream(peer); // fallback by creating a new stream on the spot
32+
}
33+
34+
// We have the stream, let's remove it from the map
35+
this.streamPool.delete(peerIdStr);
36+
37+
this.prepareNewStream(peer);
38+
39+
const stream = await streamPromise;
40+
41+
if (stream.status === "closed") {
42+
return this.newStream(peer); // fallback by creating a new stream on the spot
43+
}
44+
45+
return stream;
46+
}
47+
48+
private async newStream(peer: Peer): Promise<Stream> {
49+
const connections = this.getConnections(peer.id);
50+
const connection = selectConnection(connections);
51+
if (!connection) {
52+
throw new Error("Failed to get a connection to the peer");
53+
}
54+
return connection.newStream(this.multicodec);
55+
}
56+
57+
private prepareNewStream(peer: Peer): void {
58+
const streamPromise = this.newStream(peer);
59+
this.streamPool.set(peer.id.toString(), streamPromise);
60+
}
61+
62+
private handlePeerUpdateStreamPool = (evt: CustomEvent<PeerUpdate>): void => {
63+
const peer = evt.detail.peer;
64+
if (peer.protocols.includes(this.multicodec)) {
65+
this.log(`Optimistically opening a stream to ${peer.id.toString()}`);
66+
this.prepareNewStream(peer);
67+
}
68+
};
69+
}

packages/peer-exchange/src/waku_peer_exchange.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
4747

4848
const peer = await this.getPeer(params.peerId);
4949

50-
const stream = await this.newStream(peer);
50+
const stream = await this.getStream(peer);
5151

5252
const res = await pipe(
5353
[rpcQuery.encode()],

0 commit comments

Comments
 (0)