Skip to content

Commit b1e7a63

Browse files
committed
feat: track node connection state
1 parent 5715d7f commit b1e7a63

File tree

6 files changed

+232
-7
lines changed

6 files changed

+232
-7
lines changed

packages/core/src/lib/connection_manager.ts

+36-5
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
66
import { decodeRelayShard } from "@waku/enr";
77
import {
88
ConnectionManagerOptions,
9+
EConnectionStateEvents,
910
EPeersByDiscoveryEvents,
1011
IConnectionManager,
12+
IConnectionStateEvents,
1113
IPeersByDiscoveryEvents,
1214
IRelay,
1315
KeepAliveOptions,
@@ -28,7 +30,7 @@ export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
2830
export const DEFAULT_MAX_PARALLEL_DIALS = 3;
2931

3032
export class ConnectionManager
31-
extends EventEmitter<IPeersByDiscoveryEvents>
33+
extends EventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents>
3234
implements IConnectionManager
3335
{
3436
private static instances = new Map<string, ConnectionManager>();
@@ -40,6 +42,33 @@ export class ConnectionManager
4042

4143
private currentActiveParallelDialCount = 0;
4244
private pendingPeerDialQueue: Array<PeerId> = [];
45+
private online: boolean = false;
46+
47+
public isConnected(): boolean {
48+
return this.online;
49+
}
50+
51+
private toggleOnline(): void {
52+
if (!this.online) {
53+
this.online = true;
54+
this.dispatchEvent(
55+
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
56+
detail: this.online
57+
})
58+
);
59+
}
60+
}
61+
62+
private toggleOffline(): void {
63+
if (this.online && !this.keepAliveManager.connectionsExist()) {
64+
this.online = false;
65+
this.dispatchEvent(
66+
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
67+
detail: this.online
68+
})
69+
);
70+
}
71+
}
4372

4473
public static create(
4574
peerId: string,
@@ -393,12 +422,14 @@ export class ConnectionManager
393422
)
394423
);
395424
}
425+
this.toggleOnline();
396426
})();
397427
},
398-
"peer:disconnect": () => {
399-
return (evt: CustomEvent<PeerId>): void => {
428+
"peer:disconnect": (evt: CustomEvent<PeerId>): void => {
429+
void (async () => {
400430
this.keepAliveManager.stop(evt.detail);
401-
};
431+
this.toggleOffline();
432+
})();
402433
}
403434
};
404435

