Skip to content

Commit

Permalink
Use parameter struct for configuring QUIC streamer (#3328)
Browse files Browse the repository at this point in the history
refactor: Use parameter struct for configuring QUIC streamer
(cherry picked from commit 6319db8)
  • Loading branch information
sakridge authored and mergify[bot] committed Oct 30, 2024
1 parent 285e2e8 commit e1765e8
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 181 deletions.
21 changes: 8 additions & 13 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,9 @@ mod tests {
super::*,
crate::connection_cache::ConnectionCache,
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_sdk::signature::Keypair,
solana_streamer::{
nonblocking::quic::{
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
quic::SpawnServerResult,
quic::{QuicServerParams, SpawnServerResult},
streamer::StakedNodes,
},
std::{
Expand Down Expand Up @@ -246,14 +242,13 @@ mod tests {
&keypair2,
sender2,
response_recv_exit.clone(),
1,
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
QuicServerParams {
max_connections_per_peer: 1,
max_staked_connections: 10,
max_unstaked_connections: 10,
..QuicServerParams::default()
},
)
.unwrap();

Expand Down
33 changes: 17 additions & 16 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ use {
},
solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair},
solana_streamer::{
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::{
spawn_server_multi, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS,
spawn_server_multi, QuicServerParams, SpawnServerResult, MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
},
streamer::StakedNodes,
},
Expand Down Expand Up @@ -166,14 +166,13 @@ impl Tpu {
keypair,
packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
tpu_max_connections_per_ipaddr_per_minute,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
QuicServerParams {
max_connections_per_peer: MAX_QUIC_CONNECTIONS_PER_PEER,
max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
coalesce: tpu_coalesce,
..QuicServerParams::default()
},
)
.unwrap();

Expand All @@ -188,14 +187,16 @@ impl Tpu {
keypair,
forwarded_packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
DEFAULT_MAX_STREAMS_PER_MS,
tpu_max_connections_per_ipaddr_per_minute,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
QuicServerParams {
max_connections_per_peer: MAX_QUIC_CONNECTIONS_PER_PEER,
max_staked_connections: MAX_STAKED_CONNECTIONS
.saturating_add(MAX_UNSTAKED_CONNECTIONS),
max_unstaked_connections: 0, // Prevent unstaked nodes from forwarding transactions
max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
coalesce: tpu_coalesce,
..QuicServerParams::default()
},
)
.unwrap();

Expand Down
61 changes: 27 additions & 34 deletions quic-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,9 @@ mod tests {
solana_quic_client::nonblocking::quic_client::{
QuicClientCertificate, QuicLazyInitializedEndpoint,
},
solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_sdk::{packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{
nonblocking::quic::{
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
quic::SpawnServerResult,
quic::{QuicServerParams, SpawnServerResult},
streamer::StakedNodes,
tls_certificates::new_dummy_x509_certificate,
},
Expand Down Expand Up @@ -83,14 +79,13 @@ mod tests {
&keypair,
sender,
exit.clone(),
1,
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
QuicServerParams {
max_connections_per_peer: 1,
max_staked_connections: 10,
max_unstaked_connections: 10,
..QuicServerParams::default()
},
)
.unwrap();

Expand Down Expand Up @@ -169,14 +164,14 @@ mod tests {
&keypair,
sender,
exit.clone(),
1,
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
Duration::from_secs(1), // wait_for_chunk_timeout
DEFAULT_TPU_COALESCE,
QuicServerParams {
max_connections_per_peer: 1,
max_staked_connections: 10,
max_unstaked_connections: 10,
wait_for_chunk_timeout: Duration::from_secs(1),
..QuicServerParams::default()
},
)
.unwrap();

Expand Down Expand Up @@ -233,14 +228,13 @@ mod tests {
&keypair,
sender,
request_recv_exit.clone(),
1,
staked_nodes.clone(),
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
QuicServerParams {
max_connections_per_peer: 1,
max_staked_connections: 10,
max_unstaked_connections: 10,
..QuicServerParams::default()
},
)
.unwrap();

Expand All @@ -263,14 +257,13 @@ mod tests {
&keypair2,
sender2,
response_recv_exit.clone(),
1,
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
QuicServerParams {
max_connections_per_peer: 1,
max_staked_connections: 10,
max_unstaked_connections: 10,
..QuicServerParams::default()
},
)
.unwrap();

Expand Down
72 changes: 26 additions & 46 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
STREAM_THROTTLING_INTERVAL_MS,
},
},
quic::{configure_server, QuicServerError, StreamerStats},
quic::{configure_server, QuicServerError, QuicServerParams, StreamerStats},
streamer::StakedNodes,
tls_certificates::get_pubkey_from_tls_certificate,
},
Expand Down Expand Up @@ -147,56 +147,45 @@ pub struct SpawnNonBlockingServerResult {
pub max_concurrent_connections: usize,
}

#[allow(clippy::too_many_arguments)]
pub fn spawn_server(
name: &'static str,
sock: UdpSocket,
keypair: &Keypair,
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
max_connections_per_peer: usize,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
max_connections_per_ipaddr_per_min: u64,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
quic_server_params: QuicServerParams,
) -> Result<SpawnNonBlockingServerResult, QuicServerError> {
spawn_server_multi(
name,
vec![sock],
keypair,
packet_sender,
exit,
max_connections_per_peer,
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout,
coalesce,
quic_server_params,
)
}

#[allow(clippy::too_many_arguments, clippy::type_complexity)]
pub fn spawn_server_multi(
name: &'static str,
sockets: Vec<UdpSocket>,
keypair: &Keypair,
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
max_connections_per_peer: usize,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
max_connections_per_ipaddr_per_min: u64,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
quic_server_params: QuicServerParams,
) -> Result<SpawnNonBlockingServerResult, QuicServerError> {
info!("Start {name} quic server on {sockets:?}");
let QuicServerParams {
max_unstaked_connections,
max_staked_connections,
max_connections_per_peer,
max_streams_per_ms,
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout,
coalesce,
} = quic_server_params;
let concurrent_connections = max_staked_connections + max_unstaked_connections;
let max_concurrent_connections = concurrent_connections + concurrent_connections / 4;
let (config, _) = configure_server(keypair)?;
Expand Down Expand Up @@ -1533,15 +1522,12 @@ impl<'a> Future for EndpointAccept<'a> {
pub mod test {
use {
super::*,
crate::{
nonblocking::{
quic::compute_max_allowed_uni_streams,
testing_utilities::{
get_client_config, make_client_endpoint, setup_quic_server,
SpawnTestServerResult, TestServerConfig,
},
crate::nonblocking::{
quic::compute_max_allowed_uni_streams,
testing_utilities::{
get_client_config, make_client_endpoint, setup_quic_server, SpawnTestServerResult,
TestServerConfig,
},
quic::{MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
},
assert_matches::assert_matches,
async_channel::unbounded as async_unbounded,
Expand Down Expand Up @@ -2018,14 +2004,11 @@ pub mod test {
&keypair,
sender,
exit.clone(),
1,
staked_nodes,
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
QuicServerParams {
max_unstaked_connections: 0, // Do not allow any connection from unstaked clients/nodes
..QuicServerParams::default()
},
)
.unwrap();

Expand Down Expand Up @@ -2054,14 +2037,11 @@ pub mod test {
&keypair,
sender,
exit.clone(),
2,
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
QuicServerParams {
max_connections_per_peer: 2,
..QuicServerParams::default()
},
)
.unwrap();

Expand Down
25 changes: 14 additions & 11 deletions streamer/src/nonblocking/testing_utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
crate::{
quic::{StreamerStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
quic::{QuicServerParams, StreamerStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
tls_certificates::new_dummy_x509_certificate,
},
Expand Down Expand Up @@ -111,7 +111,7 @@ pub struct TestServerConfig {
pub max_staked_connections: usize,
pub max_unstaked_connections: usize,
pub max_streams_per_ms: u64,
pub max_connections_per_ipaddr_per_minute: u64,
pub max_connections_per_ipaddr_per_min: u64,
}

impl Default for TestServerConfig {
Expand All @@ -121,7 +121,7 @@ impl Default for TestServerConfig {
max_staked_connections: MAX_STAKED_CONNECTIONS,
max_unstaked_connections: MAX_UNSTAKED_CONNECTIONS,
max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
max_connections_per_ipaddr_per_minute: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
}
}
}
Expand Down Expand Up @@ -176,14 +176,23 @@ pub fn setup_quic_server_with_sockets(
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_minute,
max_connections_per_ipaddr_per_min,
}: TestServerConfig,
) -> SpawnTestServerResult {
let exit = Arc::new(AtomicBool::new(false));
let (sender, receiver) = unbounded();
let keypair = Keypair::new();
let server_address = sockets[0].local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(option_staked_nodes.unwrap_or_default()));
let quic_server_params = QuicServerParams {
max_connections_per_peer,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
coalesce: DEFAULT_TPU_COALESCE,
};
let SpawnNonBlockingServerResult {
endpoints: _,
stats,
Expand All @@ -195,14 +204,8 @@ pub fn setup_quic_server_with_sockets(
&keypair,
sender,
exit.clone(),
max_connections_per_peer,
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_minute,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
quic_server_params,
)
.unwrap();
SpawnTestServerResult {
Expand Down
Loading

0 comments on commit e1765e8

Please sign in to comment.