-
Notifications
You must be signed in to change notification settings - Fork 327
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(p2p): request response node sampling #11330
Merged
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
c3db7b4
feat: reorganise reqresp handlers
Maddiaa0 f15c74f
fmt
Maddiaa0 b2aac3d
fix: make peer scoring a dep of peer manager, to avoid circular depen…
Maddiaa0 694663f
feat: send goodbye messages to peers on shutdown
Maddiaa0 5b3476b
fix: mock
Maddiaa0 a332d7e
fix test
Maddiaa0 b884f81
feat(p2p): request response node sampling
Maddiaa0 0793568
fix: test
Maddiaa0 792afe6
fix: delete empty file
Maddiaa0 686e552
fix(review): remove large cast
Maddiaa0 5cd25a2
fix: use running promise
Maddiaa0 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
170 changes: 170 additions & 0 deletions
170
yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
import { sleep } from '@aztec/foundation/sleep'; | ||
|
||
import { beforeEach, describe, expect, it, jest } from '@jest/globals'; | ||
import { type PeerId, type Stream } from '@libp2p/interface'; | ||
import { createSecp256k1PeerId } from '@libp2p/peer-id-factory'; | ||
import { type MockProxy, mock } from 'jest-mock-extended'; | ||
|
||
import { ConnectionSampler, type RandomSampler } from './connection_sampler.js'; | ||
|
||
describe('ConnectionSampler', () => {
  const PEER_COUNT = 3;
  const CLEANUP_INTERVAL_MS = 500;

  let sampler: ConnectionSampler;
  let mockLibp2p: any;
  let peers: PeerId[];
  let mockRandomSampler: MockProxy<RandomSampler>;

  // Minimal stand-in for a libp2p stream: only the fields the sampler touches.
  const fakeStream = (id: string, close = jest.fn()) => ({ id, close } as Partial<Stream>);

  // Accessors for the sampler's private bookkeeping, resolved at call time.
  const openCountFor = (peer: PeerId) => (sampler as any).activeConnectionsCount.get(peer);
  const trackedStreams = () => (sampler as any).streams;

  beforeEach(async () => {
    // Fresh peer ids for every test
    peers = [];
    for (let i = 0; i < PEER_COUNT; i++) {
      peers.push(await createSecp256k1PeerId());
    }

    // libp2p stub exposing only what the sampler calls
    mockLibp2p = {
      getPeers: jest.fn().mockReturnValue(peers),
      dialProtocol: jest.fn(),
    };

    // Deterministic sampling: index 0 unless a test overrides it
    mockRandomSampler = mock<RandomSampler>();
    mockRandomSampler.random.mockReturnValue(0);

    sampler = new ConnectionSampler(mockLibp2p, CLEANUP_INTERVAL_MS, mockRandomSampler);
  });

  afterEach(async () => {
    await sampler.stop();
  });

  describe('getPeer', () => {
    it('returns a random peer from the list', () => {
      expect(peers).toContain(sampler.getPeer());
    });

    it('attempts to find peer with no active connections', async () => {
      // Open one stream each to the first two peers
      const first = fakeStream('1');
      const second = fakeStream('2');
      mockLibp2p.dialProtocol.mockResolvedValueOnce(first).mockResolvedValueOnce(second);

      await sampler.dialProtocol(peers[0], 'test');
      await sampler.dialProtocol(peers[1], 'test');

      // Sampler walks indices 0, 1, 2: the two busy peers first, then the idle one
      for (const index of [0, 1, 2]) {
        mockRandomSampler.random.mockReturnValueOnce(index);
      }

      // peers[2] is the only one without an open stream
      expect(sampler.getPeer()).toBe(peers[2]);
    });
  });

  describe('connection management', () => {
    it('correctly tracks active connections', async () => {
      const tracked = fakeStream(
        '1',
        jest.fn().mockImplementation(() => Promise.resolve()),
      );
      mockLibp2p.dialProtocol.mockResolvedValue(tracked);

      // Open
      const opened = await sampler.dialProtocol(peers[0], 'test');
      expect(opened).toBe(tracked);

      // Bookkeeping after open
      expect(openCountFor(peers[0])).toBe(1);
      expect(trackedStreams().has('1')).toBe(true);

      // Close
      await sampler.close('1');

      // Bookkeeping after close
      expect(openCountFor(peers[0])).toBe(0);
      expect(trackedStreams().has('1')).toBe(false);
      expect(tracked.close).toHaveBeenCalled();
    });

    it('handles multiple connections to same peer', async () => {
      const first = fakeStream('1');
      const second = fakeStream('2');
      mockLibp2p.dialProtocol.mockResolvedValueOnce(first).mockResolvedValueOnce(second);

      await sampler.dialProtocol(peers[0], 'test');
      await sampler.dialProtocol(peers[0], 'test');
      expect(openCountFor(peers[0])).toBe(2);

      await sampler.close('1');
      expect(openCountFor(peers[0])).toBe(1);

      await sampler.close('2');
      expect(openCountFor(peers[0])).toBe(0);
    });

    it('handles errors during connection close', async () => {
      const failing = fakeStream(
        '1',
        jest.fn().mockImplementation(() => Promise.reject(new Error('Failed to close'))),
      );
      mockLibp2p.dialProtocol.mockResolvedValue(failing);

      await sampler.dialProtocol(peers[0], 'test');
      await sampler.close('1');

      // Internal state must be released even though the stream refused to close
      expect(openCountFor(peers[0])).toBe(0);
      expect(trackedStreams().has('1')).toBe(false);
    });
  });

  describe('cleanup', () => {
    it('cleans up stale connections', async () => {
      const stale = fakeStream('1');
      mockLibp2p.dialProtocol.mockResolvedValue(stale);
      await sampler.dialProtocol(peers[0], 'test');

      // Simulate lost accounting: the stream is tracked but the peer's count says zero
      (sampler as any).activeConnectionsCount.set(peers[0], 0);

      // Wait long enough for one cleanup pass to run
      await sleep(CLEANUP_INTERVAL_MS + 100);

      expect(stale.close).toHaveBeenCalled();
      expect(trackedStreams().has('1')).toBe(false);
    });

    it('properly cleans up on stop', async () => {
      const first = fakeStream('1');
      const second = fakeStream('2');
      mockLibp2p.dialProtocol.mockResolvedValueOnce(first).mockResolvedValueOnce(second);

      await sampler.dialProtocol(peers[0], 'test');
      await sampler.dialProtocol(peers[1], 'test');

      await sampler.stop();

      for (const stream of [first, second]) {
        expect(stream.close).toHaveBeenCalled();
      }
      expect(trackedStreams().size).toBe(0);
    });
  });
});
144 changes: 144 additions & 0 deletions
144
yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
import { createLogger } from '@aztec/foundation/log'; | ||
import { RunningPromise } from '@aztec/foundation/running-promise'; | ||
|
||
import { type Libp2p, type PeerId, type Stream } from '@libp2p/interface'; | ||
|
||
// Upper bound on how many times getPeer re-samples after drawing a peer that already has an open stream. | ||
const MAX_SAMPLE_ATTEMPTS = 4; | ||
|
||
/** An open stream together with the peer it was dialed to; stored per stream id so a close can find the peer's counter. */ | ||
interface StreamAndPeerId { | ||
// Stream returned by libp2p.dialProtocol | ||
stream: Stream; | ||
// Peer the stream was dialed to | ||
peerId: PeerId; | ||
} | ||
|
||
/**
 * Source of random indices, wrapped in a class so that it can be replaced with a mock.
 */
