Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reapply 3515: add tpu client next to sts #4758

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 28 additions & 1 deletion programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion send-transaction-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
async-trait = { workspace = true }
crossbeam-channel = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
solana-client = { workspace = true }
solana-connection-cache = { workspace = true }
solana-keypair = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-tpu-client = { workspace = true }
solana-tpu-client-next = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }

[dev-dependencies]
solana-logger = { workspace = true }
Expand Down
35 changes: 30 additions & 5 deletions send-transaction-service/src/send_transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,10 @@ impl SendTransactionService {
mod test {
use {
super::*,
crate::{test_utils::ClientWithCreator, tpu_info::NullTpuInfo},
crate::{
test_utils::ClientWithCreator, tpu_info::NullTpuInfo,
transaction_client::TpuClientNextClient,
},
crossbeam_channel::{bounded, unbounded},
solana_sdk::{
account::AccountSharedData,
Expand Down Expand Up @@ -541,14 +544,19 @@ mod test {

drop(sender);
send_transaction_service.join().unwrap();
client.cancel();
client.stop();
}

#[test]
fn service_exit_with_connection_cache() {
service_exit::<ConnectionCacheClient<NullTpuInfo>>(None);
}

#[tokio::test(flavor = "multi_thread")]
async fn service_exit_with_tpu_client_next() {
service_exit::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
}

fn validator_exit<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
let bank = Bank::default_for_tests();
let bank_forks = BankForks::new_rw_arc(bank);
Expand Down Expand Up @@ -581,7 +589,7 @@ mod test {

thread::spawn(move || {
exit.store(true, Ordering::Relaxed);
client.cancel();
client.stop();
});

let mut option = Ok(());
Expand All @@ -595,6 +603,11 @@ mod test {
validator_exit::<ConnectionCacheClient<NullTpuInfo>>(None);
}

#[tokio::test(flavor = "multi_thread")]
async fn validator_exit_with_tpu_client_next() {
validator_exit::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
}

fn process_transactions<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
solana_logger::setup();

Expand Down Expand Up @@ -857,14 +870,19 @@ mod test {
..ProcessTransactionsResult::default()
}
);
client.cancel();
client.stop();
}

#[test]
fn process_transactions_with_connection_cache() {
process_transactions::<ConnectionCacheClient<NullTpuInfo>>(None);
}

#[tokio::test(flavor = "multi_thread")]
async fn process_transactions_with_tpu_client_next() {
process_transactions::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
}

fn retry_durable_nonce_transactions<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
solana_logger::setup();

Expand Down Expand Up @@ -1162,11 +1180,18 @@ mod test {
..ProcessTransactionsResult::default()
}
);
client.cancel();
client.stop();
}

#[test]
fn retry_durable_nonce_transactions_with_connection_cache() {
retry_durable_nonce_transactions::<ConnectionCacheClient<NullTpuInfo>>(None);
}

#[tokio::test(flavor = "multi_thread")]
async fn retry_durable_nonce_transactions_with_tpu_client_next() {
retry_durable_nonce_transactions::<TpuClientNextClient<NullTpuInfo>>(Some(
Handle::current(),
));
}
}
45 changes: 38 additions & 7 deletions send-transaction-service/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
use {
crate::{
tpu_info::NullTpuInfo,
transaction_client::{ConnectionCacheClient, TpuInfoWithSendStatic, TransactionClient},
transaction_client::{
ConnectionCacheClient, TpuClientNextClient, TpuInfoWithSendStatic, TransactionClient,
},
},
solana_client::connection_cache::ConnectionCache,
std::{net::SocketAddr, sync::Arc},
Expand Down Expand Up @@ -42,23 +44,52 @@ impl CreateClient for ConnectionCacheClient<NullTpuInfo> {
}
}

pub trait Cancelable {
fn cancel(&self);
impl CreateClient for TpuClientNextClient<NullTpuInfo> {
fn create_client(
maybe_runtime: Option<Handle>,
my_tpu_address: SocketAddr,
tpu_peers: Option<Vec<SocketAddr>>,
leader_forward_count: u64,
) -> Self {
let runtime_handle =
maybe_runtime.expect("Runtime should be provided for the TpuClientNextClient.");
Self::new(
runtime_handle,
my_tpu_address,
tpu_peers,
None,
leader_forward_count,
None,
)
}
}

pub trait Stoppable {
fn stop(&self);
}

impl<T> Cancelable for ConnectionCacheClient<T>
impl<T> Stoppable for ConnectionCacheClient<T>
where
T: TpuInfoWithSendStatic,
{
fn cancel(&self) {}
fn stop(&self) {}
}

impl<T> Stoppable for TpuClientNextClient<T>
where
T: TpuInfoWithSendStatic + Clone,
{
fn stop(&self) {
self.cancel().unwrap();
}
}

// Define type alias to simplify definition of test functions.
pub trait ClientWithCreator:
CreateClient + TransactionClient + Cancelable + Send + Clone + 'static
CreateClient + TransactionClient + Stoppable + Send + Clone + 'static
{
}
impl<T> ClientWithCreator for T where
T: CreateClient + TransactionClient + Cancelable + Send + Clone + 'static
T: CreateClient + TransactionClient + Stoppable + Send + Clone + 'static
{
}
4 changes: 0 additions & 4 deletions send-transaction-service/src/tpu_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ pub trait TpuInfo {
///
/// For example, if leader schedule was `[L1, L1, L1, L1, L2, L2, L2, L2,
/// L1, ...]` it will return `[L1, L2, L1]`.
#[allow(
dead_code,
reason = "This function will be used when tpu-client-next will be added to this module."
)]
fn get_not_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>;

/// In addition to the tpu address, also return the leader slot
Expand Down
Loading
Loading