Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use parameter struct for configuring QUIC streamer #3328

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 8 additions & 13 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,9 @@ mod tests {
super::*,
crate::connection_cache::ConnectionCache,
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_sdk::signature::Keypair,
solana_streamer::{
nonblocking::quic::{
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
quic::SpawnServerResult,
quic::{QuicServerParams, SpawnServerResult},
streamer::StakedNodes,
},
std::{
Expand Down Expand Up @@ -246,14 +242,13 @@ mod tests {
&keypair2,
sender2,
response_recv_exit.clone(),
1,
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
QuicServerParams {
max_connections_per_peer: 1,
max_staked_connections: 10,
max_unstaked_connections: 10,
..QuicServerParams::default()
},
)
.unwrap();

Expand Down
33 changes: 17 additions & 16 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ use {
},
solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair},
solana_streamer::{
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::{
spawn_server_multi, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS,
spawn_server_multi, QuicServerParams, SpawnServerResult, MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
},
streamer::StakedNodes,
},
Expand Down Expand Up @@ -166,14 +166,13 @@ impl Tpu {
keypair,
packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
tpu_max_connections_per_ipaddr_per_minute,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
QuicServerParams {
max_connections_per_peer: MAX_QUIC_CONNECTIONS_PER_PEER,
max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
coalesce: tpu_coalesce,
..QuicServerParams::default()
},
)
.unwrap();

Expand All @@ -188,14 +187,16 @@ impl Tpu {
keypair,
forwarded_packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
DEFAULT_MAX_STREAMS_PER_MS,
tpu_max_connections_per_ipaddr_per_minute,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
QuicServerParams {
max_connections_per_peer: MAX_QUIC_CONNECTIONS_PER_PEER,
max_staked_connections: MAX_STAKED_CONNECTIONS
.saturating_add(MAX_UNSTAKED_CONNECTIONS),
max_unstaked_connections: 0, // Prevent unstaked nodes from forwarding transactions
max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
coalesce: tpu_coalesce,
..QuicServerParams::default()
},
)
.unwrap();

Expand Down
61 changes: 27 additions & 34 deletions quic-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,9 @@ mod tests {
solana_quic_client::nonblocking::quic_client::{
QuicClientCertificate, QuicLazyInitializedEndpoint,
},
solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_sdk::{packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{
nonblocking::quic::{
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
quic::SpawnServerResult,
quic::{QuicServerParams, SpawnServerResult},
streamer::StakedNodes,
tls_certificates::new_dummy_x509_certificate,
},
Expand Down Expand Up @@ -83,14 +79,13 @@ mod tests {
&keypair,
sender,
exit.clone(),
1,
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
QuicServerParams {
max_connections_per_peer: 1,
max_staked_connections: 10,
max_unstaked_connections: 10,
..QuicServerParams::default()
},
)
.unwrap();

Expand Down Expand Up @@ -169,14 +164,14 @@ mod tests {
&keypair,
sender,
exit.clone(),
1,
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
Duration::from_secs(1), // wait_for_chunk_timeout
DEFAULT_TPU_COALESCE,
QuicServerParams {
max_connections_per_peer: 1,
max_staked_connections: 10,
max_unstaked_connections: 10,
wait_for_chunk_timeout: Duration::from_secs(1),
..QuicServerParams::default()
},
)
.unwrap();

Expand Down Expand Up @@ -233,14 +228,13 @@ mod tests {
&keypair,
sender,
request_recv_exit.clone(),
1,
staked_nodes.clone(),
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
QuicServerParams {
max_connections_per_peer: 1,
max_staked_connections: 10,
max_unstaked_connections: 10,
..QuicServerParams::default()
},
)
.unwrap();

Expand All @@ -263,14 +257,13 @@ mod tests {
&keypair2,
sender2,
response_recv_exit.clone(),
1,
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
QuicServerParams {
max_connections_per_peer: 1,
max_staked_connections: 10,
max_unstaked_connections: 10,
..QuicServerParams::default()
},
)
.unwrap();

Expand Down
72 changes: 26 additions & 46 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
STREAM_THROTTLING_INTERVAL_MS,
},
},
quic::{configure_server, QuicServerError, StreamerStats},
quic::{configure_server, QuicServerError, QuicServerParams, StreamerStats},
streamer::StakedNodes,
tls_certificates::get_pubkey_from_tls_certificate,
},
Expand Down Expand Up @@ -132,56 +132,45 @@ pub struct SpawnNonBlockingServerResult {
pub max_concurrent_connections: usize,
}

#[allow(clippy::too_many_arguments)]
pub fn spawn_server(
name: &'static str,
sock: UdpSocket,
keypair: &Keypair,
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
max_connections_per_peer: usize,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
max_connections_per_ipaddr_per_min: u64,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
quic_server_params: QuicServerParams,
) -> Result<SpawnNonBlockingServerResult, QuicServerError> {
spawn_server_multi(
name,
vec![sock],
keypair,
packet_sender,
exit,
max_connections_per_peer,
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout,
coalesce,
quic_server_params,
)
}

#[allow(clippy::too_many_arguments, clippy::type_complexity)]
pub fn spawn_server_multi(
name: &'static str,
sockets: Vec<UdpSocket>,
keypair: &Keypair,
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
max_connections_per_peer: usize,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
max_connections_per_ipaddr_per_min: u64,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
quic_server_params: QuicServerParams,
) -> Result<SpawnNonBlockingServerResult, QuicServerError> {
info!("Start {name} quic server on {sockets:?}");
let QuicServerParams {
max_unstaked_connections,
max_staked_connections,
max_connections_per_peer,
max_streams_per_ms,
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout,
coalesce,
} = quic_server_params;
let concurrent_connections = max_staked_connections + max_unstaked_connections;
let max_concurrent_connections = concurrent_connections + concurrent_connections / 4;
let (config, _) = configure_server(keypair)?;
Expand Down Expand Up @@ -1491,15 +1480,12 @@ impl<'a> Future for EndpointAccept<'a> {
pub mod test {
use {
super::*,
crate::{
nonblocking::{
quic::compute_max_allowed_uni_streams,
testing_utilities::{
get_client_config, make_client_endpoint, setup_quic_server,
SpawnTestServerResult, TestServerConfig,
},
crate::nonblocking::{
quic::compute_max_allowed_uni_streams,
testing_utilities::{
get_client_config, make_client_endpoint, setup_quic_server, SpawnTestServerResult,
TestServerConfig,
},
quic::{MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
},
assert_matches::assert_matches,
async_channel::unbounded as async_unbounded,
Expand Down Expand Up @@ -1975,14 +1961,11 @@ pub mod test {
&keypair,
sender,
exit.clone(),
1,
staked_nodes,
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
QuicServerParams {
max_unstaked_connections: 0, // Do not allow any connection from unstaked clients/nodes
..QuicServerParams::default()
},
)
.unwrap();

Expand Down Expand Up @@ -2011,14 +1994,11 @@ pub mod test {
&keypair,
sender,
exit.clone(),
2,
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
QuicServerParams {
max_connections_per_peer: 2,
..QuicServerParams::default()
},
)
.unwrap();

Expand Down
25 changes: 14 additions & 11 deletions streamer/src/nonblocking/testing_utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
crate::{
quic::{StreamerStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
quic::{QuicServerParams, StreamerStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
tls_certificates::new_dummy_x509_certificate,
},
Expand Down Expand Up @@ -111,7 +111,7 @@ pub struct TestServerConfig {
pub max_staked_connections: usize,
pub max_unstaked_connections: usize,
pub max_streams_per_ms: u64,
pub max_connections_per_ipaddr_per_minute: u64,
pub max_connections_per_ipaddr_per_min: u64,
}

impl Default for TestServerConfig {
Expand All @@ -121,7 +121,7 @@ impl Default for TestServerConfig {
max_staked_connections: MAX_STAKED_CONNECTIONS,
max_unstaked_connections: MAX_UNSTAKED_CONNECTIONS,
max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
max_connections_per_ipaddr_per_minute: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
}
}
}
Expand Down Expand Up @@ -176,14 +176,23 @@ pub fn setup_quic_server_with_sockets(
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_minute,
max_connections_per_ipaddr_per_min,
}: TestServerConfig,
) -> SpawnTestServerResult {
let exit = Arc::new(AtomicBool::new(false));
let (sender, receiver) = unbounded();
let keypair = Keypair::new();
let server_address = sockets[0].local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(option_staked_nodes.unwrap_or_default()));
let quic_server_params = QuicServerParams {
max_connections_per_peer,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
coalesce: DEFAULT_TPU_COALESCE,
};
let SpawnNonBlockingServerResult {
endpoints: _,
stats,
Expand All @@ -195,14 +204,8 @@ pub fn setup_quic_server_with_sockets(
&keypair,
sender,
exit.clone(),
max_connections_per_peer,
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_minute,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
quic_server_params,
)
.unwrap();
SpawnTestServerResult {
Expand Down
Loading
Loading