Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ConnectionManager extends EventEmitter & exposed on the Waku interface (& minor improvements) #1447

Merged
merged 5 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,4 @@ export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";

export { ConnectionManager } from "./lib/connection_manager.js";

export {
KeepAliveManager,
KeepAliveOptions,
} from "./lib/keep_alive_manager.js";
export { KeepAliveManager } from "./lib/keep_alive_manager.js";
11 changes: 8 additions & 3 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,27 @@ import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
import {
ConnectionManagerOptions,
EPeersByDiscoveryEvents,
IConnectionManager,
IPeersByDiscoveryEvents,
IRelay,
KeepAliveOptions,
PeersByDiscoveryResult,
} from "@waku/interfaces";
import { Libp2p, Tags } from "@waku/interfaces";
import debug from "debug";

import { KeepAliveManager, KeepAliveOptions } from "./keep_alive_manager.js";
import { KeepAliveManager } from "./keep_alive_manager.js";

const log = debug("waku:connection-manager");

export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
export const DEFAULT_MAX_PARALLEL_DIALS = 3;

export class ConnectionManager extends EventEmitter<IPeersByDiscoveryEvents> {
export class ConnectionManager
extends EventEmitter<IPeersByDiscoveryEvents>
implements IConnectionManager
{
private static instances = new Map<string, ConnectionManager>();
private keepAliveManager: KeepAliveManager;
private options: ConnectionManagerOptions;
Expand Down Expand Up @@ -217,7 +222,7 @@ export class ConnectionManager extends EventEmitter<IPeersByDiscoveryEvents> {
}
}

async dropConnection(peerId: PeerId): Promise<void> {
private async dropConnection(peerId: PeerId): Promise<void> {
try {
this.keepAliveManager.stop(peerId);
await this.libp2p.hangUp(peerId);
Expand Down
6 changes: 1 addition & 5 deletions packages/core/src/lib/keep_alive_manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import type { IRelay } from "@waku/interfaces";
import type { KeepAliveOptions } from "@waku/interfaces";
import debug from "debug";
import type { PingService } from "libp2p/ping";

Expand All @@ -8,11 +9,6 @@ import { createEncoder } from "../index.js";
export const RelayPingContentTopic = "/relay-ping/1/ping/null";
const log = debug("waku:keep-alive");

export interface KeepAliveOptions {
pingKeepAlive: number;
relayKeepAlive: number;
}

export class KeepAliveManager {
private pingKeepAliveTimers: Map<string, ReturnType<typeof setInterval>>;
private relayKeepAliveTimers: Map<PeerId, ReturnType<typeof setInterval>>;
Expand Down
7 changes: 7 additions & 0 deletions packages/interfaces/src/connection_manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import type { EventEmitter } from "@libp2p/interfaces/events";

export enum Tags {
BOOTSTRAP = "bootstrap",
Expand Down Expand Up @@ -47,3 +48,9 @@ export interface PeersByDiscoveryResult {
[Tags.PEER_EXCHANGE]: Peer[];
};
}

export interface IConnectionManager
extends EventEmitter<IPeersByDiscoveryEvents> {
getPeersByDiscovery(): Promise<PeersByDiscoveryResult>;
stop(): void;
}
1 change: 1 addition & 0 deletions packages/interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ export * from "./sender.js";
export * from "./receiver.js";
export * from "./misc.js";
export * from "./libp2p.js";
export * from "./keep_alive_manager.js";
4 changes: 4 additions & 0 deletions packages/interfaces/src/keep_alive_manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface KeepAliveOptions {
pingKeepAlive: number;
relayKeepAlive: number;
}
3 changes: 3 additions & 0 deletions packages/interfaces/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Stream } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Multiaddr } from "@multiformats/multiaddr";

import { IConnectionManager } from "./connection_manager.js";
import type { IFilter } from "./filter.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js";
Expand All @@ -16,6 +17,8 @@ export interface Waku {
filter?: IFilter;
lightPush?: ILightPush;

connectionManager: IConnectionManager;

dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise<Stream>;

start(): Promise<void>;
Expand Down
44 changes: 21 additions & 23 deletions packages/tests/tests/connection_manager.spec.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,20 @@
import { CustomEvent } from "@libp2p/interfaces/events";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { ConnectionManager, KeepAliveOptions } from "@waku/core";
import { EPeersByDiscoveryEvents, LightNode, Tags } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { expect } from "chai";
import sinon, { SinonSpy, SinonStub } from "sinon";

import { delay } from "../dist/delay.js";

const KEEP_ALIVE_OPTIONS: KeepAliveOptions = {
pingKeepAlive: 0,
relayKeepAlive: 5 * 1000,
};
const TEST_TIMEOUT = 10_000;
const DELAY_MS = 1_000;

describe("ConnectionManager", function () {
let connectionManager: ConnectionManager | undefined;
let waku: LightNode;
let peerId: string;

beforeEach(async function () {
waku = await createLightNode();
peerId = Math.random().toString(36).substring(7);
connectionManager = ConnectionManager.create(
peerId,
waku.libp2p,
KEEP_ALIVE_OPTIONS
);
});

afterEach(async () => {
Expand All @@ -51,15 +38,17 @@ describe("ConnectionManager", function () {
});

const peerDiscoveryBootstrap = new Promise<boolean>((resolve) => {
connectionManager!.addEventListener(
waku.connectionManager.addEventListener(
EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP,
({ detail: receivedPeerId }) => {
resolve(receivedPeerId.toString() === peerIdBootstrap.toString());
}
);
});

waku.libp2p.dispatchEvent(new CustomEvent("peer", { detail: peerId }));
waku.libp2p.dispatchEvent(
new CustomEvent("peer", { detail: await createSecp256k1PeerId() })
);

expect(await peerDiscoveryBootstrap).to.eq(true);
});
Expand All @@ -77,7 +66,7 @@ describe("ConnectionManager", function () {
});

const peerDiscoveryPeerExchange = new Promise<boolean>((resolve) => {
connectionManager!.addEventListener(
waku.connectionManager.addEventListener(
EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE,
({ detail: receivedPeerId }) => {
resolve(receivedPeerId.toString() === peerIdPx.toString());
Expand Down Expand Up @@ -109,7 +98,7 @@ describe("ConnectionManager", function () {
});

const peerConnectedBootstrap = new Promise<boolean>((resolve) => {
connectionManager!.addEventListener(
waku.connectionManager.addEventListener(
EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP,
({ detail: receivedPeerId }) => {
resolve(receivedPeerId.toString() === peerIdBootstrap.toString());
Expand All @@ -136,7 +125,7 @@ describe("ConnectionManager", function () {
});

const peerConnectedPeerExchange = new Promise<boolean>((resolve) => {
connectionManager!.addEventListener(
waku.connectionManager.addEventListener(
EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE,
({ detail: receivedPeerId }) => {
resolve(receivedPeerId.toString() === peerIdPx.toString());
Expand All @@ -157,16 +146,25 @@ describe("ConnectionManager", function () {
let dialPeerStub: SinonStub;
let getConnectionsStub: SinonStub;
let getTagNamesForPeerStub: SinonStub;
let waku: LightNode;

afterEach(() => {
this.beforeEach(async function () {
waku = await createLightNode();
});

afterEach(async () => {
await waku.stop();
sinon.restore();
});

describe("attemptDial method", function () {
let attemptDialSpy: SinonSpy;

beforeEach(function () {
attemptDialSpy = sinon.spy(connectionManager as any, "attemptDial");
attemptDialSpy = sinon.spy(
waku.connectionManager as any,
"attemptDial"
);
});

afterEach(function () {
Expand Down Expand Up @@ -196,14 +194,14 @@ describe("ConnectionManager", function () {
describe("dialPeer method", function () {
beforeEach(function () {
getConnectionsStub = sinon.stub(
(connectionManager as any).libp2p,
(waku.connectionManager as any).libp2p,
"getConnections"
);
getTagNamesForPeerStub = sinon.stub(
connectionManager as any,
waku.connectionManager as any,
"getTagNamesForPeer"
);
dialPeerStub = sinon.stub(connectionManager as any, "dialPeer");
dialPeerStub = sinon.stub(waku.connectionManager as any, "dialPeer");
});

afterEach(function () {
Expand Down