export class RandomSampler {
  /**
   * Draws a pseudo-random integer using Math.random.
   *
   * @param max - Exclusive upper bound of the draw
   * @returns An integer in the range [0, max); always 0 when max is 0
   */
  random(max: number): number {
    return Math.floor(Math.random() * max);
  }
}
|
||
/**
 * Samples peers from the libp2p node, preferring a peer that we do not already have a stream open to.
 * If the sampled peer already has an open stream, we sample again, up to MAX_SAMPLE_ATTEMPTS times;
 * if we still have not found an idle peer we return the last sample anyway.
 *
 * Streams must be opened through dialProtocol and released through close so that the per-peer
 * counters stay accurate.
 *
 * NOTE(review): counters are keyed by PeerId object identity (Map semantics); this presumably relies on
 * libp2p handing back the same PeerId instances from getPeers that callers dial with - confirm.
 *
 * @dev Close must always be called on connections, else memory leak
 */
export class ConnectionSampler {
  private readonly logger = createLogger('p2p:reqresp:connection-sampler');
  private cleanupJob?: RunningPromise;

  // Number of streams currently open per peer; entries are kept at 0 rather than removed
  private readonly activeConnectionsCount: Map<PeerId, number> = new Map();
  // Every stream opened through dialProtocol that has not been closed yet, keyed by stream id
  private readonly streams: Map<string, StreamAndPeerId> = new Map();

  constructor(
    private readonly libp2p: Libp2p,
    private readonly cleanupIntervalMs: number = 60000, // Default to 1 minute

    // Random sampler provided so that it can be mocked
    private readonly sampler: RandomSampler = new RandomSampler(),
  ) {
    this.cleanupJob = new RunningPromise(() => this.cleanupStaleConnections(), this.logger, this.cleanupIntervalMs);
    this.cleanupJob.start();
  }

