Skip to content

Commit 4c0a743

Browse files
committed
feat: track node connection state
1 parent 535a748 commit 4c0a743

File tree

6 files changed

+210
-5
lines changed

6 files changed

+210
-5
lines changed

packages/core/src/lib/connection_manager.ts

+25-4
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,25 @@ export class ConnectionManager
4040

4141
private currentActiveParallelDialCount = 0;
4242
private pendingPeerDialQueue: Array<PeerId> = [];
43+
private online: boolean = false;
44+
45+
public isConnected(): boolean {
46+
return this.online;
47+
}
48+
49+
private toggleOnline(): void {
50+
if (!this.online) {
51+
this.online = true;
52+
this.dispatchEvent(new CustomEvent(EPeersByDiscoveryEvents.NODE_ONLINE));
53+
}
54+
}
55+
56+
private toggleOffline(): void {
57+
if (!this.keepAliveManager.connectionsExist()) {
58+
this.online = false;
59+
this.dispatchEvent(new CustomEvent(EPeersByDiscoveryEvents.NODE_OFFLINE));
60+
}
61+
}
4362

4463
public static create(
4564
peerId: string,
@@ -393,12 +412,14 @@ export class ConnectionManager
393412
)
394413
);
395414
}
415+
this.toggleOnline();
396416
})();
397417
},
398-
"peer:disconnect": () => {
399-
return (evt: CustomEvent<PeerId>): void => {
418+
"peer:disconnect": (evt: CustomEvent<PeerId>): void => {
419+
void (async () => {
400420
this.keepAliveManager.stop(evt.detail);
401-
};
421+
this.toggleOffline();
422+
})();
402423
}
403424
};
404425

