From c3db7b40569270401bac1292e604a0c0bc34c2a0 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Fri, 20 Dec 2024 18:02:17 +0000 Subject: [PATCH 01/11] feat: reorganise reqresp handlers --- yarn-project/p2p/src/services/reqresp/protocols/tx.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn-project/p2p/src/services/reqresp/protocols/tx.ts b/yarn-project/p2p/src/services/reqresp/protocols/tx.ts index 415cf4293c6..feaf3dda852 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/tx.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/tx.ts @@ -1,4 +1,4 @@ -import { type P2PClientType } from '@aztec/circuit-types'; +import { P2PClientType } from '@aztec/circuit-types'; import { TxHash } from '@aztec/circuit-types/tx_hash'; import { type PeerId } from '@libp2p/interface'; From f15c74f8b61783753c9d412d94140d667d107124 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Fri, 20 Dec 2024 20:59:00 +0000 Subject: [PATCH 02/11] fmt --- yarn-project/p2p/src/services/reqresp/protocols/tx.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn-project/p2p/src/services/reqresp/protocols/tx.ts b/yarn-project/p2p/src/services/reqresp/protocols/tx.ts index feaf3dda852..415cf4293c6 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/tx.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/tx.ts @@ -1,4 +1,4 @@ -import { P2PClientType } from '@aztec/circuit-types'; +import { type P2PClientType } from '@aztec/circuit-types'; import { TxHash } from '@aztec/circuit-types/tx_hash'; import { type PeerId } from '@libp2p/interface'; From b2aac3d173888279b8f7392b9be71b0ef51069e5 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Fri, 20 Dec 2024 21:33:27 +0000 Subject: [PATCH 03/11] fix: make peer scoring a dep of peer manager, to avoid circular dependency with reqresp --- yarn-project/p2p/src/services/peer-manager/peer_scoring.ts | 2 +- .../p2p/src/services/reqresp/protocols/goodbye_protocol.ts | 0 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 yarn-project/p2p/src/services/reqresp/protocols/goodbye_protocol.ts diff --git a/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts b/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts index ffc1b65501f..047edfc1f9a 100644 --- a/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts +++ b/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts @@ -90,7 +90,7 @@ export class PeerScoring { public getScoreState(peerId: string): PeerScoreState { // TODO(#11329): permanently store banned peers? const score = this.getScore(peerId); - if (score < MIN_SCORE_BEFORE_BAN) { + if (score <= MIN_SCORE_BEFORE_BAN) { return PeerScoreState.Banned; } if (score < MIN_SCORE_BEFORE_DISCONNECT) { diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye_protocol.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye_protocol.ts new file mode 100644 index 00000000000..e69de29bb2d From 694663fc1cccb9e49a77f3b4b962520986ad137a Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Fri, 20 Dec 2024 22:18:08 +0000 Subject: [PATCH 04/11] feat: send goodbye messages to peers on shutdown --- .../p2p/src/services/peer-manager/peer_manager.test.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts b/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts index a39a9031950..fff4ae2d6c2 100644 --- a/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts +++ b/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts @@ -331,6 +331,12 @@ describe('PeerManager', () => { Buffer.from([GoodByeReason.DISCONNECTED]), ); + expect(mockLibP2PNode.hangUp).toHaveBeenCalledWith(disconnectPeerId); + expect(mockReqResp.sendRequestToPeer).toHaveBeenCalledWith( + disconnectPeerId, + ReqRespSubProtocol.GOODBYE, + Buffer.from([GoodByeReason.DISCONNECTED]), + ); // Verify that hangUp was not called for the healthy peer expect(mockLibP2PNode.hangUp).not.toHaveBeenCalledWith(healthyPeerId); From 5b3476ba8598765b96d815d41cb69e9e9f5330b9 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Fri, 20 Dec 2024 22:20:11 +0000 Subject: [PATCH 05/11] fix: mock --- yarn-project/p2p/src/services/peer-manager/peer_scoring.ts | 2 +- .../p2p/src/services/reqresp/protocols/goodbye_protocol.ts | 0 2 files changed, 1 insertion(+), 1 deletion(-) delete mode 100644 yarn-project/p2p/src/services/reqresp/protocols/goodbye_protocol.ts diff --git a/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts b/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts index 047edfc1f9a..ffc1b65501f 100644 --- a/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts +++ b/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts @@ -90,7 +90,7 @@ export class PeerScoring { public getScoreState(peerId: string): PeerScoreState { // TODO(#11329): permanently store banned peers? const score = this.getScore(peerId); - if (score <= MIN_SCORE_BEFORE_BAN) { + if (score < MIN_SCORE_BEFORE_BAN) { return PeerScoreState.Banned; } if (score < MIN_SCORE_BEFORE_DISCONNECT) { diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye_protocol.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye_protocol.ts deleted file mode 100644 index e69de29bb2d..00000000000 From a332d7e35bc21b2d19b7c2d8f393500c38b2af0f Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sat, 18 Jan 2025 02:02:03 +0000 Subject: [PATCH 06/11] fix test --- .../p2p/src/services/peer-manager/peer_manager.test.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts b/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts index fff4ae2d6c2..a39a9031950 100644 --- a/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts +++ b/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts @@ -331,12 +331,6 @@ describe('PeerManager', () => { Buffer.from([GoodByeReason.DISCONNECTED]), ); - expect(mockLibP2PNode.hangUp).toHaveBeenCalledWith(disconnectPeerId); - expect(mockReqResp.sendRequestToPeer).toHaveBeenCalledWith( - disconnectPeerId, - ReqRespSubProtocol.GOODBYE, - Buffer.from([GoodByeReason.DISCONNECTED]), - ); // Verify that hangUp was not called for the healthy peer expect(mockLibP2PNode.hangUp).not.toHaveBeenCalledWith(healthyPeerId); From b884f8197cea1b41dcd125b608bc1271832653a9 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sat, 18 Jan 2025 20:06:48 +0000 Subject: [PATCH 07/11] feat(p2p): request response node sampling --- .../p2p/src/services/libp2p/libp2p_service.ts | 8 +- .../connection-manager/connection_manager.ts | 0 .../connection_sampler.test.ts | 170 ++++++++++++++++++ .../connection-sampler/connection_sampler.ts | 149 +++++++++++++++ .../p2p/src/services/reqresp/reqresp.test.ts | 32 +--- .../p2p/src/services/reqresp/reqresp.ts | 27 ++- yarn-project/p2p/src/util.ts | 4 + 7 files changed, 356 insertions(+), 34 deletions(-) create mode 100644 yarn-project/p2p/src/services/reqresp/connection-manager/connection_manager.ts create mode 100644 yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts create mode 100644 yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index 7394469bb28..7b7e23381c4 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -38,6 +38,7 @@ import { noise } from '@chainsafe/libp2p-noise'; import { yamux } from '@chainsafe/libp2p-yamux'; import { identify } from '@libp2p/identify'; import { type Message, type PeerId, TopicValidatorResult } from '@libp2p/interface'; +import { type ConnectionManager } from '@libp2p/interface-internal'; import '@libp2p/kad-dht'; import { mplex } from '@libp2p/mplex'; import { tcp } from '@libp2p/tcp'; @@ -178,7 +179,7 @@ export class LibP2PService extends WithTracer implement const otelMetricsAdapter = new OtelMetricsAdapter(telemetry); - const node = await createLibp2p({ + const node = (await createLibp2p({ start: false, peerId, addresses: { @@ -249,8 +250,11 @@ export class LibP2PService extends WithTracer implement }, }), }) as (components: GossipSubComponents) => GossipSub, + components: (components: { connectionManager: ConnectionManager }) => ({ + connectionManager: components.connectionManager, + }), }, - }); + })) as PubSubLibp2p; return new LibP2PService( clientType, diff --git a/yarn-project/p2p/src/services/reqresp/connection-manager/connection_manager.ts b/yarn-project/p2p/src/services/reqresp/connection-manager/connection_manager.ts new file mode 100644 index 00000000000..e69de29bb2d diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts new file mode 100644 index 00000000000..04a975e1d05 --- /dev/null +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts @@ -0,0 +1,170 @@ +import { sleep } from '@aztec/foundation/sleep'; + +import { beforeEach, describe, expect, it, jest } from '@jest/globals'; +import { type PeerId, type Stream } from '@libp2p/interface'; +import { createSecp256k1PeerId } from '@libp2p/peer-id-factory'; +import { type MockProxy, mock } from 'jest-mock-extended'; + +import { ConnectionSampler, type RandomSampler } from './connection_sampler.js'; + +describe('ConnectionSampler', () => { + let sampler: ConnectionSampler; + let mockLibp2p: any; + let peers: PeerId[]; + let mockRandomSampler: MockProxy; + + beforeEach(async () => { + // Create some test peer IDs + peers = [await createSecp256k1PeerId(), await createSecp256k1PeerId(), await createSecp256k1PeerId()]; + + // Mock libp2p + mockLibp2p = { + getPeers: jest.fn().mockReturnValue(peers), + dialProtocol: jest.fn(), + }; + + mockRandomSampler = mock(); + mockRandomSampler.random.mockReturnValue(0); + + sampler = new ConnectionSampler(mockLibp2p, 500, mockRandomSampler); + }); + + afterEach(async () => { + await sampler.stop(); + }); + + describe('getPeer', () => { + it('returns a random peer from the list', () => { + const peer = sampler.getPeer(); + expect(peers).toContain(peer); + }); + + it('attempts to find peer with no active connections', async () => { + // Setup: Create active connection to first two peers + const mockStream1: Partial = { id: '1', close: jest.fn() } as Partial; + const mockStream2: Partial = { id: '2', close: jest.fn() } as Partial; + + mockLibp2p.dialProtocol.mockResolvedValueOnce(mockStream1).mockResolvedValueOnce(mockStream2); + + await sampler.dialProtocol(peers[0], 'test'); + await sampler.dialProtocol(peers[1], 'test'); + + // Force Math.random to return values that would select the first two peers + mockRandomSampler.random.mockReturnValueOnce(0).mockReturnValueOnce(1).mockReturnValueOnce(2); + + const selectedPeer = sampler.getPeer(); + // Should select peers[2] as it has no active connections + expect(selectedPeer).toBe(peers[2]); + }); + }); + + describe('connection management', () => { + it('correctly tracks active connections', async () => { + const mockStream: Partial = { + id: '1', + close: jest.fn().mockImplementation(() => Promise.resolve()), + } as Partial; + + mockLibp2p.dialProtocol.mockResolvedValue(mockStream); + + // Open connection + const stream = await sampler.dialProtocol(peers[0], 'test'); + expect(stream).toBe(mockStream); + + // Verify internal state + expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(1); + expect((sampler as any).streams.has('1')).toBe(true); + + // Close connection + await sampler.close('1'); + + // Verify cleanup + expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(0); + expect((sampler as any).streams.has('1')).toBe(false); + expect(mockStream.close).toHaveBeenCalled(); + }); + + it('handles multiple connections to same peer', async () => { + const mockStream1: Partial = { + id: '1', + close: jest.fn(), + } as Partial; + const mockStream2: Partial = { + id: '2', + close: jest.fn(), + } as Partial; + + mockLibp2p.dialProtocol.mockResolvedValueOnce(mockStream1).mockResolvedValueOnce(mockStream2); + + await sampler.dialProtocol(peers[0], 'test'); + await sampler.dialProtocol(peers[0], 'test'); + + expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(2); + + await sampler.close('1'); + expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(1); + + await sampler.close('2'); + expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(0); + }); + + it('handles errors during connection close', async () => { + const mockStream: Partial = { + id: '1', + close: jest.fn().mockImplementation(() => Promise.reject(new Error('Failed to close'))), + } as Partial; + + mockLibp2p.dialProtocol.mockResolvedValue(mockStream); + + await sampler.dialProtocol(peers[0], 'test'); + await sampler.close('1'); + + // Should still clean up internal state even if close fails + expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(0); + expect((sampler as any).streams.has('1')).toBe(false); + }); + }); + + describe('cleanup', () => { + it('cleans up stale connections', async () => { + const mockStream: Partial = { + id: '1', + close: jest.fn(), + } as Partial; + + mockLibp2p.dialProtocol.mockResolvedValue(mockStream); + await sampler.dialProtocol(peers[0], 'test'); + + // Manually set activeConnectionsCount to 0 to simulate lost accounting + (sampler as any).activeConnectionsCount.set(peers[0], 0); + + // Trigger cleanup + await sleep(600); + + expect(mockStream.close).toHaveBeenCalled(); + expect((sampler as any).streams.has('1')).toBe(false); + }); + + it('properly cleans up on stop', async () => { + const mockStream1: Partial = { + id: '1', + close: jest.fn(), + } as Partial; + const mockStream2: Partial = { + id: '2', + close: jest.fn(), + } as Partial; + + mockLibp2p.dialProtocol.mockResolvedValueOnce(mockStream1).mockResolvedValueOnce(mockStream2); + + await sampler.dialProtocol(peers[0], 'test'); + await sampler.dialProtocol(peers[1], 'test'); + + await sampler.stop(); + + expect(mockStream1.close).toHaveBeenCalled(); + expect(mockStream2.close).toHaveBeenCalled(); + expect((sampler as any).streams.size).toBe(0); + }); + }); +}); diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts new file mode 100644 index 00000000000..574e5be55cc --- /dev/null +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts @@ -0,0 +1,149 @@ +import { createLogger } from '@aztec/foundation/log'; + +import { type Libp2p, type PeerId, type Stream } from '@libp2p/interface'; + +const MAX_SAMPLE_ATTEMPTS = 4; + +interface StreamAndPeerId { + stream: Stream; + peerId: PeerId; +} + +export class RandomSampler { + random(max: number) { + return Math.floor(Math.random() * max); + } +} + +/** + * A class that samples peers from the libp2p node and returns a peer that we don't already have a connection open to. + * If we already have a connection open, we try to sample a different peer. + * We do this MAX_SAMPLE_ATTEMPTS times, if we still don't find a peer we just go for it. + * + * @dev Close must always be called on connections, else memory leak + */ +export class ConnectionSampler { + private readonly logger = createLogger('p2p:reqresp:connection-sampler'); + private readonly activeConnectionsCount: Map = new Map(); + private readonly streams: Map = new Map(); + private cleanupInterval?: NodeJS.Timeout; + + constructor( + private readonly libp2p: Libp2p, + private readonly cleanupIntervalMs: number = 60000, // Default to 1 minute + + // Random sampler provided so that it can be mocked + private readonly sampler: RandomSampler = new RandomSampler(), + ) { + this.startCleanupJob(); + } + + private startCleanupJob() { + this.cleanupInterval = setInterval(() => { + void this.cleanupStaleConnections(); + }, this.cleanupIntervalMs); + } + + /** + * Stops the cleanup job and closes all active connections + */ + async stop() { + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + } + + // Close all active streams + const closePromises = Array.from(this.streams.keys()).map(streamId => this.close(streamId)); + + await Promise.all(closePromises); + } + + getPeer(): PeerId { + const peers = this.libp2p.getPeers(); + + let randomIndex = this.sampler.random(peers.length); + let attempts = 0; + // If the active connections count is greater than 0, then we already have a connection open + // So we try to sample a different peer, but only MAX_SAMPLE_ATTEMPTS times + while ((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 && attempts < MAX_SAMPLE_ATTEMPTS) { + randomIndex = this.sampler.random(peers.length); + attempts++; + } + + this.logger.trace(`Sampled peer in ${attempts} attempts`, { + attempts, + peer: peers[randomIndex]?.toString(), + }); + return peers[randomIndex]; + } + + // Set of passthrough functions to keep track of active connections + + /** + * Dials a protocol and returns the stream + * + * @param peerId - The peer id + * @param protocol - The protocol + * @returns The stream + */ + async dialProtocol(peerId: PeerId, protocol: string): Promise { + const stream = await this.libp2p.dialProtocol(peerId, protocol); + this.streams.set(stream.id, { stream, peerId }); + + const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 0) + 1; + this.activeConnectionsCount.set(peerId, updatedActiveConnectionsCount); + + this.logger.trace(`Dialed protocol ${protocol} with peer ${peerId.toString()}`, { + streamId: stream.id, + peerId: peerId.toString(), + activeConnectionsCount: updatedActiveConnectionsCount, + }); + return stream; + } + + /** + * Closes a stream and updates the active connections count + * + * @param streamId - The stream id + */ + async close(streamId: string): Promise { + try { + const { stream, peerId } = this.streams.get(streamId)!; + + const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 1) - 1; + this.activeConnectionsCount.set(peerId, updatedActiveConnectionsCount); + + this.logger.trace(`Closing connection to peer ${peerId.toString()}`, { + streamId, + peerId: peerId.toString(), + protocol: stream.protocol, + activeConnectionsCount: updatedActiveConnectionsCount, + }); + + await stream?.close(); + } catch (error) { + this.logger.error(`Failed to close connection to peer ${streamId}`, { error }); + } finally { + this.streams.delete(streamId); + } + } + + /** + * Cleans up stale connections that we have lost accounting for + */ + private async cleanupStaleConnections() { + // Look for streams without anything in the activeConnectionsCount + // If we find anything, close the stream + for (const [streamId, { peerId }] of this.streams.entries()) { + try { + // Check if we have lost track of accounting + if (this.activeConnectionsCount.get(peerId) === 0) { + await this.close(streamId); + this.logger.debug(`Cleaned up stale connection ${streamId} to peer ${peerId.toString()}`); + } + } catch (error) { + this.logger.error(`Error cleaning up stale connection ${streamId}`, { error }); + } + } + } +} diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index c5b0fd43e5b..87d580cf09a 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -40,7 +40,7 @@ describe('ReqResp', () => { } }); - it('Should perform a ping request', async () => { + it('should perform a ping request', async () => { // Create two nodes // They need to discover each other nodes = await createNodes(peerScoring, 2); @@ -59,7 +59,7 @@ describe('ReqResp', () => { expect(res?.toBuffer().toString('utf-8')).toEqual('pong'); }); - it('Should handle gracefully if a peer connected peer is offline', async () => { + it('should handle gracefully if a peer connected peer is offline', async () => { nodes = await createNodes(peerScoring, 2); const { req: pinger } = nodes[0]; @@ -78,7 +78,7 @@ describe('ReqResp', () => { expect(res).toBeUndefined(); }); - it('Should request from a later peer if other peers are offline', async () => { + it('should request from a later peer if other peers are offline', async () => { nodes = await createNodes(peerScoring, 4); await startNodes(nodes); @@ -90,31 +90,15 @@ describe('ReqResp', () => { void nodes[1].req.stop(); void nodes[2].req.stop(); - const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug'); - // send from the first node const res = await nodes[0].req.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST); - // We expect the logger to have been called twice with the peer ids citing the inability to connect - expect(loggerSpy).toHaveBeenCalledWith( - expect.stringContaining(`Connection reset: ${nodes[1].p2p.peerId.toString()}`), - { - peerId: nodes[1].p2p.peerId.toString(), - subProtocol: ReqRespSubProtocol.PING, - }, - ); - expect(loggerSpy).toHaveBeenCalledWith( - expect.stringContaining(`Connection reset: ${nodes[2].p2p.peerId.toString()}`), - { - peerId: nodes[2].p2p.peerId.toString(), - subProtocol: ReqRespSubProtocol.PING, - }, - ); + // It will randomly try to connect, then hit the correct node expect(res?.toBuffer().toString('utf-8')).toEqual('pong'); }); - it('Should hit a rate limit if too many requests are made in quick succession', async () => { + it('should hit a rate limit if too many requests are made in quick succession', async () => { nodes = await createNodes(peerScoring, 2); await startNodes(nodes); @@ -207,7 +191,7 @@ describe('ReqResp', () => { expect(res).toBeUndefined(); }); - it('Should hit individual timeout if nothing is returned over the stream', async () => { + it('should hit individual timeout if nothing is returned over the stream', async () => { nodes = await createNodes(peerScoring, 2); await startNodes(nodes); @@ -248,7 +232,7 @@ describe('ReqResp', () => { ); }); - it('Should hit collective timeout if nothing is returned over the stream from multiple peers', async () => { + it('should hit collective timeout if nothing is returned over the stream from multiple peers', async () => { nodes = await createNodes(peerScoring, 4); await startNodes(nodes); @@ -275,7 +259,7 @@ describe('ReqResp', () => { expect(loggerSpy).toHaveBeenCalledWith(errorMessage); }); - it('Should penalize peer if transaction validation fails', async () => { + it('should penalize peer if transaction validation fails', async () => { const tx = mockTx(); const txHash = tx.getTxHash(); diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index a242e1357ee..604c5eaf3cd 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -16,6 +16,7 @@ import { import { SnappyTransform } from '../encoding.js'; import { type PeerScoring } from '../peer-manager/peer_scoring.js'; import { type P2PReqRespConfig } from './config.js'; +import { ConnectionSampler } from './connection-sampler/connection_sampler.js'; import { DEFAULT_SUB_PROTOCOL_HANDLERS, DEFAULT_SUB_PROTOCOL_VALIDATORS, @@ -55,6 +56,8 @@ export class ReqResp { private snappyTransform: SnappyTransform; + private connectionSampler: ConnectionSampler; + constructor(config: P2PReqRespConfig, private libp2p: Libp2p, private peerScoring: PeerScoring) { this.logger = createLogger('p2p:reqresp'); @@ -62,6 +65,10 @@ export class ReqResp { this.individualRequestTimeoutMs = config.individualRequestTimeoutMs; this.rateLimiter = new RequestResponseRateLimiter(peerScoring); + + // Connection sampler is used to sample our connected peers + this.connectionSampler = new ConnectionSampler(libp2p); + this.snappyTransform = new SnappyTransform(); } @@ -96,6 +103,9 @@ export class ReqResp { this.rateLimiter.stop(); this.logger.debug('ReqResp: Rate limiter stopped'); + await this.connectionSampler.stop(); + this.logger.debug('ReqResp: Connection sampler stopped'); + // NOTE: We assume libp2p instance is managed by the caller } @@ -136,11 +146,14 @@ export class ReqResp { const responseValidator = this.subProtocolValidators[subProtocol]; const requestBuffer = request.toBuffer(); - // Get active peers - const peers = this.libp2p.getPeers(); + // Attempt to ask all of our peers, but sampled in a random order + // This function is wrapped in a timeout, so we will exit the loop if we have not received a response + const numberOfPeers = this.libp2p.getPeers().length; + for (let i = 0; i < numberOfPeers; i++) { + // Sample a peer to make a request to + const peer = this.connectionSampler.getPeer(); - // Attempt to ask all of our peers - for (const peer of peers) { + this.logger.trace(`Sending request to peer: ${peer.toString()}`); const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffer); // If we get a response, return it, otherwise we iterate onto the next peer @@ -155,7 +168,6 @@ export class ReqResp { return object; } } - return undefined; }; try { @@ -201,7 +213,7 @@ export class ReqResp { ): Promise { let stream: Stream | undefined; try { - stream = await this.libp2p.dialProtocol(peerId, subProtocol); + stream = await this.connectionSampler.dialProtocol(peerId, subProtocol); this.logger.trace(`Stream opened with ${peerId.toString()} for ${subProtocol}`); // Open the stream with a timeout @@ -217,8 +229,7 @@ export class ReqResp { } finally { if (stream) { try { - await stream.close(); - this.logger.trace(`Stream closed with ${peerId.toString()} for ${subProtocol}`); + await this.connectionSampler.close(stream.id); } catch (closeError) { this.logger.error( `Error closing stream: ${closeError instanceof Error ? closeError.message : 'Unknown error'}`, diff --git a/yarn-project/p2p/src/util.ts b/yarn-project/p2p/src/util.ts index 6d3464bc583..2f88d45095a 100644 --- a/yarn-project/p2p/src/util.ts +++ b/yarn-project/p2p/src/util.ts @@ -4,6 +4,7 @@ import { type DataStoreConfig } from '@aztec/kv-store/config'; import type { GossipSub } from '@chainsafe/libp2p-gossipsub'; import { generateKeyPair, marshalPrivateKey, unmarshalPrivateKey } from '@libp2p/crypto/keys'; import { type PeerId, type PrivateKey } from '@libp2p/interface'; +import { type ConnectionManager } from '@libp2p/interface-internal'; import { createFromPrivKey } from '@libp2p/peer-id-factory'; import { resolve } from 'dns/promises'; import type { Libp2p } from 'libp2p'; @@ -13,6 +14,9 @@ import { type P2PConfig } from './config.js'; export interface PubSubLibp2p extends Libp2p { services: { pubsub: GossipSub; + components: { + connectionManager: ConnectionManager; + }; }; } From 07935681d0bde501e324328a30350edb14be29e2 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sun, 19 Jan 2025 01:11:43 +0000 Subject: [PATCH 08/11] fix: test --- yarn-project/p2p/src/services/reqresp/reqresp.test.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index 87d580cf09a..5758b7a58bd 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -91,10 +91,16 @@ describe('ReqResp', () => { void nodes[2].req.stop(); // send from the first node - const res = await nodes[0].req.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST); + let res = await nodes[0].req.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST); - // It will randomly try to connect, then hit the correct node + if (!res) { + // The peer chosen is randomly selected, and the node above wont respond, so if + // we wait and try again, there will only be one node to chose from + await sleep(500); + res = await nodes[0].req.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST); + } + // It will randomly try to connect, then hit the correct node expect(res?.toBuffer().toString('utf-8')).toEqual('pong'); }); From 792afe655068024e8455941250f5175b4abbd257 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sun, 19 Jan 2025 01:33:10 +0000 Subject: [PATCH 09/11] fix: delete empty file --- .../src/services/reqresp/connection-manager/connection_manager.ts | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 yarn-project/p2p/src/services/reqresp/connection-manager/connection_manager.ts diff --git a/yarn-project/p2p/src/services/reqresp/connection-manager/connection_manager.ts b/yarn-project/p2p/src/services/reqresp/connection-manager/connection_manager.ts deleted file mode 100644 index e69de29bb2d..00000000000 From 686e552f1d026ab9713812912101826db7bb6292 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sun, 19 Jan 2025 15:36:59 +0000 Subject: [PATCH 10/11] fix(review): remove large cast --- yarn-project/p2p/src/services/libp2p/libp2p_service.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index 7b7e23381c4..e92d8ae1620 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -179,7 +179,7 @@ export class LibP2PService extends WithTracer implement const otelMetricsAdapter = new OtelMetricsAdapter(telemetry); - const node = (await createLibp2p({ + const node = await createLibp2p({ start: false, peerId, addresses: { @@ -254,7 +254,7 @@ export class LibP2PService extends WithTracer implement connectionManager: components.connectionManager, }), }, - })) as PubSubLibp2p; + }); return new LibP2PService( clientType, From 5cd25a2a4d7505e3355e3e7f745bdccf4e23c229 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Mon, 20 Jan 2025 11:40:40 +0000 Subject: [PATCH 11/11] fix: use running promise --- .../connection-sampler/connection_sampler.ts | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts index 574e5be55cc..a44164eed09 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts @@ -1,4 +1,5 @@ import { createLogger } from '@aztec/foundation/log'; +import { RunningPromise } from '@aztec/foundation/running-promise'; import { type Libp2p, type PeerId, type Stream } from '@libp2p/interface'; @@ -24,9 +25,10 @@ export class RandomSampler { */ export class ConnectionSampler { private readonly logger = createLogger('p2p:reqresp:connection-sampler'); + private cleanupJob?: RunningPromise; + private readonly activeConnectionsCount: Map = new Map(); private readonly streams: Map = new Map(); - private cleanupInterval?: NodeJS.Timeout; constructor( private readonly libp2p: Libp2p, @@ -35,22 +37,15 @@ export class ConnectionSampler { // Random sampler provided so that it can be mocked private readonly sampler: RandomSampler = new RandomSampler(), ) { - this.startCleanupJob(); - } - - private startCleanupJob() { - this.cleanupInterval = setInterval(() => { - void this.cleanupStaleConnections(); - }, this.cleanupIntervalMs); + this.cleanupJob = new RunningPromise(() => this.cleanupStaleConnections(), this.logger, this.cleanupIntervalMs); + this.cleanupJob.start(); } /** * Stops the cleanup job and closes all active connections */ async stop() { - if (this.cleanupInterval) { - clearInterval(this.cleanupInterval); - } + await this.cleanupJob?.stop(); // Close all active streams const closePromises = Array.from(this.streams.keys()).map(streamId => this.close(streamId));