  /**
   * Stops the cleanup job and closes all active connections
   */
  async stop(): Promise<void> {
    await this.cleanupJob?.stop();

    // close() never rejects, so Promise.all cannot short-circuit and leave streams behind
    await Promise.all(Array.from(this.streams.keys(), streamId => this.close(streamId)));
  }

  /**
   * Samples a peer from libp2p's current peer list, retrying while the sample already has an open stream.
   *
   * @returns The sampled peer. When libp2p reports no peers the value is `undefined` at runtime,
   * despite the declared return type.
   */
  getPeer(): PeerId {
    const peers = this.libp2p.getPeers();

    let randomIndex = this.sampler.random(peers.length);
    let attempts = 0;
    // If the active connections count is greater than 0, then we already have a connection open
    // So we try to sample a different peer, but only MAX_SAMPLE_ATTEMPTS times
    while ((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 && attempts < MAX_SAMPLE_ATTEMPTS) {
      randomIndex = this.sampler.random(peers.length);
      attempts++;
    }

    this.logger.trace(`Sampled peer in ${attempts} attempts`, {
      attempts,
      peer: peers[randomIndex]?.toString(),
    });
    return peers[randomIndex];
  }

  // Set of passthrough functions to keep track of active connections

  /**
   * Dials a protocol and returns the stream
   *
   * The stream is only recorded once the dial has succeeded; a rejected dial leaves the counters untouched.
   *
   * @param peerId - The peer id
   * @param protocol - The protocol
   * @returns The stream
   */
  async dialProtocol(peerId: PeerId, protocol: string): Promise<Stream> {
    const stream = await this.libp2p.dialProtocol(peerId, protocol);
    this.streams.set(stream.id, { stream, peerId });

    const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 0) + 1;
    this.activeConnectionsCount.set(peerId, updatedActiveConnectionsCount);

    this.logger.trace(`Dialed protocol ${protocol} with peer ${peerId.toString()}`, {
      streamId: stream.id,
      peerId: peerId.toString(),
      activeConnectionsCount: updatedActiveConnectionsCount,
    });
    return stream;
  }

  /**
   * Closes a stream and updates the active connections count
   *
   * Safe to call more than once for the same id: unknown or already closed streams are ignored.
   * Never rejects; a failure of the underlying stream close is logged and the bookkeeping is
   * released regardless.
   *
   * @param streamId - The stream id
   */
  async close(streamId: string): Promise<void> {
    const entry = this.streams.get(streamId);
    if (entry === undefined) {
      this.logger.debug(`Ignoring close for unknown or already closed stream ${streamId}`);
      return;
    }

    // Release the bookkeeping synchronously, before awaiting, so that a concurrent close() for the
    // same stream becomes a no-op instead of decrementing the peer's count a second time
    this.streams.delete(streamId);

    const { stream, peerId } = entry;
    // Clamp at zero: the count may already be 0 when the stale-connection cleanup closes the stream
    const updatedActiveConnectionsCount = Math.max(0, (this.activeConnectionsCount.get(peerId) ?? 0) - 1);
    this.activeConnectionsCount.set(peerId, updatedActiveConnectionsCount);

    this.logger.trace(`Closing connection to peer ${peerId.toString()}`, {
      streamId,
      peerId: peerId.toString(),
      protocol: stream.protocol,
      activeConnectionsCount: updatedActiveConnectionsCount,
    });

    try {
      await stream.close();
    } catch (error) {
      this.logger.error(`Failed to close stream ${streamId} to peer ${peerId.toString()}`, { error });
    }
  }

  /**
   * Cleans up stale connections that we have lost accounting for
   */
  private async cleanupStaleConnections(): Promise<void> {
    // A tracked stream whose peer reports no open connections means the accounting has drifted;
    // close the stream so it does not linger
    for (const [streamId, { peerId }] of this.streams.entries()) {
      try {
        if ((this.activeConnectionsCount.get(peerId) ?? 0) <= 0) {
          await this.close(streamId);
          this.logger.debug(`Cleaned up stale connection ${streamId} to peer ${peerId.toString()}`);
        }
      } catch (error) {
        this.logger.error(`Error cleaning up stale connection ${streamId}`, { error });
      }
    }
  }
}
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be updated after successfully closing the connection?
Maybe a failure to close the connection means it's already closed or something anyway. In which case reducing the connection count is valid either way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe it is the second case; libp2p should get rid of the stream eventually.