Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't dial discovered peers if have already been attempted dial #1657

Merged
merged 9 commits into from
Oct 20, 2023
Merged
24 changes: 18 additions & 6 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class ConnectionManager
private dialAttemptsForPeer: Map<string, number> = new Map();
private dialErrorsForPeer: Map<string, any> = new Map();

private currentActiveDialCount = 0;
private currentActiveParallelDialCount = 0;
private pendingPeerDialQueue: Array<PeerId> = [];

public static create(
Expand Down Expand Up @@ -183,7 +183,7 @@ export class ConnectionManager
}

private async dialPeer(peerId: PeerId): Promise<void> {
this.currentActiveDialCount += 1;
this.currentActiveParallelDialCount += 1;
let dialAttempt = 0;
while (dialAttempt < this.options.maxDialAttemptsForPeer) {
try {
Expand All @@ -199,7 +199,10 @@ export class ConnectionManager
conn.tags = Array.from(new Set([...conn.tags, ...tags]));
});

this.dialAttemptsForPeer.delete(peerId.toString());
// instead of deleting the peer from the peer store, we set the dial attempt to -1
// this helps us keep track of peers that have been dialed before
this.dialAttemptsForPeer.set(peerId.toString(), -1);

// Dialing succeeded, break the loop
break;
} catch (error) {
Expand All @@ -224,7 +227,7 @@ export class ConnectionManager
}

// Always decrease the active dial count and process the dial queue
this.currentActiveDialCount--;
this.currentActiveParallelDialCount--;
this.processDialQueue();

// If max dial attempts reached and dialing failed, delete the peer
Expand Down Expand Up @@ -276,7 +279,7 @@ export class ConnectionManager
private processDialQueue(): void {
if (
this.pendingPeerDialQueue.length > 0 &&
this.currentActiveDialCount < this.options.maxParallelDials
this.currentActiveParallelDialCount < this.options.maxParallelDials
) {
const peerId = this.pendingPeerDialQueue.shift();
if (!peerId) return;
Expand Down Expand Up @@ -322,7 +325,7 @@ export class ConnectionManager
private async attemptDial(peerId: PeerId): Promise<void> {
if (!(await this.shouldDialPeer(peerId))) return;

if (this.currentActiveDialCount >= this.options.maxParallelDials) {
if (this.currentActiveParallelDialCount >= this.options.maxParallelDials) {
this.pendingPeerDialQueue.push(peerId);
return;
}
Expand Down Expand Up @@ -404,6 +407,7 @@ export class ConnectionManager
* 1. If the peer is already connected, don't dial
* 2. If the peer is not part of any of the configured pubsub topics, don't dial
* 3. If the peer is not dialable based on bootstrap status, don't dial
* 4. If the peer is already has an active dial attempt, or has been dialed before, don't dial it
* @returns true if the peer should be dialed, false otherwise
*/
private async shouldDialPeer(peerId: PeerId): Promise<boolean> {
Expand Down Expand Up @@ -437,6 +441,14 @@ export class ConnectionManager
return false;
}

// If the peer is already already has an active dial attempt, or has been dialed before, don't dial it
if (this.dialAttemptsForPeer.has(peerId.toString())) {
log(
`Peer ${peerId.toString()} has already been attempted dial before, or already has a dial attempt in progress, skipping dial`
);
return false;
}

return true;
}

Expand Down
118 changes: 78 additions & 40 deletions packages/tests/tests/connection_manager.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import type { PeerInfo } from "@libp2p/interface/peer-info";
import { CustomEvent } from "@libp2p/interfaces/events";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { EPeersByDiscoveryEvents, LightNode, Tags } from "@waku/interfaces";
Expand Down Expand Up @@ -49,7 +51,13 @@ describe("ConnectionManager", function () {
});

waku.libp2p.dispatchEvent(
new CustomEvent("peer", { detail: await createSecp256k1PeerId() })
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: peerIdBootstrap,
multiaddrs: [],
protocols: []
}
})
);

expect(await peerDiscoveryBootstrap).to.eq(true);
Expand Down Expand Up @@ -77,7 +85,13 @@ describe("ConnectionManager", function () {
});

waku.libp2p.dispatchEvent(
new CustomEvent("peer", { detail: peerIdPx })
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: peerIdPx,
multiaddrs: [],
protocols: []
}
})
);

expect(await peerDiscoveryPeerExchange).to.eq(true);
Expand Down Expand Up @@ -109,7 +123,7 @@ describe("ConnectionManager", function () {
});

waku.libp2p.dispatchEvent(
new CustomEvent("peer:connect", { detail: peerIdBootstrap })
new CustomEvent<PeerId>("peer:connect", { detail: peerIdBootstrap })
);

expect(await peerConnectedBootstrap).to.eq(true);
Expand All @@ -136,7 +150,7 @@ describe("ConnectionManager", function () {
});

