v2.1: add fanout to tpu-client-next (backport of #3478) #3523

Merged
6 commits merged on Nov 25, 2024
Changes from 5 commits
1 change: 1 addition & 0 deletions Cargo.toml
@@ -513,6 +513,7 @@ solana-test-validator = { path = "test-validator", version = "=2.1.2" }
solana-thin-client = { path = "thin-client", version = "=2.1.2" }
solana-transaction-error = { path = "sdk/transaction-error", version = "=2.1.2" }
solana-tpu-client = { path = "tpu-client", version = "=2.1.2", default-features = false }
solana-tpu-client-next = { path = "tpu-client-next", version = "=2.1.2" }
solana-transaction-status = { path = "transaction-status", version = "=2.1.2" }
solana-transaction-status-client-types = { path = "transaction-status-client-types", version = "=2.1.2" }
solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=2.1.2" }
26 changes: 13 additions & 13 deletions tpu-client-next/src/connection_worker.rs
@@ -14,7 +14,10 @@ use {
clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS},
timing::timestamp,
},
std::net::SocketAddr,
std::{
net::SocketAddr,
sync::{atomic::Ordering, Arc},
},
tokio::{
sync::mpsc,
time::{sleep, Duration},
@@ -72,7 +75,7 @@ pub(crate) struct ConnectionWorker {
connection: ConnectionState,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
send_txs_stats: SendTransactionStats,
send_txs_stats: Arc<SendTransactionStats>,
cancel: CancellationToken,
}

@@ -93,6 +96,7 @@ impl ConnectionWorker {
transactions_receiver: mpsc::Receiver<TransactionBatch>,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
send_txs_stats: Arc<SendTransactionStats>,
) -> (Self, CancellationToken) {
let cancel = CancellationToken::new();

@@ -103,7 +107,7 @@
connection: ConnectionState::NotSetup,
skip_check_transaction_age,
max_reconnect_attempts,
send_txs_stats: SendTransactionStats::default(),
send_txs_stats,
cancel: cancel.clone(),
};

@@ -155,11 +159,6 @@ impl ConnectionWorker {
}
}

/// Retrieves the statistics for transactions sent by this worker.
pub fn transaction_stats(&self) -> &SendTransactionStats {
&self.send_txs_stats
}

/// Sends a batch of transactions using the provided `connection`.
///
/// Each transaction in the batch is sent over the QUIC streams one at the
@@ -183,11 +182,12 @@

if let Err(error) = result {
trace!("Failed to send transaction over stream with error: {error}.");
record_error(error, &mut self.send_txs_stats);
record_error(error, &self.send_txs_stats);
self.connection = ConnectionState::Retry(0);
} else {
self.send_txs_stats.successfully_sent =
self.send_txs_stats.successfully_sent.saturating_add(1);
self.send_txs_stats
.successfully_sent
.fetch_add(1, Ordering::Relaxed);
}
}
measure_send.stop();
@@ -221,14 +221,14 @@
}
Err(err) => {
warn!("Connection error {}: {}", self.peer, err);
record_error(err.into(), &mut self.send_txs_stats);
record_error(err.into(), &self.send_txs_stats);
self.connection =
ConnectionState::Retry(max_retries_attempt.saturating_add(1));
}
}
}
Err(connecting_error) => {
record_error(connecting_error.clone().into(), &mut self.send_txs_stats);
record_error(connecting_error.clone().into(), &self.send_txs_stats);
match connecting_error {
ConnectError::EndpointStopping => {
debug!("Endpoint stopping, exit connection worker.");
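The worker no longer owns its `SendTransactionStats` and no longer exposes `transaction_stats()`; it receives an `Arc<SendTransactionStats>` and bumps shared atomic counters in place, so the scheduler can aggregate statistics per peer without joining the worker task. A minimal standalone sketch of that pattern (the `Stats` struct below is an illustrative stand-in, not the crate's actual definition):

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

// Illustrative stand-in for the crate's `SendTransactionStats`,
// which has more counters than shown here.
#[derive(Default)]
struct Stats {
    successfully_sent: AtomicU64,
}

fn main() {
    let stats = Arc::new(Stats::default());

    // The scheduler keeps one clone per peer address...
    let worker_stats = Arc::clone(&stats);
    let worker = std::thread::spawn(move || {
        // ...and each worker records outcomes with relaxed atomic increments,
        // mirroring `send_txs_stats.successfully_sent.fetch_add(1, Ordering::Relaxed)`.
        worker_stats.successfully_sent.fetch_add(1, Ordering::Relaxed);
    });
    worker.join().unwrap();

    // Counters can be read at any time without joining the worker task.
    assert_eq!(stats.successfully_sent.load(Ordering::Relaxed), 1);
}
```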
122 changes: 80 additions & 42 deletions tpu-client-next/src/connection_workers_scheduler.rs
@@ -9,7 +9,8 @@ use {
create_client_config, create_client_endpoint, QuicClientCertificate, QuicError,
},
transaction_batch::TransactionBatch,
workers_cache::{WorkerInfo, WorkersCache, WorkersCacheError},
workers_cache::{maybe_shutdown_worker, WorkerInfo, WorkersCache, WorkersCacheError},
SendTransactionStats,
},
log::*,
quinn::Endpoint,
@@ -39,6 +40,25 @@ pub enum ConnectionWorkersSchedulerError {
LeaderReceiverDropped,
}

/// [`Fanout`] is a configuration struct that specifies how many leaders should
/// be targeted when sending transactions and connecting.
///
/// Note that the unit is the number of leaders per
/// [`NUM_CONSECUTIVE_LEADER_SLOTS`]. This means that if the leader schedule is
/// [L1, L1, L1, L1, L1, L1, L1, L1, L2, L2, L2, L2], the leaders per
/// consecutive leader slots are [L1, L1, L2], so there are 3 of them.
///
/// The idea of having a separate `connect` parameter is to create a set of
/// nodes to connect to in advance in order to hide the latency of opening a new
/// connection. Hence, `connect` must be greater than or equal to `send`.
pub struct Fanout {
/// The number of leaders to target for sending transactions.
pub send: usize,

/// The number of leaders to target for establishing connections.
pub connect: usize,
}
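To make the semantics above concrete, here is a minimal standalone sketch; the struct is re-declared locally so the snippet compiles on its own, and the numbers are purely illustrative:

```rust
// Local mirror of the `Fanout` struct above, so this sketch is self-contained.
struct Fanout {
    send: usize,
    connect: usize,
}

fn main() {
    // Illustrative values, not defaults from the crate.
    let fanout = Fanout { send: 1, connect: 2 };

    // Leaders per NUM_CONSECUTIVE_LEADER_SLOTS, as in the doc comment above.
    let leaders = ["L1", "L1", "L2"];

    // Transactions are sent to the first `send` entries...
    let send_to = &leaders[..fanout.send.min(leaders.len())];
    // ...while connections are warmed up for the first `connect` entries
    // (the workers cache skips peers it already has a worker for).
    let connect_to = &leaders[..fanout.connect.min(leaders.len())];

    assert!(fanout.send <= fanout.connect);
    assert_eq!(send_to, ["L1"]);
    assert_eq!(connect_to, ["L1", "L1"]);
}
```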

/// Configuration for the [`ConnectionWorkersScheduler`].
///
/// This struct holds the necessary settings to initialize and manage connection
@@ -66,10 +86,8 @@ pub struct ConnectionWorkersSchedulerConfig {
/// connection failure.
pub max_reconnect_attempts: usize,

/// The number of slots to look ahead during the leader estimation
/// procedure. Determines how far into the future leaders are estimated,
/// allowing connections to be established with those leaders in advance.
pub lookahead_slots: u64,
/// Configures the number of leaders to connect to and send transactions to.
pub leaders_fanout: Fanout,
}

impl ConnectionWorkersScheduler {
@@ -90,7 +108,7 @@
skip_check_transaction_age,
worker_channel_size,
max_reconnect_attempts,
lookahead_slots,
leaders_fanout,
}: ConnectionWorkersSchedulerConfig,
mut leader_updater: Box<dyn LeaderUpdater>,
mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
@@ -99,6 +117,7 @@
let endpoint = Self::setup_endpoint(bind, validator_identity)?;
debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
let mut workers = WorkersCache::new(num_connections, cancel.clone());
let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();

loop {
let transaction_batch = tokio::select! {
@@ -114,50 +133,49 @@
break;
}
};
let updated_leaders = leader_updater.next_leaders(lookahead_slots);
let new_leader = &updated_leaders[0];
let future_leaders = &updated_leaders[1..];
if !workers.contains(new_leader) {
debug!("No existing workers for {new_leader:?}, starting a new one.");
let worker = Self::spawn_worker(
&endpoint,
new_leader,
worker_channel_size,
skip_check_transaction_age,
max_reconnect_attempts,
);
workers.push(*new_leader, worker).await;
}

tokio::select! {
send_res = workers.send_transactions_to_address(new_leader, transaction_batch) => match send_res {
Ok(()) => (),
Err(WorkersCacheError::ShutdownError) => {
debug!("Connection to {new_leader} was closed, worker cache shutdown");
}
Err(err) => {
warn!("Connection to {new_leader} was closed, worker error: {err}");
// If we have failed to send the batch, it will be dropped.
}
},
() = cancel.cancelled() => {
debug!("Cancelled: Shutting down");
break;
}
};
let updated_leaders = leader_updater.next_leaders(leaders_fanout.connect);

// Regardless of who is leader, add future leaders to the cache to
// hide the latency of opening the connection.
for peer in future_leaders {
let (fanout_leaders, connect_leaders) =
split_leaders(&updated_leaders, &leaders_fanout);
// Add future leaders to the cache to hide the latency of opening
// the connection.
for peer in connect_leaders {
if !workers.contains(peer) {
let stats = send_stats_per_addr.entry(peer.ip()).or_default();
let worker = Self::spawn_worker(
&endpoint,
peer,
worker_channel_size,
skip_check_transaction_age,
max_reconnect_attempts,
stats.clone(),
);
workers.push(*peer, worker).await;
maybe_shutdown_worker(workers.push(*peer, worker));
}
}

for new_leader in fanout_leaders {
if !workers.contains(new_leader) {
warn!("No existing worker for {new_leader:?}, skip sending to this leader.");
continue;
}

let send_res =
workers.try_send_transactions_to_address(new_leader, transaction_batch.clone());
match send_res {
Ok(()) => (),
Err(WorkersCacheError::ShutdownError) => {
debug!("Connection to {new_leader} was closed, worker cache shutdown");
}
Err(WorkersCacheError::ReceiverDropped) => {
// Remove the worker from the cache if the peer has disconnected.
maybe_shutdown_worker(workers.pop(*new_leader));
}
Err(err) => {
warn!("Connection to {new_leader} was closed, worker error: {err}");
// If we have failed to send the batch, it will be dropped.
}
}
}
}
@@ -166,7 +184,7 @@

endpoint.close(0u32.into(), b"Closing connection");
leader_updater.stop().await;
Ok(workers.transaction_stats().clone())
Ok(send_stats_per_addr)
}

/// Sets up the QUIC endpoint for the scheduler to handle connections.
@@ -191,6 +209,7 @@
worker_channel_size: usize,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
stats: Arc<SendTransactionStats>,
) -> WorkerInfo {
let (txs_sender, txs_receiver) = mpsc::channel(worker_channel_size);
let endpoint = endpoint.clone();
@@ -202,12 +221,31 @@
txs_receiver,
skip_check_transaction_age,
max_reconnect_attempts,
stats,
);
let handle = tokio::spawn(async move {
worker.run().await;
worker.transaction_stats().clone()
});

WorkerInfo::new(txs_sender, handle, cancel)
}
}

/// Splits `leaders` into two slices based on the `fanout` configuration:
/// * the first slice contains the leaders to which transactions will be sent,
/// * the second slice contains the leaders used to warm up connections. This
/// slice includes the first set.
fn split_leaders<'leaders>(
leaders: &'leaders [SocketAddr],
fanout: &Fanout,
) -> (&'leaders [SocketAddr], &'leaders [SocketAddr]) {
let Fanout { send, connect } = fanout;
assert!(send <= connect);
let send_count = (*send).min(leaders.len());
let connect_count = (*connect).min(leaders.len());

let send_slice = &leaders[..send_count];
let connect_slice = &leaders[..connect_count];

(send_slice, connect_slice)
}
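A test-style sketch of the slicing behavior might look like the following; it assumes the `split_leaders` and `Fanout` items above are in scope and is only an illustration, not part of this backport:

```rust
#[cfg(test)]
mod split_leaders_sketch {
    use super::*;
    use std::net::SocketAddr;

    #[test]
    fn send_slice_is_prefix_of_connect_slice() {
        let leaders: Vec<SocketAddr> = (0..3)
            .map(|i| format!("127.0.0.1:{}", 8000 + i).parse().unwrap())
            .collect();

        let fanout = Fanout { send: 1, connect: 2 };
        let (send, connect) = split_leaders(&leaders, &fanout);
        // Transactions go to a prefix of the leaders we also connect to.
        assert_eq!(send, &leaders[..1]);
        assert_eq!(connect, &leaders[..2]);

        // Both counts are clamped to the number of available leaders.
        let greedy = Fanout { send: 5, connect: 5 };
        let (send_all, connect_all) = split_leaders(&leaders, &greedy);
        assert_eq!(send_all.len(), 3);
        assert_eq!(connect_all.len(), 3);
    }
}
```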
15 changes: 11 additions & 4 deletions tpu-client-next/src/leader_updater.rs
@@ -13,6 +13,7 @@ use {
log::*,
solana_connection_cache::connection_cache::Protocol,
solana_rpc_client::nonblocking::rpc_client::RpcClient,
solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS,
solana_tpu_client::nonblocking::tpu_client::LeaderTpuService,
std::{
fmt,
@@ -22,26 +23,30 @@
Arc,
},
},
thiserror::Error,
};

/// [`LeaderUpdater`] trait abstracts out functionality required for the
/// [`ConnectionWorkersScheduler`](crate::ConnectionWorkersScheduler) to
/// identify next leaders to send transactions to.
#[async_trait]
pub trait LeaderUpdater: Send {
/// Returns next unique leaders for the next `lookahead_slots` starting from
/// Returns next leaders for the next `lookahead_leaders` starting from
/// current estimated slot.
///
/// Leaders are returned per [`NUM_CONSECUTIVE_LEADER_SLOTS`] to avoid unnecessary repetition.
///
/// If the current leader estimation is incorrect and transactions are sent to
/// only one estimated leader, there is a risk of losing all the transactions,
/// depending on the forwarding policy.
fn next_leaders(&self, lookahead_slots: u64) -> Vec<SocketAddr>;
fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr>;

/// Stops the [`LeaderUpdater`] and releases all associated resources.
async fn stop(&mut self);
}

/// Error type for [`LeaderUpdater`].
#[derive(Error, PartialEq)]
pub struct LeaderUpdaterError;

impl fmt::Display for LeaderUpdaterError {
@@ -98,7 +103,9 @@ struct LeaderUpdaterService {

#[async_trait]
impl LeaderUpdater for LeaderUpdaterService {
fn next_leaders(&self, lookahead_slots: u64) -> Vec<SocketAddr> {
fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr> {
let lookahead_slots =
(lookahead_leaders as u64).saturating_mul(NUM_CONSECUTIVE_LEADER_SLOTS);
self.leader_tpu_service.leader_tpu_sockets(lookahead_slots)
}
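Since the trait now counts leaders rather than slots, the service scales the request back to slots before querying `LeaderTpuService`. A standalone sketch of that arithmetic (assumes a `solana-sdk` dependency; `NUM_CONSECUTIVE_LEADER_SLOTS` is 4 at the time of writing):

```rust
use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;

fn main() {
    // Asking for the next 3 leaders translates into a lookahead of
    // 3 * NUM_CONSECUTIVE_LEADER_SLOTS slots (3 * 4 = 12 at the time of writing).
    let lookahead_leaders: usize = 3;
    let lookahead_slots =
        (lookahead_leaders as u64).saturating_mul(NUM_CONSECUTIVE_LEADER_SLOTS);
    assert_eq!(lookahead_slots, 3 * NUM_CONSECUTIVE_LEADER_SLOTS);
}
```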

@@ -116,7 +123,7 @@ struct PinnedLeaderUpdater {

#[async_trait]
impl LeaderUpdater for PinnedLeaderUpdater {
fn next_leaders(&self, _lookahead_slots: u64) -> Vec<SocketAddr> {
fn next_leaders(&mut self, _lookahead_leaders: usize) -> Vec<SocketAddr> {
self.address.clone()
}
