Skip to content

Commit f32d7d9

Browse files
feat: allow passing of multiple ENR URLs to DNS Discovery & dial multiple peers in parallel (#1379)
* allow passing of multiple ENRs to DNS Discovery * add test for >1 ENR to DNS Disc * address comments * feat: dial multiple peers in parallel (#1380) * ensure discovered peers are dialed in parallel * cap parallel dials * drop connection to bootstrap peer if >set connected * switch to american english * improve promises and error logging
1 parent fefc7ae commit f32d7d9

File tree

5 files changed

+139
-18
lines changed

5 files changed

+139
-18
lines changed

packages/core/src/lib/connection_manager.ts

+96-8
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const log = debug("waku:connection-manager");
1212

1313
export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
1414
export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
15+
export const DEFAULT_MAX_PARALLEL_DIALS = 3;
1516

1617
export class ConnectionManager {
1718
private static instances = new Map<string, ConnectionManager>();
@@ -21,6 +22,9 @@ export class ConnectionManager {
2122
private dialAttemptsForPeer: Map<string, number> = new Map();
2223
private dialErrorsForPeer: Map<string, any> = new Map();
2324

25+
private currentActiveDialCount = 0;
26+
private pendingPeerDialQueue: Array<PeerId> = [];
27+
2428
public static create(
2529
peerId: string,
2630
libp2p: Libp2p,
@@ -52,6 +56,7 @@ export class ConnectionManager {
5256
this.options = {
5357
maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER,
5458
maxBootstrapPeersAllowed: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED,
59+
maxParallelDials: DEFAULT_MAX_PARALLEL_DIALS,
5560
...options,
5661
};
5762

@@ -60,6 +65,31 @@ export class ConnectionManager {
6065
this.run()
6166
.then(() => log(`Connection Manager is now running`))
6267
.catch((error) => log(`Unexpected error while running service`, error));
68+
69+
// libp2p emits `peer:discovery` events during its initialization
70+
// which means that before the ConnectionManager is initialized, some peers may have been discovered
71+
// we will dial the peers in peerStore ONCE before we start to listen to the `peer:discovery` events within the ConnectionManager
72+
this.dialPeerStorePeers();
73+
}
74+
75+
private async dialPeerStorePeers(): Promise<void> {
76+
const peerInfos = await this.libp2pComponents.peerStore.all();
77+
const dialPromises = [];
78+
for (const peerInfo of peerInfos) {
79+
if (
80+
this.libp2pComponents
81+
.getConnections()
82+
.find((c) => c.remotePeer === peerInfo.id)
83+
)
84+
continue;
85+
86+
dialPromises.push(this.attemptDial(peerInfo.id));
87+
}
88+
try {
89+
await Promise.all(dialPromises);
90+
} catch (error) {
91+
log(`Unexpected error while dialing peer store peers`, error);
92+
}
6393
}
6494

6595
private async run(): Promise<void> {
@@ -86,6 +116,7 @@ export class ConnectionManager {
86116
}
87117

88118
private async dialPeer(peerId: PeerId): Promise<void> {
119+
this.currentActiveDialCount += 1;
89120
let dialAttempt = 0;
90121
while (dialAttempt <= this.options.maxDialAttemptsForPeer) {
91122
try {
@@ -105,6 +136,7 @@ export class ConnectionManager {
105136
return;
106137
} catch (e) {
107138
const error = e as AggregateError;
139+
108140
this.dialErrorsForPeer.set(peerId.toString(), error);
109141
log(`Error dialing peer ${peerId.toString()} - ${error.errors}`);
110142

@@ -128,6 +160,33 @@ export class ConnectionManager {
128160
return await this.libp2pComponents.peerStore.delete(peerId);
129161
} catch (error) {
130162
throw `Error deleting undialable peer ${peerId.toString()} from peer store - ${error}`;
163+
} finally {
164+
this.currentActiveDialCount -= 1;
165+
this.processDialQueue();
166+
}
167+
}
168+
169+
async dropConnection(peerId: PeerId): Promise<void> {
170+
try {
171+
await this.libp2pComponents.hangUp(peerId);
172+
log(`Dropped connection with peer ${peerId.toString()}`);
173+
} catch (error) {
174+
log(
175+
`Error dropping connection with peer ${peerId.toString()} - ${error}`
176+
);
177+
}
178+
}
179+
180+
private async processDialQueue(): Promise<void> {
181+
if (
182+
this.pendingPeerDialQueue.length > 0 &&
183+
this.currentActiveDialCount < this.options.maxParallelDials
184+
) {
185+
const peerId = this.pendingPeerDialQueue.shift();
186+
if (!peerId) return;
187+
this.attemptDial(peerId).catch((error) => {
188+
log(error);
189+
});
131190
}
132191
}
133192

@@ -164,21 +223,50 @@ export class ConnectionManager {
164223
);
165224
}
166225

226+
private async attemptDial(peerId: PeerId): Promise<void> {
227+
if (this.currentActiveDialCount >= this.options.maxParallelDials) {
228+
this.pendingPeerDialQueue.push(peerId);
229+
return;
230+
}
231+
232+
if (!(await this.shouldDialPeer(peerId))) return;
233+
234+
this.dialPeer(peerId).catch((err) => {
235+
throw `Error dialing peer ${peerId.toString()} : ${err}`;
236+
});
237+
}
238+
167239
private onEventHandlers = {
168240
"peer:discovery": async (evt: CustomEvent<PeerInfo>): Promise<void> => {
169241
const { id: peerId } = evt.detail;
170-
if (!(await this.shouldDialPeer(peerId))) return;
171242

172-
this.dialPeer(peerId).catch((err) =>
243+
this.attemptDial(peerId).catch((err) =>
173244
log(`Error dialing peer ${peerId.toString()} : ${err}`)
174245
);
175246
},
176-
"peer:connect": (evt: CustomEvent<Connection>): void => {
177-
{
178-
this.keepAliveManager.start(
179-
evt.detail.remotePeer,
180-
this.libp2pComponents.ping.bind(this)
181-
);
247+
"peer:connect": async (evt: CustomEvent<Connection>): Promise<void> => {
248+
const { remotePeer: peerId } = evt.detail;
249+
250+
this.keepAliveManager.start(
251+
peerId,
252+
this.libp2pComponents.ping.bind(this)
253+
);
254+
255+
const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
256+
Tags.BOOTSTRAP
257+
);
258+
259+
if (isBootstrap) {
260+
const bootstrapConnections = this.libp2pComponents
261+
.getConnections()
262+
.filter((conn) => conn.tags.includes(Tags.BOOTSTRAP));
263+
264+
// If we have too many bootstrap connections, drop one
265+
if (
266+
bootstrapConnections.length > this.options.maxBootstrapPeersAllowed
267+
) {
268+
await this.dropConnection(peerId);
269+
}
182270
}
183271
},
184272
"peer:disconnect": () => {

packages/dns-discovery/src/index.ts

+10-7
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ export interface Options {
3232
/**
3333
* ENR URL to use for DNS discovery
3434
*/
35-
enrUrl: string;
35+
enrUrls: string | string[];
3636
/**
3737
* Specifies what type of nodes are wanted from the discovery process
3838
*/
@@ -71,8 +71,8 @@ export class PeerDiscoveryDns
7171
this._components = components;
7272
this._options = options;
7373

74-
const { enrUrl } = options;
75-
log("Use following EIP-1459 ENR Tree URL: ", enrUrl);
74+
const { enrUrls } = options;
75+
log("Use following EIP-1459 ENR Tree URLs: ", enrUrls);
7676
}
7777

7878
/**
@@ -84,12 +84,15 @@ export class PeerDiscoveryDns
8484
this._started = true;
8585

8686
if (this.nextPeer === undefined) {
87-
const { enrUrl, wantedNodeCapabilityCount } = this._options;
87+
let { enrUrls } = this._options;
88+
if (!Array.isArray(enrUrls)) enrUrls = [enrUrls];
89+
90+
const { wantedNodeCapabilityCount } = this._options;
8891
const dns = await DnsNodeDiscovery.dnsOverHttp();
8992

9093
this.nextPeer = dns.getNextPeer.bind(
9194
dns,
92-
[enrUrl],
95+
enrUrls,
9396
wantedNodeCapabilityCount
9497
);
9598
}
@@ -138,11 +141,11 @@ export class PeerDiscoveryDns
138141
}
139142

140143
export function wakuDnsDiscovery(
141-
enrUrl: string,
144+
enrUrls: string[],
142145
wantedNodeCapabilityCount: Partial<NodeCapabilityCount>
143146
): (components: DnsDiscoveryComponents) => PeerDiscoveryDns {
144147
return (components: DnsDiscoveryComponents) =>
145-
new PeerDiscoveryDns(components, { enrUrl, wantedNodeCapabilityCount });
148+
new PeerDiscoveryDns(components, { enrUrls, wantedNodeCapabilityCount });
146149
}
147150

148151
export { DnsNodeDiscovery, SearchContext, DnsClient } from "./dns.js";

packages/interfaces/src/connection_manager.ts

+4
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,8 @@ export interface ConnectionManagerOptions {
1414
* This is used to increase intention of dialing non-bootstrap peers, found using other discovery mechanisms (like Peer Exchange)
1515
*/
1616
maxBootstrapPeersAllowed: number;
17+
/**
18+
* Max number of parallel dials allowed
19+
*/
20+
maxParallelDials: number;
1721
}

packages/sdk/src/create.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ export async function createFullNode(
171171
export function defaultPeerDiscovery(): (
172172
components: Libp2pComponents
173173
) => PeerDiscovery {
174-
return wakuDnsDiscovery(enrTree["PROD"], DEFAULT_NODE_REQUIREMENTS);
174+
return wakuDnsDiscovery([enrTree["PROD"]], DEFAULT_NODE_REQUIREMENTS);
175175
}
176176

177177
export async function defaultLibp2p(

packages/tests/tests/dns-peer-discovery.spec.ts

+28-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import { createLightNode } from "@waku/sdk";
1313
import { expect } from "chai";
1414
import { MemoryDatastore } from "datastore-core";
1515

16+
import { delay } from "../src/delay.js";
17+
1618
const maxQuantity = 3;
1719

1820
describe("DNS Discovery: Compliance Test", async function () {
@@ -28,7 +30,7 @@ describe("DNS Discovery: Compliance Test", async function () {
2830
});
2931

3032
return new PeerDiscoveryDns(components, {
31-
enrUrl: enrTree["PROD"],
33+
enrUrls: [enrTree["PROD"]],
3234
wantedNodeCapabilityCount: {
3335
filter: 1,
3436
},
@@ -60,7 +62,7 @@ describe("DNS Node Discovery [live data]", function () {
6062

6163
const waku = await createLightNode({
6264
libp2p: {
63-
peerDiscovery: [wakuDnsDiscovery(enrTree["PROD"], nodeRequirements)],
65+
peerDiscovery: [wakuDnsDiscovery([enrTree["PROD"]], nodeRequirements)],
6466
},
6567
});
6668

@@ -110,4 +112,28 @@ describe("DNS Node Discovery [live data]", function () {
110112
seen.push(ma!.toString());
111113
}
112114
});
115+
it("passes more than one ENR URLs and attempts connection", async function () {
116+
if (process.env.CI) this.skip();
117+
this.timeout(30_000);
118+
119+
const nodesToConnect = 2;
120+
121+
const waku = await createLightNode({
122+
libp2p: {
123+
peerDiscovery: [
124+
wakuDnsDiscovery([enrTree["PROD"], enrTree["TEST"]], {
125+
filter: nodesToConnect,
126+
}),
127+
],
128+
},
129+
});
130+
131+
await waku.start();
132+
133+
const allPeers = await waku.libp2p.peerStore.all();
134+
while (allPeers.length < nodesToConnect) {
135+
await delay(2000);
136+
}
137+
expect(allPeers.length).to.be.eq(nodesToConnect);
138+
});
113139
});

0 commit comments

Comments
 (0)