@@ -427,7 +458,7 @@ export class ConnectionManager
427458
log.warn(
428459
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${
429460
this.configuredPubsubTopics
430-
}).
461+
}).
431462
Not dialing.`
432463
);
433464
return false;

packages/core/src/lib/keep_alive_manager.ts

+6
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,12 @@ export class KeepAliveManager {
111111
this.relayKeepAliveTimers.clear();
112112
}
113113

114+
public connectionsExist(): boolean {
115+
return (
116+
this.pingKeepAliveTimers.size > 0 || this.relayKeepAliveTimers.size > 0
117+
);
118+
}
119+
114120
private scheduleRelayPings(
115121
relay: IRelay,
116122
relayPeriodSecs: number,

packages/core/src/lib/waku.ts

+4
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,10 @@ export class WakuNode implements Waku {
178178
return this.libp2p.isStarted();
179179
}
180180

181+
isConnected(): boolean {
182+
return this.connectionManager.isConnected();
183+
}
184+
181185
/**
182186
* Return the local multiaddr with peer id on which libp2p is listening.
183187
*

packages/interfaces/src/connection_manager.ts

+10-1
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,17 @@ export interface PeersByDiscoveryResult {
4949
};
5050
}
5151

52+
export enum EConnectionStateEvents {
53+
CONNECTION_STATUS = "waku:connection"
54+
}
55+
56+
export interface IConnectionStateEvents {
57+
// true when online, false when offline
58+
[EConnectionStateEvents.CONNECTION_STATUS]: CustomEvent<boolean>;
59+
}
60+
5261
export interface IConnectionManager
53-
extends EventEmitter<IPeersByDiscoveryEvents> {
62+
extends EventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents> {
5463
getPeersByDiscovery(): Promise<PeersByDiscoveryResult>;
5564
stop(): void;
5665
}

packages/interfaces/src/waku.ts

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ export interface Waku {
2626
stop(): Promise<void>;
2727

2828
isStarted(): boolean;
29+
30+
isConnected(): boolean;
2931
}
3032

3133
export interface LightNode extends Waku {

packages/tests/tests/connection_manager.spec.ts

+174-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@ import type { PeerId } from "@libp2p/interface/peer-id";
22
import type { PeerInfo } from "@libp2p/interface/peer-info";
33
import { CustomEvent } from "@libp2p/interfaces/events";
44
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
5-
import { EPeersByDiscoveryEvents, LightNode, Tags } from "@waku/interfaces";
5+
import {
6+
EConnectionStateEvents,
7+
EPeersByDiscoveryEvents,
8+
LightNode,
9+
Tags
10+
} from "@waku/interfaces";
611
import { createLightNode } from "@waku/sdk";
712
import { expect } from "chai";
813
import sinon, { SinonSpy, SinonStub } from "sinon";
@@ -155,6 +160,174 @@ describe("ConnectionManager", function () {
155160

156161
expect(await peerConnectedPeerExchange).to.eq(true);
157162
});
163+
it("should emit `waku:online` event only when first peer is connected", async function () {
164+
const peerIdPx = await createSecp256k1PeerId();
165+
166+
await waku.libp2p.peerStore.save(peerIdPx, {
167+
tags: {
168+
[Tags.PEER_EXCHANGE]: {
169+
value: 50,
170+
ttl: 1200000
171+
}
172+
}
173+
});
174+
175+
let eventCount = 0;
176+
const connectionStatus = new Promise<boolean>((resolve) => {
177+
waku.connectionManager.addEventListener(
178+
EConnectionStateEvents.CONNECTION_STATUS,
179+
({ detail: status }) => {
180+
eventCount++;
181+
resolve(status);
182+
}
183+
);
184+
});
185+
186+
expect(waku.isConnected()).to.be.false;
187+
188+
waku.libp2p.dispatchEvent(
189+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
190+
);
191+
waku.libp2p.dispatchEvent(
192+
new CustomEvent<PeerId>("peer:connect", {
193+
detail: await createSecp256k1PeerId()
194+
})
195+
);
196+
waku.libp2p.dispatchEvent(
197+
new CustomEvent<PeerId>("peer:connect", {
198+
detail: await createSecp256k1PeerId()
199+
})
200+
);
201+
202+
expect(await connectionStatus).to.eq(true);
203+
expect(eventCount).to.be.eq(1);
204+
});
205+
it("isConnected should return true after first peer connects", async function () {
206+
const peerIdPx = await createSecp256k1PeerId();
207+
208+
await waku.libp2p.peerStore.save(peerIdPx, {
209+
tags: {
210+
[Tags.PEER_EXCHANGE]: {
211+
value: 50,
212+
ttl: 1200000
213+
}
214+
}
215+
});
216+
217+
expect(waku.isConnected()).to.be.false;
218+
219+
waku.libp2p.dispatchEvent(
220+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
221+
);
222+
waku.libp2p.dispatchEvent(
223+
new CustomEvent<PeerId>("peer:connect", {
224+
detail: await createSecp256k1PeerId()
225+
})
226+
);
227+
228+
await delay(100);
229+
230+
expect(waku.isConnected()).to.be.true;
231+
});
232+
});
233+
234+
describe("peer:disconnect", () => {
235+
it("should emit `waku:offline` event when all peers disconnect", async function () {
236+
const peerIdPx = await createSecp256k1PeerId();
237+
const peerIdPx2 = await createSecp256k1PeerId();
238+
239+
await waku.libp2p.peerStore.save(peerIdPx, {
240+
tags: {
241+
[Tags.PEER_EXCHANGE]: {
242+
value: 50,
243+
ttl: 1200000
244+
}
245+
}
246+
});
247+
248+
await waku.libp2p.peerStore.save(peerIdPx2, {
249+
tags: {
250+
[Tags.PEER_EXCHANGE]: {
251+
value: 50,
252+
ttl: 1200000
253+
}
254+
}
255+
});
256+
257+
waku.libp2p.dispatchEvent(
258+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
259+
);
260+
waku.libp2p.dispatchEvent(
261+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx2 })
262+
);
263+
264+
await delay(100);
265+
266+
let eventCount = 0;
267+
const connectionStatus = new Promise<boolean>((resolve) => {
268+
waku.connectionManager.addEventListener(
269+
EConnectionStateEvents.CONNECTION_STATUS,
270+
({ detail: status }) => {
271+
eventCount++;
272+
resolve(status);
273+
}
274+
);
275+
});
276+
277+
expect(waku.isConnected()).to.be.true;
278+
279+
waku.libp2p.dispatchEvent(
280+
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx })
281+
);
282+
waku.libp2p.dispatchEvent(
283+
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx2 })
284+
);
285+
286+
expect(await connectionStatus).to.eq(false);
287+
expect(eventCount).to.be.eq(1);
288+
});
289+
it("isConnected should return false after all peers disconnect", async function () {
290+
const peerIdPx = await createSecp256k1PeerId();
291+
const peerIdPx2 = await createSecp256k1PeerId();
292+
293+
await waku.libp2p.peerStore.save(peerIdPx, {
294+
tags: {
295+
[Tags.PEER_EXCHANGE]: {
296+
value: 50,
297+
ttl: 1200000
298+
}
299+
}
300+
});
301+
302+
await waku.libp2p.peerStore.save(peerIdPx2, {
303+
tags: {
304+
[Tags.PEER_EXCHANGE]: {
305+
value: 50,
306+
ttl: 1200000
307+
}
308+
}
309+
});
310+
311+
waku.libp2p.dispatchEvent(
312+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
313+
);
314+
waku.libp2p.dispatchEvent(
315+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx2 })
316+
);
317+
318+
await delay(100);
319+
320+
expect(waku.isConnected()).to.be.true;
321+
322+
waku.libp2p.dispatchEvent(
323+
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx })
324+
);
325+
waku.libp2p.dispatchEvent(
326+
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx2 })
327+
);
328+
329+
expect(waku.isConnected()).to.be.false;
330+
});
158331
});
159332
});
160333

0 commit comments

Comments
 (0)