waku.libp2p.dispatchEvent(
new CustomEvent("peer:connect", { detail: peerIdPx })
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
);

expect(await peerConnectedPeerExchange).to.eq(true);
Expand Down Expand Up @@ -182,27 +196,34 @@ describe("ConnectionManager", function () {
attemptDialSpy.restore();
});

it("should be called on all `peer:discovery` events", async function () {
it("should be called at least once on all `peer:discovery` events", async function () {
this.timeout(TEST_TIMEOUT);

const totalPeerIds = 5;
for (let i = 1; i <= totalPeerIds; i++) {
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: `peer-id-${i}` })
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: await createSecp256k1PeerId(),
multiaddrs: [],
protocols: []
}
})
);
}

// add delay to allow async function calls within attemptDial to finish
await delay(100);

expect(attemptDialSpy.callCount).to.equal(
expect(attemptDialSpy.callCount).to.be.greaterThanOrEqual(
totalPeerIds,
"attemptDial should be called once for each peer:discovery event"
"attemptDial should be called at least once for each peer:discovery event"
);
});
});

describe("dialPeer method", function () {
let peerStoreHasStub: SinonStub;
let dialAttemptsForPeerHasStub: SinonStub;
beforeEach(function () {
getConnectionsStub = sinon.stub(
(waku.connectionManager as any).libp2p,
Expand All @@ -213,29 +234,44 @@ describe("ConnectionManager", function () {
"getTagNamesForPeer"
);
dialPeerStub = sinon.stub(waku.connectionManager as any, "dialPeer");
peerStoreHasStub = sinon.stub(waku.libp2p.peerStore, "has");
dialAttemptsForPeerHasStub = sinon.stub(
(waku.connectionManager as any).dialAttemptsForPeer,
"has"
);

// simulate that the peer is not connected
getConnectionsStub.returns([]);

// simulate that the peer is a bootstrap peer
getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]);

// simulate that the peer is not in the peerStore
peerStoreHasStub.returns(false);

// simulate that the peer has not been dialed before
dialAttemptsForPeerHasStub.returns(false);
});

afterEach(function () {
dialPeerStub.restore();
getTagNamesForPeerStub.restore();
getConnectionsStub.restore();
peerStoreHasStub.restore();
dialAttemptsForPeerHasStub.restore();
});

describe("For bootstrap peers", function () {
it("should be called for bootstrap peers", async function () {
this.timeout(TEST_TIMEOUT);

// simulate that the peer is not connected
getConnectionsStub.returns([]);

// simulate that the peer is a bootstrap peer
getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]);

const bootstrapPeer = await createSecp256k1PeerId();

// emit a peer:discovery event
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: bootstrapPeer })
new CustomEvent<PeerInfo>("peer:discovery", {
detail: { id: bootstrapPeer, multiaddrs: [], protocols: [] }
})
);

// wait for the async function calls within attemptDial to finish
Expand All @@ -251,15 +287,15 @@ describe("ConnectionManager", function () {
it("should not be called more than DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED times for bootstrap peers", async function () {
this.timeout(TEST_TIMEOUT);

// simulate that the peer is not connected
getConnectionsStub.returns([]);

// simulate that the peer is a bootstrap peer
getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]);

// emit first peer:discovery event
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: "bootstrap-peer" })
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: await createSecp256k1PeerId(),
multiaddrs: [],
protocols: []
}
})
);
await delay(500);

Expand All @@ -271,8 +307,12 @@ describe("ConnectionManager", function () {
for (let i = 1; i <= totalBootstrapPeers; i++) {
await delay(500);
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", {
detail: await createSecp256k1PeerId()
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: await createSecp256k1PeerId(),
multiaddrs: [],
protocols: []
}
})
);
}
Expand All @@ -289,17 +329,17 @@ describe("ConnectionManager", function () {
it("should be called for peers with PEER_EXCHANGE tags", async function () {
this.timeout(TEST_TIMEOUT);

// simulate that the peer is not connected
getConnectionsStub.returns([]);

// simulate that the peer has a PEER_EXCHANGE tag
getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]);

const pxPeer = await createSecp256k1PeerId();

// emit a peer:discovery event
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: pxPeer })
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: pxPeer,
multiaddrs: [],
protocols: []
}
})
);

// wait for the async function calls within attemptDial to finish
Expand All @@ -315,18 +355,16 @@ describe("ConnectionManager", function () {
it("should be called for every peer with PEER_EXCHANGE tags", async function () {
this.timeout(TEST_TIMEOUT);

// simulate that the peer is not connected
getConnectionsStub.returns([]);

// simulate that the peer has a PEER_EXCHANGE tag
getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]);

// emit multiple peer:discovery events
const totalPxPeers = 5;
for (let i = 0; i < totalPxPeers; i++) {
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", {
detail: await createSecp256k1PeerId()
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: await createSecp256k1PeerId(),
multiaddrs: [],
protocols: []
}
})
);
await delay(500);
Expand Down
Loading