forked from solana-labs/solana
-
Notifications
You must be signed in to change notification settings - Fork 381
/
Copy pathconnection_worker.rs
256 lines (240 loc) · 10.1 KB
/
connection_worker.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
//! This module defines [`ConnectionWorker`], which encapsulates the functionality
//! needed to handle one connection within the scope of a task.
use {
super::SendTransactionStats,
crate::{
quic_networking::send_data_over_stream, send_transaction_stats::record_error,
transaction_batch::TransactionBatch,
},
log::*,
quinn::{ConnectError, Connection, Endpoint},
solana_clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS},
solana_measure::measure::Measure,
solana_time_utils::timestamp,
std::{
net::SocketAddr,
sync::{atomic::Ordering, Arc},
},
tokio::{
sync::mpsc,
time::{sleep, Duration},
},
tokio_util::sync::CancellationToken,
};
/// Interval the worker sleeps before each reconnect attempt while it is in the
/// `Retry` state.
///
/// Computed as `NUM_CONSECUTIVE_LEADER_SLOTS * DEFAULT_MS_PER_SLOT`
/// milliseconds. This value is a best-effort estimate, based on current
/// network conditions.
const RETRY_SLEEP_INTERVAL: Duration =
Duration::from_millis(NUM_CONSECUTIVE_LEADER_SLOTS * DEFAULT_MS_PER_SLOT);
/// Maximum age (in milliseconds) of a blockhash, beyond which transaction
/// batches are dropped.
///
/// Computed as `MAX_PROCESSING_AGE` (cast to `u64`) multiplied by
/// `DEFAULT_MS_PER_SLOT`. `send_transactions` compares the age of a batch
/// (current timestamp minus the batch timestamp) against this limit unless
/// the age check is disabled.
const MAX_PROCESSING_AGE_MS: u64 = MAX_PROCESSING_AGE as u64 * DEFAULT_MS_PER_SLOT;
/// [`ConnectionState`] represents the current state of a QUIC connection.
///
/// It tracks the lifecycle of a connection from initial setup to the closing
/// phase. The transition function between states is defined in the
/// `ConnectionWorker` implementation.
enum ConnectionState {
/// No connection attempt has been made yet. A newly constructed worker
/// starts in this state.
NotSetup,
/// A connection has been established; transaction batches are sent over it.
Active(Connection),
/// A send or a connection attempt has failed and the worker should try to
/// reconnect. The payload is the reconnect counter: it is set to 0 after a
/// failed send and to the previous counter plus one after a failed
/// connection attempt.
Retry(usize),
/// Terminal state: the worker's main loop exits as soon as it observes it.
Closing,
}
impl Drop for ConnectionState {
    /// Closes the underlying connection when an `Active` state is dropped.
    ///
    /// Closing happens immediately, so there is no guarantee that streams
    /// which are still open will finish. States other than `Active` hold no
    /// connection and are dropped without any extra work.
    fn drop(&mut self) {
        // Only the `Active` variant owns a connection that needs closing.
        let Self::Active(connection) = self else {
            return;
        };
        debug!(
            "Close connection with {:?}, stats: {:?}. All pending streams will be dropped.",
            connection.remote_address(),
            connection.stats()
        );
        connection.close(0u32.into(), b"done");
    }
}
/// [`ConnectionWorker`] holds a connection to the validator with address `peer`.
///
/// If the connection has been closed, [`ConnectionWorker`] tries to reconnect
/// up to `max_reconnect_attempts` times. While the connection is in the
/// `Active` state, it sends transactions received from
/// `transactions_receiver`. Additionally, it accumulates statistics about
/// connection and stream failures.
pub(crate) struct ConnectionWorker {
/// Endpoint used to initiate connections to `peer`.
endpoint: Endpoint,
/// Address of the remote peer this worker connects to.
peer: SocketAddr,
/// Channel from which transaction batches to send are received. When the
/// sending side is dropped, the worker moves to the `Closing` state.
transactions_receiver: mpsc::Receiver<TransactionBatch>,
/// Current state of the connection to `peer`.
connection: ConnectionState,
/// When `true`, batches are sent regardless of their age.
skip_check_transaction_age: bool,
/// Reconnect counter threshold: once the counter stored in the `Retry`
/// state exceeds this value, the worker stops trying and closes.
max_reconnect_attempts: usize,
/// Shared statistics updated on successful sends and on errors.
send_txs_stats: Arc<SendTransactionStats>,
/// Token whose cancellation makes `run` return.
cancel: CancellationToken,
}
impl ConnectionWorker {
/// Builds a [`ConnectionWorker`] together with the token that stops it.
///
/// The worker connects to `peer` through `endpoint` and sends the batches it
/// receives from `transactions_receiver`. When `skip_check_transaction_age`
/// is `true`, batches are sent without checking whether their blockhash has
/// expired. `max_reconnect_attempts` limits how many times the worker tries
/// to reconnect after a connection failure, and `send_txs_stats` is the
/// shared statistics object the worker updates.
///
/// The worker starts in the `NotSetup` state. The returned
/// [`CancellationToken`] shares state with the one stored in the worker, so
/// the caller can cancel it to stop the worker.
pub fn new(
    endpoint: Endpoint,
    peer: SocketAddr,
    transactions_receiver: mpsc::Receiver<TransactionBatch>,
    skip_check_transaction_age: bool,
    max_reconnect_attempts: usize,
    send_txs_stats: Arc<SendTransactionStats>,
) -> (Self, CancellationToken) {
    let cancel = CancellationToken::new();
    // The worker keeps a clone; the original goes back to the caller.
    let worker_token = cancel.clone();
    (
        Self {
            endpoint,
            peer,
            transactions_receiver,
            connection: ConnectionState::NotSetup,
            skip_check_transaction_age,
            max_reconnect_attempts,
            send_txs_stats,
            cancel: worker_token,
        },
        cancel,
    )
}
/// Starts the main loop of the [`ConnectionWorker`].
///
/// This method manages the connection to the peer and handles state
/// transitions. It returns when one of the following happens:
///
/// * the state becomes `Closing`, which is set when the transactions sender
///   has been dropped, when the reconnect counter exceeds
///   `max_reconnect_attempts`, or when `create_connection` decides the error
///   is not recoverable;
/// * the worker's cancellation token is cancelled.
pub async fn run(&mut self) {
// Clone the token so cancellation can be awaited independently of
// `main_loop`, which captures `self`.
let cancel = self.cancel.clone();
let main_loop = async move {
loop {
match &self.connection {
ConnectionState::Closing => {
break;
}
ConnectionState::NotSetup => {
// First connection attempt: the reconnect counter starts at 0.
self.create_connection(0).await;
}
ConnectionState::Active(connection) => {
// Wait for the next batch; `None` means every sender is gone, so
// there is nothing left to do for this worker.
let Some(transactions) = self.transactions_receiver.recv().await else {
debug!("Transactions sender has been dropped.");
self.connection = ConnectionState::Closing;
continue;
};
self.send_transactions(connection.clone(), transactions)
.await;
}
ConnectionState::Retry(num_reconnects) => {
// Give up once the counter is strictly greater than the limit.
if *num_reconnects > self.max_reconnect_attempts {
error!("Failed to establish connection: reach max reconnect attempts.");
self.connection = ConnectionState::Closing;
continue;
}
// Back off before every reconnect attempt.
sleep(RETRY_SLEEP_INTERVAL).await;
self.reconnect(*num_reconnects).await;
}
}
}
};
// Finish as soon as either the loop ends on its own or the token is
// cancelled, whichever comes first.
tokio::select! {
() = main_loop => (),
() = cancel.cancelled() => (),
}
}
/// Sends a batch of transactions using the provided `connection`.
///
/// Each transaction in the batch is sent over the QUIC streams one at a
/// time, which prevents traffic fragmentation and shows better TPS in
/// comparison with multistream send. If the batch is older than
/// `MAX_PROCESSING_AGE_MS` and `skip_check_transaction_age` is unset, it is
/// dropped without being sent.
///
/// On the first send error the error is recorded in `send_txs_stats`, the
/// state is switched to `Retry(0)` and the rest of the batch is abandoned.
/// Failed or abandoned transactions are not retried. Every successfully sent
/// transaction increments the `successfully_sent` counter.
async fn send_transactions(&mut self, connection: Connection, transactions: TransactionBatch) {
    let now = timestamp();
    if !self.skip_check_transaction_age
        && now.saturating_sub(transactions.timestamp()) > MAX_PROCESSING_AGE_MS
    {
        debug!("Drop outdated transaction batch.");
        return;
    }
    let mut measure_send = Measure::start("send transaction batch");
    for data in transactions.into_iter() {
        let result = send_data_over_stream(&connection, &data).await;
        if let Err(error) = result {
            trace!("Failed to send transaction over stream with error: {error}.");
            record_error(error, &self.send_txs_stats);
            // Replacing the `Active` state drops it, and its `Drop` impl closes
            // the connection that the local `connection` handle is a clone of.
            // Every further send in this batch would therefore fail and only
            // add spurious errors to the stats, so stop here.
            self.connection = ConnectionState::Retry(0);
            break;
        }
        self.send_txs_stats
            .successfully_sent
            .fetch_add(1, Ordering::Relaxed);
    }
    measure_send.stop();
    debug!(
        "Time to send transactions batch: {} us",
        measure_send.as_us()
    );
}
/// Tries to establish a new connection to `peer` and updates the state
/// according to the outcome.
///
/// `max_retries_attempt` is the current value of the reconnect counter (the
/// callers pass 0 for the first attempt and the counter stored in `Retry`
/// afterwards).
///
/// Outcomes:
/// * the connection is established: the state becomes `Active`;
/// * the handshake fails: the error is recorded and the state becomes
///   `Retry(max_retries_attempt + 1)` (saturating);
/// * the endpoint refuses to start connecting at all: the error is recorded
///   and the state becomes `Closing`.
async fn create_connection(&mut self, max_retries_attempt: usize) {
    let connecting = match self.endpoint.connect(self.peer, "connect") {
        Ok(connecting) => connecting,
        Err(connecting_error) => {
            record_error(connecting_error.clone().into(), &self.send_txs_stats);
            // The variants differ only in how they are logged; none of them
            // is retried.
            match connecting_error {
                ConnectError::EndpointStopping => {
                    debug!("Endpoint stopping, exit connection worker.");
                }
                ConnectError::InvalidRemoteAddress(_) => {
                    warn!("Invalid remote address.");
                }
                e => {
                    error!("Unexpected error has happen while trying to create connection {e}");
                }
            }
            self.connection = ConnectionState::Closing;
            return;
        }
    };

    // Time only the handshake itself.
    let mut measure_connection = Measure::start("establish connection");
    let res = connecting.await;
    measure_connection.stop();
    debug!(
        "Establishing connection with {} took: {} us",
        self.peer,
        measure_connection.as_us()
    );

    self.connection = match res {
        Ok(connection) => ConnectionState::Active(connection),
        Err(err) => {
            warn!("Connection error {}: {}", self.peer, err);
            record_error(err.into(), &self.send_txs_stats);
            ConnectionState::Retry(max_retries_attempt.saturating_add(1))
        }
    };
}
/// Attempts to reconnect to the peer after a connection failure.
///
/// `num_reconnects` is the current reconnect counter; it is forwarded to
/// `create_connection`, which stores the counter plus one in the `Retry`
/// state if the handshake fails again. A brand new connection is opened on
/// every call.
async fn reconnect(&mut self, num_reconnects: usize) {
debug!("Trying to reconnect. Reopen connection, 0rtt is not implemented yet.");
// We can reconnect using 0rtt, but not a priority for now. Check if we
// need to call config.enable_0rtt() on the client side and where
// session tickets are stored.
self.create_connection(num_reconnects).await;
}
}