diff --git a/Cargo.lock b/Cargo.lock index bda773c8f70dd6..51b4a81e033a3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9576,18 +9576,21 @@ dependencies = [ name = "solana-send-transaction-service" version = "2.2.0" dependencies = [ + "async-trait", "crossbeam-channel", "itertools 0.12.1", "log", "solana-client", "solana-connection-cache", + "solana-keypair", "solana-logger", "solana-measure", "solana-metrics", "solana-runtime", "solana-sdk", - "solana-tpu-client", + "solana-tpu-client-next", "tokio", + "tokio-util 0.7.13", ] [[package]] diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 7122345fd69014..09de057c591aef 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -8102,17 +8102,20 @@ dependencies = [ name = "solana-send-transaction-service" version = "2.2.0" dependencies = [ + "async-trait", "crossbeam-channel", "itertools 0.12.1", "log", "solana-client", "solana-connection-cache", + "solana-keypair", "solana-measure", "solana-metrics", "solana-runtime", "solana-sdk", - "solana-tpu-client", + "solana-tpu-client-next", "tokio", + "tokio-util 0.7.13", ] [[package]] @@ -8673,6 +8676,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "solana-tpu-client-next" +version = "2.2.0" +dependencies = [ + "async-trait", + "log", + "lru", + "quinn", + "rustls 0.23.23", + "solana-clock", + "solana-connection-cache", + "solana-keypair", + "solana-measure", + "solana-quic-definitions", + "solana-rpc-client", + "solana-streamer", + "solana-time-utils", + "solana-tls-utils", + "solana-tpu-client", + "thiserror 2.0.11", + "tokio", + "tokio-util 0.7.13", +] + [[package]] name = "solana-transaction" version = "2.2.1" diff --git a/send-transaction-service/Cargo.toml b/send-transaction-service/Cargo.toml index e13ffe5d7da716..9030e5443fabc1 100644 --- a/send-transaction-service/Cargo.toml +++ b/send-transaction-service/Cargo.toml @@ -10,17 +10,20 @@ license = { workspace = true } edition = { workspace = true } [dependencies] +async-trait = { 
workspace = true } crossbeam-channel = { workspace = true } itertools = { workspace = true } log = { workspace = true } solana-client = { workspace = true } solana-connection-cache = { workspace = true } +solana-keypair = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } solana-runtime = { workspace = true } solana-sdk = { workspace = true } -solana-tpu-client = { workspace = true } +solana-tpu-client-next = { workspace = true } tokio = { workspace = true, features = ["full"] } +tokio-util = { workspace = true } [dev-dependencies] solana-logger = { workspace = true } diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 9b06b1233a853a..908c83f5494967 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -507,7 +507,10 @@ impl SendTransactionService { mod test { use { super::*, - crate::{test_utils::ClientWithCreator, tpu_info::NullTpuInfo}, + crate::{ + test_utils::ClientWithCreator, tpu_info::NullTpuInfo, + transaction_client::TpuClientNextClient, + }, crossbeam_channel::{bounded, unbounded}, solana_sdk::{ account::AccountSharedData, @@ -541,7 +544,7 @@ mod test { drop(sender); send_transaction_service.join().unwrap(); - client.cancel(); + client.stop(); } #[test] @@ -549,6 +552,11 @@ mod test { service_exit::>(None); } + #[tokio::test(flavor = "multi_thread")] + async fn service_exit_with_tpu_client_next() { + service_exit::>(Some(Handle::current())); + } + fn validator_exit(maybe_runtime: Option) { let bank = Bank::default_for_tests(); let bank_forks = BankForks::new_rw_arc(bank); @@ -581,7 +589,7 @@ mod test { thread::spawn(move || { exit.store(true, Ordering::Relaxed); - client.cancel(); + client.stop(); }); let mut option = Ok(()); @@ -595,6 +603,11 @@ mod test { validator_exit::>(None); } + #[tokio::test(flavor = "multi_thread")] + async fn 
validator_exit_with_tpu_client_next() { + validator_exit::>(Some(Handle::current())); + } + fn process_transactions(maybe_runtime: Option) { solana_logger::setup(); @@ -857,7 +870,7 @@ mod test { ..ProcessTransactionsResult::default() } ); - client.cancel(); + client.stop(); } #[test] @@ -865,6 +878,11 @@ mod test { process_transactions::>(None); } + #[tokio::test(flavor = "multi_thread")] + async fn process_transactions_with_tpu_client_next() { + process_transactions::>(Some(Handle::current())); + } + fn retry_durable_nonce_transactions(maybe_runtime: Option) { solana_logger::setup(); @@ -1162,11 +1180,18 @@ mod test { ..ProcessTransactionsResult::default() } ); - client.cancel(); + client.stop(); } #[test] fn retry_durable_nonce_transactions_with_connection_cache() { retry_durable_nonce_transactions::>(None); } + + #[tokio::test(flavor = "multi_thread")] + async fn retry_durable_nonce_transactions_with_tpu_client_next() { + retry_durable_nonce_transactions::>(Some( + Handle::current(), + )); + } } diff --git a/send-transaction-service/src/test_utils.rs b/send-transaction-service/src/test_utils.rs index 4db9166726f4e9..f8e2526287f58d 100644 --- a/send-transaction-service/src/test_utils.rs +++ b/send-transaction-service/src/test_utils.rs @@ -4,7 +4,9 @@ use { crate::{ tpu_info::NullTpuInfo, - transaction_client::{ConnectionCacheClient, TpuInfoWithSendStatic, TransactionClient}, + transaction_client::{ + ConnectionCacheClient, TpuClientNextClient, TpuInfoWithSendStatic, TransactionClient, + }, }, solana_client::connection_cache::ConnectionCache, std::{net::SocketAddr, sync::Arc}, @@ -42,23 +44,52 @@ impl CreateClient for ConnectionCacheClient { } } -pub trait Cancelable { - fn cancel(&self); +impl CreateClient for TpuClientNextClient { + fn create_client( + maybe_runtime: Option, + my_tpu_address: SocketAddr, + tpu_peers: Option>, + leader_forward_count: u64, + ) -> Self { + let runtime_handle = + maybe_runtime.expect("Runtime should be provided for the 
TpuClientNextClient."); + Self::new( + runtime_handle, + my_tpu_address, + tpu_peers, + None, + leader_forward_count, + None, + ) + } +} + +pub trait Stoppable { + fn stop(&self); } -impl Cancelable for ConnectionCacheClient +impl Stoppable for ConnectionCacheClient where T: TpuInfoWithSendStatic, { - fn cancel(&self) {} + fn stop(&self) {} +} + +impl Stoppable for TpuClientNextClient +where + T: TpuInfoWithSendStatic + Clone, +{ + fn stop(&self) { + self.cancel().unwrap(); + } } // Define type alias to simplify definition of test functions. pub trait ClientWithCreator: - CreateClient + TransactionClient + Cancelable + Send + Clone + 'static + CreateClient + TransactionClient + Stoppable + Send + Clone + 'static { } impl ClientWithCreator for T where - T: CreateClient + TransactionClient + Cancelable + Send + Clone + 'static + T: CreateClient + TransactionClient + Stoppable + Send + Clone + 'static { } diff --git a/send-transaction-service/src/tpu_info.rs b/send-transaction-service/src/tpu_info.rs index 073167638986bf..33052f6082eeb3 100644 --- a/send-transaction-service/src/tpu_info.rs +++ b/send-transaction-service/src/tpu_info.rs @@ -23,10 +23,6 @@ pub trait TpuInfo { /// /// For example, if leader schedule was `[L1, L1, L1, L1, L2, L2, L2, L2, /// L1, ...]` it will return `[L1, L2, L1]`. - #[allow( - dead_code, - reason = "This function will be used when tpu-client-next will be added to this module." 
- )] fn get_not_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>; /// In addition to the tpu address, also return the leader slot diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs index 5ede4ce78d8c01..5f8ecabb5bbdad 100644 --- a/send-transaction-service/src/transaction_client.rs +++ b/send-transaction-service/src/transaction_client.rs @@ -1,16 +1,37 @@ use { crate::{send_transaction_service_stats::SendTransactionServiceStats, tpu_info::TpuInfo}, - log::warn, - solana_client::connection_cache::ConnectionCache, + async_trait::async_trait, + log::{debug, error, warn}, + solana_client::connection_cache::{ConnectionCache, Protocol}, solana_connection_cache::client_connection::ClientConnection as TpuConnection, + solana_keypair::Keypair, solana_measure::measure::Measure, + solana_sdk::quic::NotifyKeyUpdate, + solana_tpu_client_next::{ + connection_workers_scheduler::{ + ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver, + }, + leader_updater::LeaderUpdater, + transaction_batch::TransactionBatch, + ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, + }, std::{ - net::SocketAddr, + net::{Ipv4Addr, SocketAddr}, sync::{atomic::Ordering, Arc, Mutex}, time::{Duration, Instant}, }, + tokio::{ + runtime::Handle, + sync::mpsc::{self}, + task::JoinHandle as TokioJoinHandle, + }, + tokio_util::sync::CancellationToken, }; +/// How many connections to maintain in the tpu-client-next cache. The value is +/// chosen to match MAX_CONNECTIONS from ConnectionCache +const MAX_CONNECTIONS: usize = 1024; + // Alias trait to shorten function definitions. 
pub trait TpuInfoWithSendStatic: TpuInfo + std::marker::Send + 'static {} impl TpuInfoWithSendStatic for T where T: TpuInfo + std::marker::Send + 'static {} @@ -21,6 +42,11 @@ pub trait TransactionClient { wire_transactions: Vec>, stats: &SendTransactionServiceStats, ); + + #[cfg(any(test, feature = "dev-context-only-utils"))] + fn protocol(&self) -> Protocol; + + fn exit(&self); } pub struct ConnectionCacheClient { @@ -126,6 +152,22 @@ where self.send_transactions(address, wire_transactions.clone(), stats); } } + + #[cfg(any(test, feature = "dev-context-only-utils"))] + fn protocol(&self) -> Protocol { + self.connection_cache.protocol() + } + + fn exit(&self) {} +} + +impl NotifyKeyUpdate for ConnectionCacheClient +where + T: TpuInfoWithSendStatic, +{ + fn update_key(&self, identity: &Keypair) -> Result<(), Box> { + self.connection_cache.update_key(identity) + } } /// The leader info refresh rate. @@ -133,6 +175,7 @@ pub const LEADER_INFO_REFRESH_RATE_MS: u64 = 1000; /// A struct responsible for holding up-to-date leader information /// used for sending transactions. +#[derive(Clone)] pub struct CurrentLeaderInfo where T: TpuInfoWithSendStatic, @@ -176,3 +219,236 @@ where } } } + +/// `TpuClientNextClient` provides an interface for managing the +/// [`ConnectionWorkersScheduler`]. +/// +/// It allows: +/// * Create and initialize the scheduler with runtime configurations, +/// * Send transactions to the connection scheduler, +/// * Update the validator identity keypair and propagate the changes to the +/// scheduler. Most of the complexity of this structure arises from this +/// functionality. +#[derive(Clone)] +pub struct TpuClientNextClient +where + T: TpuInfoWithSendStatic + Clone, +{ + runtime_handle: Handle, + sender: mpsc::Sender, + // This handle is needed to implement `NotifyKeyUpdate` trait. Its only + // method takes &self and thus we need to wrap with Mutex. 
+ join_and_cancel: Arc, CancellationToken)>>, + leader_updater: SendTransactionServiceLeaderUpdater, + leader_forward_count: u64, +} + +type TpuClientJoinHandle = + TokioJoinHandle>; + +impl TpuClientNextClient +where + T: TpuInfoWithSendStatic + Clone, +{ + pub fn new( + runtime_handle: Handle, + my_tpu_address: SocketAddr, + tpu_peers: Option>, + leader_info: Option, + leader_forward_count: u64, + identity: Option<&Keypair>, + ) -> Self + where + T: TpuInfoWithSendStatic + Clone, + { + // The channel size represents 8s worth of transactions at a rate of + // 1000 tps, assuming batch size is 64. + let (sender, receiver) = mpsc::channel(128); + + let cancel = CancellationToken::new(); + + let leader_info_provider = CurrentLeaderInfo::new(leader_info); + let leader_updater: SendTransactionServiceLeaderUpdater = + SendTransactionServiceLeaderUpdater { + leader_info_provider, + my_tpu_address, + tpu_peers, + }; + let config = Self::create_config(identity, leader_forward_count as usize); + let handle = runtime_handle.spawn(ConnectionWorkersScheduler::run( + config, + Box::new(leader_updater.clone()), + receiver, + cancel.clone(), + )); + + Self { + runtime_handle, + join_and_cancel: Arc::new(Mutex::new((Some(handle), cancel))), + sender, + leader_updater, + leader_forward_count, + } + } + + fn create_config( + stake_identity: Option<&Keypair>, + leader_forward_count: usize, + ) -> ConnectionWorkersSchedulerConfig { + ConnectionWorkersSchedulerConfig { + bind: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0), + stake_identity: stake_identity.map(Into::into), + num_connections: MAX_CONNECTIONS, + skip_check_transaction_age: true, + // experimentally found parameter values + worker_channel_size: 64, + max_reconnect_attempts: 4, + leaders_fanout: Fanout { + connect: leader_forward_count, + send: leader_forward_count, + }, + } + } + + #[cfg(any(test, feature = "dev-context-only-utils"))] + pub fn cancel(&self) -> Result<(), Box> { + let Ok(lock) = 
self.join_and_cancel.lock() else { + return Err("Failed to stop scheduler.".into()); + }; + lock.1.cancel(); + Ok(()) + } + + async fn do_update_key(&self, identity: &Keypair) -> Result<(), Box> { + let runtime_handle = self.runtime_handle.clone(); + let config = Self::create_config(Some(identity), self.leader_forward_count as usize); + let leader_updater = self.leader_updater.clone(); + let handle = self.join_and_cancel.clone(); + + let join_handle = { + let Ok(mut lock) = handle.lock() else { + return Err("TpuClientNext task panicked.".into()); + }; + let (handle, token) = std::mem::take(&mut *lock); + token.cancel(); + handle + }; + + if let Some(join_handle) = join_handle { + let Ok(result) = join_handle.await else { + return Err("TpuClientNext task panicked.".into()); + }; + + match result { + Ok((_stats, receiver)) => { + let cancel = CancellationToken::new(); + let join_handle = runtime_handle.spawn(ConnectionWorkersScheduler::run( + config, + Box::new(leader_updater), + receiver, + cancel.clone(), + )); + + let Ok(mut lock) = handle.lock() else { + return Err("TpuClientNext task panicked.".into()); + }; + *lock = (Some(join_handle), cancel); + } + Err(error) => { + return Err(Box::new(error)); + } + } + } + Ok(()) + } +} + +impl NotifyKeyUpdate for TpuClientNextClient +where + T: TpuInfoWithSendStatic + Clone, +{ + fn update_key(&self, identity: &Keypair) -> Result<(), Box> { + self.runtime_handle.block_on(self.do_update_key(identity)) + } +} + +impl TransactionClient for TpuClientNextClient +where + T: TpuInfoWithSendStatic + Clone, +{ + fn send_transactions_in_batch( + &self, + wire_transactions: Vec>, + stats: &SendTransactionServiceStats, + ) { + let mut measure = Measure::start("send-us"); + self.runtime_handle.spawn({ + let sender = self.sender.clone(); + async move { + let res = sender.send(TransactionBatch::new(wire_transactions)).await; + if res.is_err() { + warn!("Failed to send transaction to channel: it is closed."); + } + } + }); + + 
measure.stop(); + stats.send_us.fetch_add(measure.as_us(), Ordering::Relaxed); + stats.send_attempt_count.fetch_add(1, Ordering::Relaxed); + } + + #[cfg(any(test, feature = "dev-context-only-utils"))] + fn protocol(&self) -> Protocol { + Protocol::QUIC + } + + fn exit(&self) { + let Ok(mut lock) = self.join_and_cancel.lock() else { + error!("Failed to stop scheduler: TpuClientNext task panicked."); + return; + }; + let (cancel, token) = std::mem::take(&mut *lock); + token.cancel(); + let Some(handle) = cancel else { + error!("Client task handle was not set."); + return; + }; + match self.runtime_handle.block_on(handle) { + Ok(result) => match result { + Ok(stats) => { + debug!("tpu-client-next statistics over all the connections: {stats:?}"); + } + Err(error) => error!("tpu-client-next exits with error {error}."), + }, + Err(error) => error!("Failed to join task {error}."), + } + } +} + +#[derive(Clone)] +pub struct SendTransactionServiceLeaderUpdater { + leader_info_provider: CurrentLeaderInfo, + my_tpu_address: SocketAddr, + tpu_peers: Option>, +} + +#[async_trait] +impl LeaderUpdater for SendTransactionServiceLeaderUpdater +where + T: TpuInfoWithSendStatic, +{ + fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec { + let discovered_peers = self + .leader_info_provider + .get_leader_info() + .map(|leader_info| { + leader_info.get_not_unique_leader_tpus(lookahead_leaders as u64, Protocol::QUIC) + }) + .filter(|addresses| !addresses.is_empty()) + .unwrap_or_else(|| vec![&self.my_tpu_address]); + let mut all_peers = self.tpu_peers.clone().unwrap_or_default(); + all_peers.extend(discovered_peers.into_iter().cloned()); + all_peers + } + async fn stop(&mut self) {} +} diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index b835d93d8af669..537e442fb80859 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -7422,17 +7422,20 @@ dependencies = [ name = "solana-send-transaction-service" version = "2.2.0" dependencies = [ + 
"async-trait", "crossbeam-channel", "itertools 0.12.1", "log", "solana-client", "solana-connection-cache", + "solana-keypair", "solana-measure", "solana-metrics", "solana-runtime", "solana-sdk", - "solana-tpu-client", + "solana-tpu-client-next", "tokio", + "tokio-util 0.7.13", ] [[package]] @@ -8010,6 +8013,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "solana-tpu-client-next" +version = "2.2.0" +dependencies = [ + "async-trait", + "log", + "lru", + "quinn", + "rustls 0.23.23", + "solana-clock", + "solana-connection-cache", + "solana-keypair", + "solana-measure", + "solana-quic-definitions", + "solana-rpc-client", + "solana-streamer", + "solana-time-utils", + "solana-tls-utils", + "solana-tpu-client", + "thiserror 2.0.11", + "tokio", + "tokio-util 0.7.13", +] + [[package]] name = "solana-transaction" version = "2.2.1"