@@ -427,7 +448,7 @@ export class ConnectionManager
427448
log.warn(
428449
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${
429450
this.configuredPubSubTopics
430-
}).
451+
}).
431452
Not dialing.`
432453
);
433454
return false;

packages/core/src/lib/keep_alive_manager.ts

+6
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,12 @@ export class KeepAliveManager {
111111
this.relayKeepAliveTimers.clear();
112112
}
113113

114+
public connectionsExist(): boolean {
115+
return (
116+
this.pingKeepAliveTimers.size > 0 || this.relayKeepAliveTimers.size > 0
117+
);
118+
}
119+
114120
private scheduleRelayPings(
115121
relay: IRelay,
116122
relayPeriodSecs: number,

packages/core/src/lib/waku.ts

+4
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,10 @@ export class WakuNode implements Waku {
178178
return this.libp2p.isStarted();
179179
}
180180

181+
isConnected(): boolean {
182+
return this.connectionManager.isConnected();
183+
}
184+
181185
/**
182186
* Return the local multiaddr with peer id on which libp2p is listening.
183187
*

packages/interfaces/src/connection_manager.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,18 @@ export enum EPeersByDiscoveryEvents {
2828
PEER_DISCOVERY_BOOTSTRAP = "peer:discovery:bootstrap",
2929
PEER_DISCOVERY_PEER_EXCHANGE = "peer:discovery:peer-exchange",
3030
PEER_CONNECT_BOOTSTRAP = "peer:connected:bootstrap",
31-
PEER_CONNECT_PEER_EXCHANGE = "peer:connected:peer-exchange"
31+
PEER_CONNECT_PEER_EXCHANGE = "peer:connected:peer-exchange",
32+
NODE_ONLINE = "waku:online",
33+
NODE_OFFLINE = "waku:offline"
3234
}
3335

3436
export interface IPeersByDiscoveryEvents {
3537
[EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP]: CustomEvent<PeerId>;
3638
[EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE]: CustomEvent<PeerId>;
3739
[EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP]: CustomEvent<PeerId>;
3840
[EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE]: CustomEvent<PeerId>;
41+
[EPeersByDiscoveryEvents.NODE_ONLINE]: CustomEvent<void>;
42+
[EPeersByDiscoveryEvents.NODE_OFFLINE]: CustomEvent<void>;
3943
}
4044

4145
export interface PeersByDiscoveryResult {

packages/interfaces/src/waku.ts

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ export interface Waku {
2626
stop(): Promise<void>;
2727

2828
isStarted(): boolean;
29+
30+
isConnected(): boolean;
2931
}
3032

3133
export interface LightNode extends Waku {

packages/tests/tests/connection_manager.spec.ts

+168
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,174 @@ describe("ConnectionManager", function () {
155155

156156
expect(await peerConnectedPeerExchange).to.eq(true);
157157
});
158+
it("should emit `waku:online` event only when first peer is connected", async function () {
159+
const peerIdPx = await createSecp256k1PeerId();
160+
161+
await waku.libp2p.peerStore.save(peerIdPx, {
162+
tags: {
163+
[Tags.PEER_EXCHANGE]: {
164+
value: 50,
165+
ttl: 1200000
166+
}
167+
}
168+
});
169+
170+
let eventCount = 0;
171+
const wakuOnline = new Promise<boolean>((resolve) => {
172+
waku.connectionManager.addEventListener(
173+
EPeersByDiscoveryEvents.NODE_ONLINE,
174+
() => {
175+
eventCount++;
176+
resolve(true);
177+
}
178+
);
179+
});
180+
181+
expect(waku.isConnected()).to.be.false;
182+
183+
waku.libp2p.dispatchEvent(
184+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
185+
);
186+
waku.libp2p.dispatchEvent(
187+
new CustomEvent<PeerId>("peer:connect", {
188+
detail: await createSecp256k1PeerId()
189+
})
190+
);
191+
waku.libp2p.dispatchEvent(
192+
new CustomEvent<PeerId>("peer:connect", {
193+
detail: await createSecp256k1PeerId()
194+
})
195+
);
196+
197+
expect(await wakuOnline).to.eq(true);
198+
expect(eventCount).to.be.eq(1);
199+
});
200+
it("isConnected should return true after first peer connects", async function () {
201+
const peerIdPx = await createSecp256k1PeerId();
202+
203+
await waku.libp2p.peerStore.save(peerIdPx, {
204+
tags: {
205+
[Tags.PEER_EXCHANGE]: {
206+
value: 50,
207+
ttl: 1200000
208+
}
209+
}
210+
});
211+
212+
expect(waku.isConnected()).to.be.false;
213+
214+
waku.libp2p.dispatchEvent(
215+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
216+
);
217+
waku.libp2p.dispatchEvent(
218+
new CustomEvent<PeerId>("peer:connect", {
219+
detail: await createSecp256k1PeerId()
220+
})
221+
);
222+
223+
await delay(100);
224+
225+
expect(waku.isConnected()).to.be.true;
226+
});
227+
});
228+
229+
describe("peer:disconnect", () => {
230+
it("should emit `waku:offline` event when all peers disconnect", async function () {
231+
const peerIdPx = await createSecp256k1PeerId();
232+
const peerIdPx2 = await createSecp256k1PeerId();
233+
234+
await waku.libp2p.peerStore.save(peerIdPx, {
235+
tags: {
236+
[Tags.PEER_EXCHANGE]: {
237+
value: 50,
238+
ttl: 1200000
239+
}
240+
}
241+
});
242+
243+
await waku.libp2p.peerStore.save(peerIdPx2, {
244+
tags: {
245+
[Tags.PEER_EXCHANGE]: {
246+
value: 50,
247+
ttl: 1200000
248+
}
249+
}
250+
});
251+
252+
let eventCount = 0;
253+
const wakuOffline = new Promise<boolean>((resolve) => {
254+
waku.connectionManager.addEventListener(
255+
EPeersByDiscoveryEvents.NODE_OFFLINE,
256+
() => {
257+
eventCount++;
258+
resolve(true);
259+
}
260+
);
261+
});
262+
263+
waku.libp2p.dispatchEvent(
264+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
265+
);
266+
waku.libp2p.dispatchEvent(
267+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx2 })
268+
);
269+
270+
await delay(100);
271+
272+
expect(waku.isConnected()).to.be.true;
273+
274+
waku.libp2p.dispatchEvent(
275+
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx })
276+
);
277+
waku.libp2p.dispatchEvent(
278+
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx2 })
279+
);
280+
281+
expect(await wakuOffline).to.eq(true);
282+
expect(eventCount).to.be.eq(1);
283+
});
284+
it("isConnected should return false after all peers disconnect", async function () {
285+
const peerIdPx = await createSecp256k1PeerId();
286+
const peerIdPx2 = await createSecp256k1PeerId();
287+
288+
await waku.libp2p.peerStore.save(peerIdPx, {
289+
tags: {
290+
[Tags.PEER_EXCHANGE]: {
291+
value: 50,
292+
ttl: 1200000
293+
}
294+
}
295+
});
296+
297+
await waku.libp2p.peerStore.save(peerIdPx2, {
298+
tags: {
299+
[Tags.PEER_EXCHANGE]: {
300+
value: 50,
301+
ttl: 1200000
302+
}
303+
}
304+
});
305+
306+
waku.libp2p.dispatchEvent(
307+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
308+
);
309+
waku.libp2p.dispatchEvent(
310+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx2 })
311+
);
312+
313+
await delay(100);
314+
315+
expect(waku.isConnected()).to.be.true;
316+
317+
waku.libp2p.dispatchEvent(
318+
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx })
319+
);
320+
waku.libp2p.dispatchEvent(
321+
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx2 })
322+
);
323+
324+
expect(waku.isConnected()).to.be.false;
325+
});
158326
});
159327
});
160328

0 commit comments

Comments
 (0)