Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): request response node sampling #11330

Merged
merged 11 commits into from
Jan 20, 2025
4 changes: 4 additions & 0 deletions yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import { noise } from '@chainsafe/libp2p-noise';
import { yamux } from '@chainsafe/libp2p-yamux';
import { identify } from '@libp2p/identify';
import { type Message, type PeerId, TopicValidatorResult } from '@libp2p/interface';
import { type ConnectionManager } from '@libp2p/interface-internal';
import '@libp2p/kad-dht';
import { mplex } from '@libp2p/mplex';
import { tcp } from '@libp2p/tcp';
Expand Down Expand Up @@ -249,6 +250,9 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
},
}),
}) as (components: GossipSubComponents) => GossipSub,
components: (components: { connectionManager: ConnectionManager }) => ({
connectionManager: components.connectionManager,
}),
},
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import { sleep } from '@aztec/foundation/sleep';

import { beforeEach, describe, expect, it, jest } from '@jest/globals';
import { type PeerId, type Stream } from '@libp2p/interface';
import { createSecp256k1PeerId } from '@libp2p/peer-id-factory';
import { type MockProxy, mock } from 'jest-mock-extended';

import { ConnectionSampler, type RandomSampler } from './connection_sampler.js';

// Unit tests for ConnectionSampler: peer sampling, per-peer connection accounting, and stream cleanup.
// libp2p is replaced by a plain object exposing only getPeers/dialProtocol, and the random source is a mock,
// so peer selection is fully scripted by the test.
describe('ConnectionSampler', () => {
let sampler: ConnectionSampler;
let mockLibp2p: any;
let peers: PeerId[];
let mockRandomSampler: MockProxy<RandomSampler>;

beforeEach(async () => {
// Create some test peer IDs
peers = [await createSecp256k1PeerId(), await createSecp256k1PeerId(), await createSecp256k1PeerId()];

// Mock libp2p
mockLibp2p = {
getPeers: jest.fn().mockReturnValue(peers),
dialProtocol: jest.fn(),
};

// By default the mocked sampler always picks index 0
mockRandomSampler = mock<RandomSampler>();
mockRandomSampler.random.mockReturnValue(0);

// 500ms cleanup interval so the stale-connection test can observe a cleanup pass quickly
sampler = new ConnectionSampler(mockLibp2p, 500, mockRandomSampler);
});

// NOTE(review): afterEach is not part of the '@jest/globals' import in this file;
// presumably it resolves through Jest's ambient globals - confirm.
afterEach(async () => {
await sampler.stop();
});

describe('getPeer', () => {
it('returns a random peer from the list', () => {
const peer = sampler.getPeer();
expect(peers).toContain(peer);
});

it('attempts to find peer with no active connections', async () => {
// Setup: Create active connection to first two peers
const mockStream1: Partial<Stream> = { id: '1', close: jest.fn() } as Partial<Stream>;
const mockStream2: Partial<Stream> = { id: '2', close: jest.fn() } as Partial<Stream>;

mockLibp2p.dialProtocol.mockResolvedValueOnce(mockStream1).mockResolvedValueOnce(mockStream2);

await sampler.dialProtocol(peers[0], 'test');
await sampler.dialProtocol(peers[1], 'test');

// Script the mocked sampler (not Math.random): the first two draws pick peers[0] and peers[1],
// which both have an open connection, and the third draw picks peers[2]
mockRandomSampler.random.mockReturnValueOnce(0).mockReturnValueOnce(1).mockReturnValueOnce(2);

const selectedPeer = sampler.getPeer();
// Should select peers[2] as it has no active connections
expect(selectedPeer).toBe(peers[2]);
});
});

describe('connection management', () => {
it('correctly tracks active connections', async () => {
const mockStream: Partial<Stream> = {
id: '1',
close: jest.fn().mockImplementation(() => Promise.resolve()),
} as Partial<Stream>;

mockLibp2p.dialProtocol.mockResolvedValue(mockStream);

// Open connection
const stream = await sampler.dialProtocol(peers[0], 'test');
expect(stream).toBe(mockStream);

// Verify internal state (private fields are reached through an `any` cast)
expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(1);
expect((sampler as any).streams.has('1')).toBe(true);

// Close connection
await sampler.close('1');

// Verify cleanup
expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(0);
expect((sampler as any).streams.has('1')).toBe(false);
expect(mockStream.close).toHaveBeenCalled();
});

it('handles multiple connections to same peer', async () => {
const mockStream1: Partial<Stream> = {
id: '1',
close: jest.fn(),
} as Partial<Stream>;
const mockStream2: Partial<Stream> = {
id: '2',
close: jest.fn(),
} as Partial<Stream>;

mockLibp2p.dialProtocol.mockResolvedValueOnce(mockStream1).mockResolvedValueOnce(mockStream2);

await sampler.dialProtocol(peers[0], 'test');
await sampler.dialProtocol(peers[0], 'test');

// Two dials to the same peer are counted separately
expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(2);

// Each close decrements the peer's count by one
await sampler.close('1');
expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(1);

await sampler.close('2');
expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(0);
});

it('handles errors during connection close', async () => {
const mockStream: Partial<Stream> = {
id: '1',
close: jest.fn().mockImplementation(() => Promise.reject(new Error('Failed to close'))),
} as Partial<Stream>;

mockLibp2p.dialProtocol.mockResolvedValue(mockStream);

await sampler.dialProtocol(peers[0], 'test');
// close() is expected to resolve even though the underlying stream.close() rejects
await sampler.close('1');

// Should still clean up internal state even if close fails
expect((sampler as any).activeConnectionsCount.get(peers[0])).toBe(0);
expect((sampler as any).streams.has('1')).toBe(false);
});
});

describe('cleanup', () => {
it('cleans up stale connections', async () => {
const mockStream: Partial<Stream> = {
id: '1',
close: jest.fn(),
} as Partial<Stream>;

mockLibp2p.dialProtocol.mockResolvedValue(mockStream);
await sampler.dialProtocol(peers[0], 'test');

// Manually set activeConnectionsCount to 0 to simulate lost accounting
(sampler as any).activeConnectionsCount.set(peers[0], 0);

// Trigger cleanup: wait (in real time) longer than the 500ms interval configured in beforeEach
await sleep(600);

expect(mockStream.close).toHaveBeenCalled();
expect((sampler as any).streams.has('1')).toBe(false);
});

it('properly cleans up on stop', async () => {
const mockStream1: Partial<Stream> = {
id: '1',
close: jest.fn(),
} as Partial<Stream>;
const mockStream2: Partial<Stream> = {
id: '2',
close: jest.fn(),
} as Partial<Stream>;

mockLibp2p.dialProtocol.mockResolvedValueOnce(mockStream1).mockResolvedValueOnce(mockStream2);

await sampler.dialProtocol(peers[0], 'test');
await sampler.dialProtocol(peers[1], 'test');

// stop() is also invoked again by afterEach once this test finishes
await sampler.stop();

expect(mockStream1.close).toHaveBeenCalled();
expect(mockStream2.close).toHaveBeenCalled();
expect((sampler as any).streams.size).toBe(0);
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import { createLogger } from '@aztec/foundation/log';
import { RunningPromise } from '@aztec/foundation/running-promise';

import { type Libp2p, type PeerId, type Stream } from '@libp2p/interface';

// Upper bound on how many times getPeer re-draws a peer before accepting whatever it sampled last.
const MAX_SAMPLE_ATTEMPTS = 4;

// Pairs a dialed stream with the peer it was dialed to, so that closing the stream by id
// can also update that peer's active connection count.
interface StreamAndPeerId {
// The stream returned by dialProtocol.
stream: Stream;
// The peer the stream was dialed to.
peerId: PeerId;
}

/**
 * Source of random indices, wrapped in a class so that callers can inject a mock in tests.
 */
export class RandomSampler {
  /**
   * Draws a pseudo-random integer from `Math.random`.
   *
   * @param max - Exclusive upper bound of the draw
   * @returns An integer in `[0, max)` for positive `max`; `0` when `max` is `0`
   */
  random(max: number): number {
    const scaled = Math.random() * max;
    return Math.floor(scaled);
  }
}

/**
* A class that samples peers from the libp2p node and returns a peer that we don't already have a connection open to.
* If we already have a connection open, we try to sample a different peer.
* We do this MAX_SAMPLE_ATTEMPTS times, if we still don't find a peer we just go for it.
*
* @dev Close must always be called on connections, else memory leak
*/
export class ConnectionSampler {
private readonly logger = createLogger('p2p:reqresp:connection-sampler');
private cleanupJob?: RunningPromise;

private readonly activeConnectionsCount: Map<PeerId, number> = new Map();
private readonly streams: Map<string, StreamAndPeerId> = new Map();

/**
 * Builds the sampler and immediately starts the periodic stale-connection cleanup job.
 *
 * @param libp2p - The libp2p node used to list peers and dial protocols
 * @param cleanupIntervalMs - Interval handed to the cleanup RunningPromise, in milliseconds (defaults to one minute)
 * @param sampler - Random index source; injectable so that tests can script peer selection
 */
constructor(
  private readonly libp2p: Libp2p,
  private readonly cleanupIntervalMs: number = 60000,
  private readonly sampler: RandomSampler = new RandomSampler(),
) {
  const job = new RunningPromise(() => this.cleanupStaleConnections(), this.logger, this.cleanupIntervalMs);
  this.cleanupJob = job;
  job.start();
}

/**
 * Shuts the sampler down: the cleanup job is stopped first, then every stream
 * that is still tracked gets closed. All closes are awaited together.
 */
async stop(): Promise<void> {
  await this.cleanupJob?.stop();

  // Snapshot the ids up front, since close() removes entries from the map
  const trackedStreamIds = Array.from(this.streams.keys());
  const pendingCloses: Promise<void>[] = [];
  for (const streamId of trackedStreamIds) {
    pendingCloses.push(this.close(streamId));
  }

  await Promise.all(pendingCloses);
}

/**
 * Samples a peer from libp2p's current peer list, preferring one without an open connection.
 *
 * A peer whose active connection count is above zero triggers a re-draw, at most
 * MAX_SAMPLE_ATTEMPTS times; after that the last drawn peer is returned regardless.
 *
 * NOTE(review): when the peer list is empty the indexed lookup yields `undefined` at runtime
 * even though the signature promises a PeerId - callers should confirm they handle that.
 *
 * @returns The sampled peer
 */
getPeer(): PeerId {
  const candidates = this.libp2p.getPeers();
  const hasOpenConnection = (index: number): boolean =>
    (this.activeConnectionsCount.get(candidates[index]) ?? 0) > 0;

  let chosenIndex = this.sampler.random(candidates.length);
  let attempts = 0;
  // Re-draw while the chosen peer is busy and the attempt budget is not yet spent
  for (; attempts < MAX_SAMPLE_ATTEMPTS && hasOpenConnection(chosenIndex); attempts++) {
    chosenIndex = this.sampler.random(candidates.length);
  }

  const chosenPeer = candidates[chosenIndex];
  this.logger.trace(`Sampled peer in ${attempts} attempts`, {
    attempts,
    peer: chosenPeer?.toString(),
  });
  return chosenPeer;
}

// Set of passthrough functions to keep track of active connections

/**
 * Passthrough to libp2p's dialProtocol that also records the resulting stream
 * and increments the active connection count of the dialed peer.
 *
 * @param peerId - The peer to dial
 * @param protocol - The protocol to open the stream with
 * @returns The stream returned by libp2p
 */
async dialProtocol(peerId: PeerId, protocol: string): Promise<Stream> {
  const stream = await this.libp2p.dialProtocol(peerId, protocol);

  const previousCount = this.activeConnectionsCount.get(peerId) ?? 0;
  const activeConnectionsCount = previousCount + 1;

  // Track the stream by id so that close(streamId) can find both the stream and its peer
  this.streams.set(stream.id, { stream, peerId });
  this.activeConnectionsCount.set(peerId, activeConnectionsCount);

  this.logger.trace(`Dialed protocol ${protocol} with peer ${peerId.toString()}`, {
    streamId: stream.id,
    peerId: peerId.toString(),
    activeConnectionsCount,
  });
  return stream;
}

/**
* Closes a stream and updates the active connections count
*
* @param streamId - The stream id
*/
async close(streamId: string): Promise<void> {
try {
const { stream, peerId } = this.streams.get(streamId)!;

const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 1) - 1;
this.activeConnectionsCount.set(peerId, updatedActiveConnectionsCount);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be updated after successfully closing the connection?

Maybe a failure to close the connection means it's already closed or something anyway. In which case reducing the connection count is valid either way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is the second case; libp2p should get rid of the stream eventually.


this.logger.trace(`Closing connection to peer ${peerId.toString()}`, {
streamId,
peerId: peerId.toString(),
protocol: stream.protocol,
activeConnectionsCount: updatedActiveConnectionsCount,
});

await stream?.close();
} catch (error) {
this.logger.error(`Failed to close connection to peer ${streamId}`, { error });
} finally {
this.streams.delete(streamId);
}
}

/**
 * Periodic sweep over the tracked streams: any stream whose peer has an active
 * connection count of exactly 0 is treated as lost accounting and closed.
 * Streams are processed one at a time; a failure on one stream is logged and
 * does not stop the sweep.
 */
private async cleanupStaleConnections() {
  for (const [streamId, tracked] of this.streams) {
    const { peerId } = tracked;
    try {
      // A tracked stream with a zero count means we have lost track of accounting
      const lostAccounting = this.activeConnectionsCount.get(peerId) === 0;
      if (!lostAccounting) {
        continue;
      }

      await this.close(streamId);
      this.logger.debug(`Cleaned up stale connection ${streamId} to peer ${peerId.toString()}`);
    } catch (error) {
      this.logger.error(`Error cleaning up stale connection ${streamId}`, { error });
    }
  }
}
}
Loading
Loading