Skip to content

Commit 1d0e2ac

Browse files
adklempnerchair28980weboko
authored
feat: track node connection state (#1719)
Co-authored-by: chair <29414216+chair28980@users.noreply.github.com> Co-authored-by: Sasha <118575614+weboko@users.noreply.github.com>
1 parent affdc26 commit 1d0e2ac

File tree

6 files changed

+274
-8
lines changed

6 files changed

+274
-8
lines changed

packages/core/src/lib/connection_manager.ts

+36-5
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
66
import { decodeRelayShard } from "@waku/enr";
77
import {
88
ConnectionManagerOptions,
9+
EConnectionStateEvents,
910
EPeersByDiscoveryEvents,
1011
IConnectionManager,
12+
IConnectionStateEvents,
1113
IPeersByDiscoveryEvents,
1214
IRelay,
1315
KeepAliveOptions,
@@ -28,7 +30,7 @@ export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
2830
export const DEFAULT_MAX_PARALLEL_DIALS = 3;
2931

3032
export class ConnectionManager
31-
extends EventEmitter<IPeersByDiscoveryEvents>
33+
extends EventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents>
3234
implements IConnectionManager
3335
{
3436
private static instances = new Map<string, ConnectionManager>();
@@ -40,6 +42,33 @@ export class ConnectionManager
4042

4143
private currentActiveParallelDialCount = 0;
4244
private pendingPeerDialQueue: Array<PeerId> = [];
45+
private online: boolean = false;
46+
47+
public isConnected(): boolean {
48+
return this.online;
49+
}
50+
51+
private toggleOnline(): void {
52+
if (!this.online) {
53+
this.online = true;
54+
this.dispatchEvent(
55+
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
56+
detail: this.online
57+
})
58+
);
59+
}
60+
}
61+
62+
private toggleOffline(): void {
63+
if (this.online && this.libp2p.getConnections().length == 0) {
64+
this.online = false;
65+
this.dispatchEvent(
66+
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
67+
detail: this.online
68+
})
69+
);
70+
}
71+
}
4372

4473
public static create(
4574
peerId: string,
@@ -393,12 +422,14 @@ export class ConnectionManager
393422
)
394423
);
395424
}
425+
this.toggleOnline();
396426
})();
397427
},
398-
"peer:disconnect": () => {
399-
return (evt: CustomEvent<PeerId>): void => {
428+
"peer:disconnect": (evt: CustomEvent<PeerId>): void => {
429+
void (async () => {
400430
this.keepAliveManager.stop(evt.detail);
401-
};
431+
this.toggleOffline();
432+
})();
402433
}
403434
};
404435

