Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: track node connection state #1719

Merged
merged 3 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
import { decodeRelayShard } from "@waku/enr";
import {
ConnectionManagerOptions,
EConnectionStateEvents,
EPeersByDiscoveryEvents,
IConnectionManager,
IConnectionStateEvents,
IPeersByDiscoveryEvents,
IRelay,
KeepAliveOptions,
Expand All @@ -28,7 +30,7 @@ export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
export const DEFAULT_MAX_PARALLEL_DIALS = 3;

export class ConnectionManager
extends EventEmitter<IPeersByDiscoveryEvents>
extends EventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents>
implements IConnectionManager
{
private static instances = new Map<string, ConnectionManager>();
Expand All @@ -40,6 +42,33 @@ export class ConnectionManager

private currentActiveParallelDialCount = 0;
private pendingPeerDialQueue: Array<PeerId> = [];
private online: boolean = false;

public isConnected(): boolean {
return this.online;
}

private toggleOnline(): void {
if (!this.online) {
this.online = true;
this.dispatchEvent(
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
detail: this.online
})
);
}
}

private toggleOffline(): void {
if (this.online && this.libp2p.getConnections().length == 0) {
this.online = false;
this.dispatchEvent(
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
detail: this.online
})
);
}
}
Comment on lines +51 to +71
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe this particular dispatch event can be written as a private function for the class: dispatchConnectionStatus() since it's being reused twice.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a usual concern with these type of functions. I like current one as it is explicitly mentions when it becomes online and when offline.

@adklempner can follow up if thinks it is necessary


public static create(
peerId: string,
Expand Down Expand Up @@ -393,12 +422,14 @@ export class ConnectionManager
)
);
}
this.toggleOnline();
})();
},
"peer:disconnect": () => {
return (evt: CustomEvent<PeerId>): void => {
"peer:disconnect": (evt: CustomEvent<PeerId>): void => {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I tested locally, this event handler was not being called at all until I changed it to use the same pattern as the two above

void (async () => {
this.keepAliveManager.stop(evt.detail);
};
this.toggleOffline();
})();
}
};

