// connection_workers_scheduler.rs
//! This module defines [`ConnectionWorkersScheduler`] which sends transactions
//! to the upcoming leaders.
use {
super::{leader_updater::LeaderUpdater, SendTransactionStatsPerAddr},
crate::{
connection_worker::ConnectionWorker,
quic_networking::{
create_client_config, create_client_endpoint, QuicClientCertificate, QuicError,
},
transaction_batch::TransactionBatch,
workers_cache::{maybe_shutdown_worker, WorkerInfo, WorkersCache, WorkersCacheError},
SendTransactionStats,
},
async_trait::async_trait,
log::*,
quinn::Endpoint,
solana_keypair::Keypair,
std::{net::SocketAddr, sync::Arc},
thiserror::Error,
tokio::sync::mpsc,
tokio_util::sync::CancellationToken,
};
/// The [`ConnectionWorkersScheduler`] sends transactions from the provided
/// receiver channel to upcoming leaders. It obtains information about future
/// leaders from the implementation of the [`LeaderUpdater`] trait.
///
/// Internally, it manages and coordinates multiple network connections, and it
/// schedules and oversees the connection workers.
pub struct ConnectionWorkersScheduler;
/// Errors that can arise while running the [`ConnectionWorkersScheduler`].
#[derive(Debug, Error, PartialEq)]
pub enum ConnectionWorkersSchedulerError {
#[error(transparent)]
QuicError(#[from] QuicError),
#[error(transparent)]
WorkersCacheError(#[from] WorkersCacheError),
#[error("Leader receiver unexpectedly dropped.")]
LeaderReceiverDropped,
}
/// [`Fanout`] is a configuration struct that specifies how many leaders should
/// be targeted when sending transactions and connecting.
///
/// Note that the unit is the number of leaders per
/// [`solana_clock::NUM_CONSECUTIVE_LEADER_SLOTS`]. For example, if the leader
/// schedule is [L1, L1, L1, L1, L1, L1, L1, L1, L2, L2, L2, L2], the leaders
/// per consecutive leader slots are [L1, L1, L2], so there are 3 of them.
///
/// The purpose of the separate `connect` parameter is to establish connections
/// to a set of nodes in advance, hiding the latency of opening a new
/// connection. Hence, `connect` must be greater than or equal to `send`.
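///
/// # Example
///
/// A minimal sketch of constructing a [`Fanout`]; the concrete numbers are
/// illustrative assumptions, not recommendations:
///
/// ```ignore
/// // Connect to four upcoming leaders, but send only to the first two, so
/// // the remaining connections are warmed up ahead of time.
/// let fanout = Fanout { send: 2, connect: 4 };
/// assert!(fanout.connect >= fanout.send);
/// ```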
pub struct Fanout {
/// The number of leaders to target for sending transactions.
pub send: usize,
/// The number of leaders to target for establishing connections.
pub connect: usize,
}
/// Configuration for the [`ConnectionWorkersScheduler`].
///
/// This struct holds the necessary settings to initialize and manage connection
/// workers, including network binding, identity, connection limits, and
/// behavior related to transaction handling.
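///
/// # Example
///
/// A hedged sketch of building a configuration; every value below is an
/// illustrative assumption rather than a recommended setting:
///
/// ```ignore
/// let config = ConnectionWorkersSchedulerConfig {
///     bind: "0.0.0.0:0".parse().unwrap(),
///     stake_identity: None,
///     num_connections: 16,
///     skip_check_transaction_age: false,
///     worker_channel_size: 128,
///     max_reconnect_attempts: 4,
///     leaders_fanout: Fanout { send: 2, connect: 4 },
/// };
/// ```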
pub struct ConnectionWorkersSchedulerConfig {
/// The local address to bind the scheduler to.
pub bind: SocketAddr,
/// Optional stake identity keypair used in the endpoint certificate for
/// identifying the sender.
pub stake_identity: Option<StakeIdentity>,
/// The number of connections to be maintained by the scheduler.
pub num_connections: usize,
/// Whether to skip checking the transaction blockhash expiration.
pub skip_check_transaction_age: bool,
/// The size of the channel used to transmit transaction batches to the
/// worker tasks.
pub worker_channel_size: usize,
/// The maximum number of reconnection attempts allowed in case of
/// connection failure.
pub max_reconnect_attempts: usize,
/// Configures the number of leaders to connect to and send transactions to.
pub leaders_fanout: Fanout,
}
/// The [`StakeIdentity`] structure provides a convenient abstraction for handling
/// [`Keypair`] when creating a QUIC certificate. Since `Keypair` does not implement
/// [`Clone`], it cannot be duplicated in situations where [`ConnectionWorkersSchedulerConfig`]
/// needs to be transferred. This wrapper structure allows the use of either a `Keypair`
/// or a `&Keypair` to create a certificate, which is stored internally and later
/// consumed by [`ConnectionWorkersScheduler`] to create an endpoint.
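///
/// # Example
///
/// A small sketch of the conversions described above (illustrative only):
///
/// ```ignore
/// let keypair = Keypair::new();
/// // Either a borrowed or an owned `Keypair` can be converted.
/// let borrowed_identity: StakeIdentity = (&keypair).into();
/// let owned_identity = StakeIdentity::from(keypair);
/// ```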
pub struct StakeIdentity(QuicClientCertificate);
impl From<Keypair> for StakeIdentity {
fn from(keypair: Keypair) -> Self {
Self(QuicClientCertificate::new(Some(&keypair)))
}
}
impl From<&Keypair> for StakeIdentity {
fn from(keypair: &Keypair) -> Self {
Self(QuicClientCertificate::new(Some(keypair)))
}
}
impl From<StakeIdentity> for QuicClientCertificate {
fn from(identity: StakeIdentity) -> Self {
identity.0
}
}
/// The [`WorkersBroadcaster`] trait defines a customizable mechanism for
/// sending transaction batches to workers corresponding to the provided list of
/// addresses. Implementations of this trait are used by the
/// [`ConnectionWorkersScheduler`] to distribute transactions to the
/// appropriate workers.
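///
/// # Example
///
/// A hedged sketch of a custom broadcaster that sends only to the first
/// leader that already has a worker; `FirstLeaderBroadcaster` is a
/// hypothetical name, not part of this crate:
///
/// ```ignore
/// struct FirstLeaderBroadcaster;
///
/// #[async_trait]
/// impl WorkersBroadcaster for FirstLeaderBroadcaster {
///     async fn send_to_workers(
///         workers: &mut WorkersCache,
///         leaders: &[SocketAddr],
///         transaction_batch: TransactionBatch,
///     ) -> Result<(), ConnectionWorkersSchedulerError> {
///         if let Some(leader) = leaders.iter().find(|&l| workers.contains(l)) {
///             // Best-effort send: a full channel simply drops the batch.
///             let _ = workers.try_send_transactions_to_address(leader, transaction_batch);
///         }
///         Ok(())
///     }
/// }
/// ```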
#[async_trait]
pub trait WorkersBroadcaster {
/// Sends a `transaction_batch` to workers associated with the given
/// `leaders` addresses.
///
/// Returns an error if a critical issue occurs, e.g. the implementation
/// encounters an unrecoverable error. In that case, the scheduler is stopped
/// and all of its data is cleaned up.
async fn send_to_workers(
workers: &mut WorkersCache,
leaders: &[SocketAddr],
transaction_batch: TransactionBatch,
) -> Result<(), ConnectionWorkersSchedulerError>;
}
pub type TransactionStatsAndReceiver = (
SendTransactionStatsPerAddr,
mpsc::Receiver<TransactionBatch>,
);
impl ConnectionWorkersScheduler {
/// Starts the scheduler, which manages the distribution of transactions to
/// the network's upcoming leaders.
///
/// This method is a shorthand for
/// [`ConnectionWorkersScheduler::run_with_broadcaster`] using the
/// `NonblockingBroadcaster` strategy.
///
/// Transactions that fail to be delivered to workers due to full channels
/// will be dropped, as will transactions that fail to be delivered over the
/// network.
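///
/// # Example
///
/// A hedged usage sketch; `config` and `leader_updater` are assumed to be
/// constructed elsewhere:
///
/// ```ignore
/// let (tx_sender, tx_receiver) = mpsc::channel(64);
/// let cancel = CancellationToken::new();
/// let handle = tokio::spawn(ConnectionWorkersScheduler::run(
///     config,
///     leader_updater, // Box<dyn LeaderUpdater>
///     tx_receiver,
///     cancel.clone(),
/// ));
/// // ... push `TransactionBatch`es into `tx_sender` ...
/// // Later: cancel the token and await the handle to obtain the statistics.
/// cancel.cancel();
/// let (stats, _receiver) = handle.await.expect("join")?;
/// ```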
pub async fn run(
config: ConnectionWorkersSchedulerConfig,
leader_updater: Box<dyn LeaderUpdater>,
transaction_receiver: mpsc::Receiver<TransactionBatch>,
cancel: CancellationToken,
) -> Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError> {
Self::run_with_broadcaster::<NonblockingBroadcaster>(
config,
leader_updater,
transaction_receiver,
cancel,
)
.await
}
/// Starts the scheduler, which manages the distribution of transactions to
/// the network's upcoming leaders. The `Broadcaster` type parameter allows
/// customizing how transactions are sent to the leaders, see
/// [`WorkersBroadcaster`].
///
/// Runs the main loop that handles worker scheduling and management for
/// connections. Returns the transaction send statistics per connection
/// address along with the transaction receiver, or an error. The receiver is
/// handed back to the caller because in some cases the same receiver must be
/// reused by a new scheduler, for example, when the validator identity is
/// updated.
///
/// Importantly, if some transactions were not delivered due to network
/// problems, they will not be retried when the problem is resolved.
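///
/// # Example
///
/// A hedged sketch of plugging in a custom broadcaster and reusing the
/// returned receiver; `MyBroadcaster` is a hypothetical [`WorkersBroadcaster`]
/// implementation, not part of this crate:
///
/// ```ignore
/// let (_stats, tx_receiver) = ConnectionWorkersScheduler::run_with_broadcaster::<MyBroadcaster>(
///     config,
///     leader_updater,
///     tx_receiver,
///     cancel.clone(),
/// )
/// .await?;
/// // The receiver is handed back so it can be passed to a new scheduler,
/// // e.g. after the validator identity (and thus the certificate) changes.
/// ```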
pub async fn run_with_broadcaster<Broadcaster: WorkersBroadcaster>(
ConnectionWorkersSchedulerConfig {
bind,
stake_identity,
num_connections,
skip_check_transaction_age,
worker_channel_size,
max_reconnect_attempts,
leaders_fanout,
}: ConnectionWorkersSchedulerConfig,
mut leader_updater: Box<dyn LeaderUpdater>,
mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
cancel: CancellationToken,
) -> Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError> {
let endpoint = Self::setup_endpoint(bind, stake_identity)?;
debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
let mut workers = WorkersCache::new(num_connections, cancel.clone());
let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();
let mut last_error = None;
loop {
let transaction_batch: TransactionBatch = tokio::select! {
recv_res = transaction_receiver.recv() => match recv_res {
Some(txs) => txs,
None => {
debug!("End of `transaction_receiver`: shutting down.");
break;
}
},
() = cancel.cancelled() => {
debug!("Cancelled: Shutting down");
break;
}
};
let connect_leaders = leader_updater.next_leaders(leaders_fanout.connect);
let send_leaders = extract_send_leaders(&connect_leaders, leaders_fanout.send);
// Add future leaders to the cache to hide the latency of opening
// the connection.
for peer in connect_leaders {
if !workers.contains(&peer) {
let stats = send_stats_per_addr.entry(peer.ip()).or_default();
let worker = Self::spawn_worker(
&endpoint,
&peer,
worker_channel_size,
skip_check_transaction_age,
max_reconnect_attempts,
stats.clone(),
);
maybe_shutdown_worker(workers.push(peer, worker));
}
}
if let Err(error) =
Broadcaster::send_to_workers(&mut workers, &send_leaders, transaction_batch).await
{
last_error = Some(error);
break;
}
}
workers.shutdown().await;
endpoint.close(0u32.into(), b"Closing connection");
leader_updater.stop().await;
if let Some(error) = last_error {
return Err(error);
}
Ok((send_stats_per_addr, transaction_receiver))
}
/// Sets up the QUIC endpoint for the scheduler to handle connections.
fn setup_endpoint(
bind: SocketAddr,
stake_identity: Option<StakeIdentity>,
) -> Result<Endpoint, ConnectionWorkersSchedulerError> {
let client_certificate = match stake_identity {
Some(identity) => identity.into(),
None => QuicClientCertificate::new(None),
};
let client_config = create_client_config(client_certificate);
let endpoint = create_client_endpoint(bind, client_config)?;
Ok(endpoint)
}
/// Spawns a worker to handle communication with a given peer.
fn spawn_worker(
endpoint: &Endpoint,
peer: &SocketAddr,
worker_channel_size: usize,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
stats: Arc<SendTransactionStats>,
) -> WorkerInfo {
let (txs_sender, txs_receiver) = mpsc::channel(worker_channel_size);
let endpoint = endpoint.clone();
let peer = *peer;
let (mut worker, cancel) = ConnectionWorker::new(
endpoint,
peer,
txs_receiver,
skip_check_transaction_age,
max_reconnect_attempts,
stats,
);
let handle = tokio::spawn(async move {
worker.run().await;
});
WorkerInfo::new(txs_sender, handle, cancel)
}
}
/// [`NonblockingBroadcaster`] attempts to immediately send transactions to all
/// the workers. If a worker cannot accept transactions because its channel is
/// full, the transactions are not sent to that worker.
struct NonblockingBroadcaster;
#[async_trait]
impl WorkersBroadcaster for NonblockingBroadcaster {
async fn send_to_workers(
workers: &mut WorkersCache,
leaders: &[SocketAddr],
transaction_batch: TransactionBatch,
) -> Result<(), ConnectionWorkersSchedulerError> {
for new_leader in leaders {
if !workers.contains(new_leader) {
warn!("No existing worker for {new_leader:?}, skip sending to this leader.");
continue;
}
let send_res =
workers.try_send_transactions_to_address(new_leader, transaction_batch.clone());
match send_res {
Ok(()) => (),
Err(WorkersCacheError::ShutdownError) => {
debug!("Connection to {new_leader} was closed, worker cache shutdown");
}
Err(WorkersCacheError::ReceiverDropped) => {
// Remove the worker from the cache, if the peer has disconnected.
maybe_shutdown_worker(workers.pop(*new_leader));
}
Err(err) => {
warn!("Connection to {new_leader} was closed, worker error: {err}");
// If we have failed to send the batch, it will be dropped.
}
}
}
Ok(())
}
}
/// Extracts a list of unique leader addresses to which transactions will be sent.
///
/// This function selects up to `send_fanout` addresses from the `leaders` list, ensuring that
/// only unique addresses are included while maintaining their original order.
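///
/// # Example
///
/// A small illustrative sketch (the addresses are made up):
///
/// ```ignore
/// let a: SocketAddr = "10.0.0.1:8009".parse().unwrap();
/// let b: SocketAddr = "10.0.0.2:8009".parse().unwrap();
/// let c: SocketAddr = "10.0.0.3:8009".parse().unwrap();
/// // Only the first `send_fanout` entries are considered, then deduplicated.
/// assert_eq!(extract_send_leaders(&[a, a, b, c], 3), vec![a, b]);
/// ```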
fn extract_send_leaders(leaders: &[SocketAddr], send_fanout: usize) -> Vec<SocketAddr> {
let send_count = send_fanout.min(leaders.len());
remove_duplicates(&leaders[..send_count])
}
/// Removes duplicate `SocketAddr` elements from the given slice while
/// preserving their original order.
fn remove_duplicates(input: &[SocketAddr]) -> Vec<SocketAddr> {
let mut res = Vec::with_capacity(input.len());
for address in input {
if !res.contains(address) {
res.push(*address);
}
}
res
}