@@ -427,7 +458,7 @@ export class ConnectionManager
427458
log.warn(
428459
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${
429460
this.configuredPubsubTopics
430-
}).
461+
}).
431462
Not dialing.`
432463
);
433464
return false;

packages/core/src/lib/keep_alive_manager.ts

+6
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,12 @@ export class KeepAliveManager {
111111
this.relayKeepAliveTimers.clear();
112112
}
113113

114+
public connectionsExist(): boolean {
115+
return (
116+
this.pingKeepAliveTimers.size > 0 || this.relayKeepAliveTimers.size > 0
117+
);
118+
}
119+
114120
private scheduleRelayPings(
115121
relay: IRelay,
116122
relayPeriodSecs: number,

packages/core/src/lib/waku.ts

+4
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,10 @@ export class WakuNode implements Waku {
178178
return this.libp2p.isStarted();
179179
}
180180

181+
isConnected(): boolean {
182+
return this.connectionManager.isConnected();
183+
}
184+
181185
/**
182186
* Return the local multiaddr with peer id on which libp2p is listening.
183187
*

packages/interfaces/src/connection_manager.ts

+10-1
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,17 @@ export interface PeersByDiscoveryResult {
4949
};
5050
}
5151

52+
export enum EConnectionStateEvents {
53+
CONNECTION_STATUS = "waku:connection"
54+
}
55+
56+
export interface IConnectionStateEvents {
57+
// true when online, false when offline
58+
[EConnectionStateEvents.CONNECTION_STATUS]: CustomEvent<boolean>;
59+
}
60+
5261
export interface IConnectionManager
53-
extends EventEmitter<IPeersByDiscoveryEvents> {
62+
extends EventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents> {
5463
getPeersByDiscovery(): Promise<PeersByDiscoveryResult>;
5564
stop(): void;
5665
}

packages/interfaces/src/waku.ts

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ export interface Waku {
2626
stop(): Promise<void>;
2727

2828
isStarted(): boolean;
29+
30+
isConnected(): boolean;
2931
}
3032

3133
export interface LightNode extends Waku {

packages/tests/tests/connection_manager.spec.ts

+216-2
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,26 @@ import type { PeerId } from "@libp2p/interface/peer-id";
22
import type { PeerInfo } from "@libp2p/interface/peer-info";
33
import { CustomEvent } from "@libp2p/interfaces/events";
44
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
5-
import { EPeersByDiscoveryEvents, LightNode, Tags } from "@waku/interfaces";
5+
import { Multiaddr } from "@multiformats/multiaddr";
6+
import {
7+
EConnectionStateEvents,
8+
EPeersByDiscoveryEvents,
9+
LightNode,
10+
Protocols,
11+
Tags
12+
} from "@waku/interfaces";
613
import { createLightNode } from "@waku/sdk";
714
import { expect } from "chai";
815
import sinon, { SinonSpy, SinonStub } from "sinon";
916

1017
import { delay } from "../dist/delay.js";
11-
import { tearDownNodes } from "../src/index.js";
18+
import { makeLogFileName, NimGoNode, tearDownNodes } from "../src/index.js";
1219

1320
const TEST_TIMEOUT = 10_000;
1421
const DELAY_MS = 1_000;
1522

1623
describe("ConnectionManager", function () {
24+
this.timeout(20_000);
1725
let waku: LightNode;
1826

1927
beforeEach(async function () {
@@ -156,6 +164,105 @@ describe("ConnectionManager", function () {
156164
expect(await peerConnectedPeerExchange).to.eq(true);
157165
});
158166
});
167+
168+
describe("peer:disconnect", () => {
169+
it("should emit `waku:offline` event when all peers disconnect", async function () {
170+
const peerIdPx = await createSecp256k1PeerId();
171+
const peerIdPx2 = await createSecp256k1PeerId();
172+
173+
await waku.libp2p.peerStore.save(peerIdPx, {
174+
tags: {
175+
[Tags.PEER_EXCHANGE]: {
176+
value: 50,
177+
ttl: 1200000
178+
}
179+
}
180+
});
181+
182+
await waku.libp2p.peerStore.save(peerIdPx2, {
183+
tags: {
184+
[Tags.PEER_EXCHANGE]: {
185+
value: 50,
186+
ttl: 1200000
187+
}
188+
}
189+
});
190+
191+
waku.libp2p.dispatchEvent(
192+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
193+
);
194+
waku.libp2p.dispatchEvent(
195+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx2 })
196+
);
197+
198+
await delay(100);
199+
200+
let eventCount = 0;
201+
const connectionStatus = new Promise<boolean>((resolve) => {
202+
waku.connectionManager.addEventListener(
203+
EConnectionStateEvents.CONNECTION_STATUS,
204+
({ detail: status }) => {
205+
eventCount++;
206+
resolve(status);
207+
}
208+
);
209+
});
210+
211+
expect(waku.isConnected()).to.be.true;
212+
213+
waku.libp2p.dispatchEvent(
214+
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx })
215+
);
216+
waku.libp2p.dispatchEvent(
217+
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx2 })
218+
);
219+
220+
expect(await connectionStatus).to.eq(false);
221+
expect(eventCount).to.be.eq(1);
222+
});
223+
it("isConnected should return false after all peers disconnect", async function () {
224+
const peerIdPx = await createSecp256k1PeerId();
225+
const peerIdPx2 = await createSecp256k1PeerId();
226+
227+
await waku.libp2p.peerStore.save(peerIdPx, {
228+
tags: {
229+
[Tags.PEER_EXCHANGE]: {
230+
value: 50,
231+
ttl: 1200000
232+
}
233+
}
234+
});
235+
236+
await waku.libp2p.peerStore.save(peerIdPx2, {
237+
tags: {
238+
[Tags.PEER_EXCHANGE]: {
239+
value: 50,
240+
ttl: 1200000
241+
}
242+
}
243+
});
244+
245+
waku.libp2p.dispatchEvent(
246+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
247+
);
248+
waku.libp2p.dispatchEvent(
249+
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx2 })
250+
);
251+
252+
await delay(100);
253+
254+
expect(waku.isConnected()).to.be.true;
255+
256+
waku.libp2p.dispatchEvent(
257+
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx })
258+
);
259+
waku.libp2p.dispatchEvent(
260+
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx2 })
261+
);
262+
263+
expect(waku.isConnected()).to.be.false;
264+
});
265+
});
159266
});
160267

161268
describe("Dials", () => {
@@ -376,4 +483,111 @@ describe("ConnectionManager", function () {
376483
});
377484
});
378485
});
486+
487+
describe("Connection state", () => {
488+
this.timeout(20_000);
489+
let nwaku1: NimGoNode;
490+
let nwaku2: NimGoNode;
491+
let nwaku1PeerId: Multiaddr;
492+
let nwaku2PeerId: Multiaddr;
493+
494+
beforeEach(async () => {
495+
this.timeout(20_000);
496+
nwaku1 = new NimGoNode(makeLogFileName(this.ctx) + "1");
497+
nwaku2 = new NimGoNode(makeLogFileName(this.ctx) + "2");
498+
await nwaku1.start({
499+
filter: true
500+
});
501+
502+
await nwaku2.start({
503+
filter: true
504+
});
505+
506+
nwaku1PeerId = await nwaku1.getMultiaddrWithId();
507+
nwaku2PeerId = await nwaku2.getMultiaddrWithId();
508+
});
509+
510+
afterEach(async () => {
511+
this.timeout(15000);
512+
await tearDownNodes([nwaku1, nwaku2], []);
513+
});
514+
515+
it("should emit `waku:online` event only when first peer is connected", async function () {
516+
this.timeout(20_000);
517+
518+
let eventCount = 0;
519+
const connectionStatus = new Promise<boolean>((resolve) => {
520+
waku.connectionManager.addEventListener(
521+
EConnectionStateEvents.CONNECTION_STATUS,
522+
({ detail: status }) => {
523+
eventCount++;
524+
resolve(status);
525+
}
526+
);
527+
});
528+
529+
// await waku.start();
530+
await waku.dial(nwaku1PeerId, [Protocols.Filter]);
531+
await waku.dial(nwaku2PeerId, [Protocols.Filter]);
532+
533+
await delay(250);
534+
535+
expect(await connectionStatus).to.eq(true);
536+
expect(eventCount).to.be.eq(1);
537+
});
538+
539+
it("isConnected should return true after first peer connects", async function () {
540+
this.timeout(20_000);
541+
expect(waku.isConnected()).to.be.false;
542+
543+
// await waku.start();
544+
await waku.dial(nwaku1PeerId, [Protocols.Filter]);
545+
await waku.dial(nwaku2PeerId, [Protocols.Filter]);
546+
547+
await delay(250);
548+
549+
expect(waku.isConnected()).to.be.true;
550+
});
551+
552+
it("should emit `waku:offline` event only when all peers disconnect", async function () {
553+
this.timeout(20_000);
554+
expect(waku.isConnected()).to.be.false;
555+
556+
await waku.dial(nwaku1PeerId, [Protocols.Filter]);
557+
await waku.dial(nwaku2PeerId, [Protocols.Filter]);
558+
559+
await delay(250);
560+
561+
let eventCount = 0;
562+
const connectionStatus = new Promise<boolean>((resolve) => {
563+
waku.connectionManager.addEventListener(
564+
EConnectionStateEvents.CONNECTION_STATUS,
565+
({ detail: status }) => {
566+
eventCount++;
567+
resolve(status);
568+
}
569+
);
570+
});
571+
572+
await waku.libp2p.hangUp(nwaku1PeerId);
573+
await waku.libp2p.hangUp(nwaku2PeerId);
574+
expect(await connectionStatus).to.eq(false);
575+
expect(eventCount).to.be.eq(1);
576+
});
577+
578+
it("isConnected should return false after all peers disconnect", async function () {
579+
this.timeout(20_000);
580+
expect(waku.isConnected()).to.be.false;
581+
582+
await waku.dial(nwaku1PeerId, [Protocols.Filter]);
583+
await waku.dial(nwaku2PeerId, [Protocols.Filter]);
584+
585+
await delay(250);
586+
expect(waku.isConnected()).to.be.true;
587+
588+
await waku.libp2p.hangUp(nwaku1PeerId);
589+
await waku.libp2p.hangUp(nwaku2PeerId);
590+
expect(waku.isConnected()).to.be.false;
591+
});
592+
});
379593
});

0 commit comments

Comments
 (0)