Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use the lowest latency peer for protocols #1540

Merged
merged 15 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces";
import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p";

import { filterPeers } from "./filterPeers.js";
import { KeepAliveManager } from "./keep_alive_manager.js";
import { StreamManager } from "./stream_manager.js";

/**
Expand Down Expand Up @@ -54,8 +55,10 @@ export class BaseProtocol implements IBaseProtocol {
}

protected async getPeer(peerId?: PeerId): Promise<Peer> {
const { peerPings } = KeepAliveManager.getInstance();
const { peer } = await selectPeerForProtocol(
this.peerStore,
peerPings,
[this.multicodec],
peerId
);
Expand Down
8 changes: 6 additions & 2 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ export class ConnectionManager
...options
};

this.keepAliveManager = new KeepAliveManager(keepAliveOptions, relay);
this.keepAliveManager = KeepAliveManager.createInstance(
this.libp2p.services.ping,
keepAliveOptions,
relay
);

this.run()
.then(() => log(`Connection Manager is now running`))
Expand Down Expand Up @@ -340,7 +344,7 @@ export class ConnectionManager
void (async () => {
const peerId = evt.detail;

this.keepAliveManager.start(peerId, this.libp2p.services.ping);
this.keepAliveManager.start(peerId);

const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
Tags.BOOTSTRAP
Expand Down
52 changes: 46 additions & 6 deletions packages/core/src/lib/keep_alive_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,59 @@ import type { KeepAliveOptions } from "@waku/interfaces";
import debug from "debug";
import type { PingService } from "libp2p/ping";

import { createEncoder } from "../index.js";
import { createEncoder } from "./message/version_0.js";

export const RelayPingContentTopic = "/relay-ping/1/ping/null";
const log = debug("waku:keep-alive");

export class KeepAliveManager {
private static instance: KeepAliveManager;

private pingKeepAliveTimers: Map<string, ReturnType<typeof setInterval>>;
private relayKeepAliveTimers: Map<PeerId, ReturnType<typeof setInterval>>;
private options: KeepAliveOptions;
private relay?: IRelay;
private libp2pPing: PingService;
public peerPings: Map<string, number>;

constructor(options: KeepAliveOptions, relay?: IRelay) {
private constructor(
libp2pPing: PingService,
options: KeepAliveOptions,
relay?: IRelay
) {
this.pingKeepAliveTimers = new Map();
this.relayKeepAliveTimers = new Map();
this.options = options;
this.relay = relay;
this.peerPings = new Map();
this.libp2pPing = libp2pPing;
}

public static createInstance(
libp2pPing: PingService,
options: KeepAliveOptions,
relay?: IRelay
): KeepAliveManager {
if (!KeepAliveManager.instance) {
KeepAliveManager.instance = new KeepAliveManager(
libp2pPing,
options,
relay
);
}
return KeepAliveManager.instance;
}

public static getInstance(): KeepAliveManager {
if (!KeepAliveManager.instance) {
throw new Error(
"KeepAliveManager not initialized - please use createInstance() first"
);
}
return KeepAliveManager.instance;
}

public start(peerId: PeerId, libp2pPing: PingService): void {
public start(peerId: PeerId): void {
// Just in case a timer already exist for this peer
this.stop(peerId);

Expand All @@ -33,9 +67,15 @@ export class KeepAliveManager {

if (pingPeriodSecs !== 0) {
const interval = setInterval(() => {
libp2pPing.ping(peerId).catch((e) => {
log(`Ping failed (${peerIdStr})`, e);
});
this.libp2pPing
.ping(peerId)
.then((ping) => {
log(`Ping succeeded (${peerIdStr})`, ping);
this.peerPings.set(peerIdStr, ping);
})
.catch((e) => {
log(`Ping failed (${peerIdStr})`, e);
});
}, pingPeriodSecs * 1000);
this.pingKeepAliveTimers.set(peerIdStr, interval);
}
Expand Down
7 changes: 4 additions & 3 deletions packages/tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"@waku/interfaces": "*",
"@waku/utils": "*",
"app-root-path": "^3.1.0",
"chai-as-promised": "^7.1.1",
"debug": "^4.3.4",
"dockerode": "^3.3.5",
"p-timeout": "^6.1.0",
Expand All @@ -66,20 +67,20 @@
},
"devDependencies": {
"@libp2p/bootstrap": "^9.0.2",
"@types/sinon": "^10.0.16",
"@types/chai": "^4.3.5",
"@types/dockerode": "^3.3.19",
"@types/mocha": "^10.0.1",
"@types/sinon": "^10.0.16",
"@types/tail": "^2.2.1",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^6.6.0",
"@waku/sdk": "*",
"@waku/dns-discovery": "*",
"@waku/message-encryption": "*",
"@waku/peer-exchange": "*",
"@waku/sdk": "*",
"chai": "^4.3.7",
"datastore-core": "^9.2.2",
"cspell": "^7.3.2",
"datastore-core": "^9.2.2",
"debug": "^4.3.4",
"interface-datastore": "^8.2.3",
"libp2p": "^0.46.8",
Expand Down
134 changes: 132 additions & 2 deletions packages/tests/tests/utils.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import type { PeerStore } from "@libp2p/interface/peer-store";
import type { Peer } from "@libp2p/interface/peer-store";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import {
createDecoder,
createEncoder,
Expand All @@ -9,11 +12,16 @@ import { Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { toAsyncIterator } from "@waku/utils";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { selectPeerForProtocol } from "@waku/utils/libp2p";
import chai, { expect } from "chai";
import chaiAsPromised from "chai-as-promised";
import sinon from "sinon";

import { makeLogFileName, NOISE_KEY_1 } from "../src/index.js";
import { delay, makeLogFileName, NOISE_KEY_1 } from "../src/index.js";
import { NimGoNode } from "../src/node/node.js";

chai.use(chaiAsPromised);

const TestContentTopic = "/test/1/waku-filter";
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
const TestDecoder = createDecoder(TestContentTopic);
Expand Down Expand Up @@ -106,3 +114,125 @@ describe("Util: toAsyncIterator: Filter", () => {
expect(result.done).to.eq(true);
});
});

const TestCodec = "test/1";

describe("selectPeerForProtocol", () => {
let peerStore: PeerStore;
let peerPings: Map<string, number>;
const protocols = [TestCodec];

beforeEach(async function () {
this.timeout(10000);
const waku = await createLightNode();
await waku.start();
await delay(3000);
peerStore = waku.libp2p.peerStore;
peerPings = new Map();
});

afterEach(() => {
sinon.restore();
});

it("should return the peer with the lowest ping", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();

const mockPeers = [
{ id: peer1, protocols: [TestCodec] },
{ id: peer2, protocols: [TestCodec] },
{ id: peer3, protocols: [TestCodec] }
] as Peer[];

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

peerPings.set(peer1.toString(), 500);
peerPings.set(peer2.toString(), 1000);
peerPings.set(peer3.toString(), 100);

const result = await selectPeerForProtocol(peerStore, peerPings, protocols);

expect(result.peer).to.deep.equal(mockPeers[2]);
expect(result.protocol).to.equal(TestCodec);
});

it("should return the peer with the provided peerId", async function () {
const targetPeer = await createSecp256k1PeerId();
const mockPeer = { id: targetPeer, protocols: [TestCodec] } as Peer;
sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer);

const result = await selectPeerForProtocol(
peerStore,
peerPings,
protocols,
targetPeer
);
expect(result.peer).to.deep.equal(mockPeer);
});

it("should return a random peer when all peers have the same latency", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();

const mockPeers = [
{ id: peer1, protocols: [TestCodec] },
{ id: peer2, protocols: [TestCodec] },
{ id: peer3, protocols: [TestCodec] }
] as Peer[];

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

peerPings.set(peer1.toString(), 500);
peerPings.set(peer2.toString(), 500);
peerPings.set(peer3.toString(), 500);

const result = await selectPeerForProtocol(peerStore, peerPings, protocols);

expect(mockPeers).to.deep.include(result.peer);
});

it("should throw an error when no peer matches the given protocols", async function () {
const mockPeers = [
{ id: await createSecp256k1PeerId(), protocols: ["DifferentCodec"] },
{
id: await createSecp256k1PeerId(),
protocols: ["AnotherDifferentCodec"]
}
] as Peer[];

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

await expect(
selectPeerForProtocol(peerStore, peerPings, protocols)
).to.be.rejectedWith(
`Failed to find known peer that registers protocols: ${protocols}`
);
});

it("should throw an error when the selected peer does not register the required protocols", async function () {
const targetPeer = await createSecp256k1PeerId();
const mockPeer = { id: targetPeer, protocols: ["DifferentCodec"] } as Peer;
sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer);

await expect(
selectPeerForProtocol(peerStore, peerPings, protocols, targetPeer)
).to.be.rejectedWith(
`Peer does not register required protocols (${targetPeer.toString()}): ${protocols}`
);
});
});
Loading