Expand Down Expand Up @@ -427,7 +458,7 @@ export class ConnectionManager
log.warn(
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${
this.configuredPubsubTopics
}).
}).
Not dialing.`
);
return false;
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/lib/keep_alive_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ export class KeepAliveManager {
this.relayKeepAliveTimers.clear();
}

public connectionsExist(): boolean {
return (
this.pingKeepAliveTimers.size > 0 || this.relayKeepAliveTimers.size > 0
);
}

private scheduleRelayPings(
relay: IRelay,
relayPeriodSecs: number,
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/lib/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ export class WakuNode implements Waku {
return this.libp2p.isStarted();
}

isConnected(): boolean {
return this.connectionManager.isConnected();
}

/**
* Return the local multiaddr with peer id on which libp2p is listening.
*
Expand Down
11 changes: 10 additions & 1 deletion packages/interfaces/src/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,17 @@ export interface PeersByDiscoveryResult {
};
}

export enum EConnectionStateEvents {
CONNECTION_STATUS = "waku:connection"
}

export interface IConnectionStateEvents {
// true when online, false when offline
[EConnectionStateEvents.CONNECTION_STATUS]: CustomEvent<boolean>;
}

export interface IConnectionManager
extends EventEmitter<IPeersByDiscoveryEvents> {
extends EventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents> {
getPeersByDiscovery(): Promise<PeersByDiscoveryResult>;
stop(): void;
}
2 changes: 2 additions & 0 deletions packages/interfaces/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export interface Waku {
stop(): Promise<void>;

isStarted(): boolean;

isConnected(): boolean;
}

export interface LightNode extends Waku {
Expand Down
218 changes: 216 additions & 2 deletions packages/tests/tests/connection_manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,26 @@ import type { PeerId } from "@libp2p/interface/peer-id";
import type { PeerInfo } from "@libp2p/interface/peer-info";
import { CustomEvent } from "@libp2p/interfaces/events";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { EPeersByDiscoveryEvents, LightNode, Tags } from "@waku/interfaces";
import { Multiaddr } from "@multiformats/multiaddr";
import {
EConnectionStateEvents,
EPeersByDiscoveryEvents,
LightNode,
Protocols,
Tags
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { expect } from "chai";
import sinon, { SinonSpy, SinonStub } from "sinon";

import { delay } from "../dist/delay.js";
import { tearDownNodes } from "../src/index.js";
import { makeLogFileName, NimGoNode, tearDownNodes } from "../src/index.js";

const TEST_TIMEOUT = 10_000;
const DELAY_MS = 1_000;

describe("ConnectionManager", function () {
this.timeout(20_000);
let waku: LightNode;

beforeEach(async function () {
Expand Down Expand Up @@ -156,6 +164,105 @@ describe("ConnectionManager", function () {
expect(await peerConnectedPeerExchange).to.eq(true);
});
});

describe("peer:disconnect", () => {
it("should emit `waku:offline` event when all peers disconnect", async function () {
const peerIdPx = await createSecp256k1PeerId();
const peerIdPx2 = await createSecp256k1PeerId();

await waku.libp2p.peerStore.save(peerIdPx, {
tags: {
[Tags.PEER_EXCHANGE]: {
value: 50,
ttl: 1200000
}
}
});

await waku.libp2p.peerStore.save(peerIdPx2, {
tags: {
[Tags.PEER_EXCHANGE]: {
value: 50,
ttl: 1200000
}
}
});

waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
);
waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx2 })
);

await delay(100);

let eventCount = 0;
const connectionStatus = new Promise<boolean>((resolve) => {
waku.connectionManager.addEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
({ detail: status }) => {
eventCount++;
resolve(status);
}
);
});

expect(waku.isConnected()).to.be.true;

waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx })
);
waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx2 })
);

expect(await connectionStatus).to.eq(false);
expect(eventCount).to.be.eq(1);
});
it("isConnected should return false after all peers disconnect", async function () {
const peerIdPx = await createSecp256k1PeerId();
const peerIdPx2 = await createSecp256k1PeerId();

await waku.libp2p.peerStore.save(peerIdPx, {
tags: {
[Tags.PEER_EXCHANGE]: {
value: 50,
ttl: 1200000
}
}
});

await waku.libp2p.peerStore.save(peerIdPx2, {
tags: {
[Tags.PEER_EXCHANGE]: {
value: 50,
ttl: 1200000
}
}
});

waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
);
waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx2 })
);

await delay(100);

expect(waku.isConnected()).to.be.true;

waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx })
);
waku.libp2p.dispatchEvent(
new CustomEvent<PeerId>("peer:disconnect", { detail: peerIdPx2 })
);

expect(waku.isConnected()).to.be.false;
});
});
});

describe("Dials", () => {
Expand Down Expand Up @@ -376,4 +483,111 @@ describe("ConnectionManager", function () {
});
});
});

describe("Connection state", () => {
this.timeout(20_000);
let nwaku1: NimGoNode;
let nwaku2: NimGoNode;
let nwaku1PeerId: Multiaddr;
let nwaku2PeerId: Multiaddr;

beforeEach(async () => {
this.timeout(20_000);
nwaku1 = new NimGoNode(makeLogFileName(this.ctx) + "1");
nwaku2 = new NimGoNode(makeLogFileName(this.ctx) + "2");
await nwaku1.start({
filter: true
});

await nwaku2.start({
filter: true
});

nwaku1PeerId = await nwaku1.getMultiaddrWithId();
nwaku2PeerId = await nwaku2.getMultiaddrWithId();
});

afterEach(async () => {
this.timeout(15000);
await tearDownNodes([nwaku1, nwaku2], []);
});

it("should emit `waku:online` event only when first peer is connected", async function () {
this.timeout(20_000);

let eventCount = 0;
const connectionStatus = new Promise<boolean>((resolve) => {
waku.connectionManager.addEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
({ detail: status }) => {
eventCount++;
resolve(status);
}
);
});

// await waku.start();
await waku.dial(nwaku1PeerId, [Protocols.Filter]);
await waku.dial(nwaku2PeerId, [Protocols.Filter]);

await delay(250);

expect(await connectionStatus).to.eq(true);
expect(eventCount).to.be.eq(1);
});

it("isConnected should return true after first peer connects", async function () {
this.timeout(20_000);
expect(waku.isConnected()).to.be.false;

// await waku.start();
await waku.dial(nwaku1PeerId, [Protocols.Filter]);
await waku.dial(nwaku2PeerId, [Protocols.Filter]);

await delay(250);

expect(waku.isConnected()).to.be.true;
});

it("should emit `waku:offline` event only when all peers disconnect", async function () {
this.timeout(20_000);
expect(waku.isConnected()).to.be.false;

await waku.dial(nwaku1PeerId, [Protocols.Filter]);
await waku.dial(nwaku2PeerId, [Protocols.Filter]);

await delay(250);

let eventCount = 0;
const connectionStatus = new Promise<boolean>((resolve) => {
waku.connectionManager.addEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
({ detail: status }) => {
eventCount++;
resolve(status);
}
);
});

await waku.libp2p.hangUp(nwaku1PeerId);
await waku.libp2p.hangUp(nwaku2PeerId);
expect(await connectionStatus).to.eq(false);
expect(eventCount).to.be.eq(1);
});

it("isConnected should return false after all peers disconnect", async function () {
this.timeout(20_000);
expect(waku.isConnected()).to.be.false;

await waku.dial(nwaku1PeerId, [Protocols.Filter]);
await waku.dial(nwaku2PeerId, [Protocols.Filter]);

await delay(250);
expect(waku.isConnected()).to.be.true;

await waku.libp2p.hangUp(nwaku1PeerId);
await waku.libp2p.hangUp(nwaku2PeerId);
expect(waku.isConnected()).to.be.false;
});
});
});