Add a new client implementation targeting TPU #2905
Conversation
The CI breaks with …
Overall the PR looks good. I would prefer a local cache (sorted by prioritization fees) for each connection worker, and back-pressure logic implemented in the connection worker, since we know we can send only N transactions at a time on a connection, whether staked or unstaked. The N is calculated from the node's stake.
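A rough sketch of the suggested per-worker cache and back-pressure cap; all names here (WorkerQueue, next_to_send, etc.) are invented for illustration, not part of the PR:

use std::collections::BinaryHeap;

/// (prioritization_fee, serialized transaction). BinaryHeap is a max-heap,
/// so the highest-fee transaction is popped first.
type PrioritizedTx = (u64, Vec<u8>);

struct WorkerQueue {
    cache: BinaryHeap<PrioritizedTx>,
    in_flight: usize,
    /// N: how many transactions the server lets us have in flight,
    /// calculated from the node's stake.
    max_in_flight: usize,
}

impl WorkerQueue {
    /// Pop the next transaction to send, respecting the in-flight cap.
    fn next_to_send(&mut self) -> Option<PrioritizedTx> {
        if self.in_flight >= self.max_in_flight {
            return None; // back-pressure: wait for in-flight sends to finish
        }
        self.in_flight += 1;
        self.cache.pop()
    }

    /// Release capacity once a send completes.
    fn on_send_complete(&mut self) {
        self.in_flight = self.in_flight.saturating_sub(1);
    }
}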
/// to send the same transactions again.
async fn send_transactions(&mut self, connection: Connection, transactions: TransactionBatch) {
    let now = timestamp();
    if now.saturating_sub(transactions.get_timestamp()) > MAX_PROCESSING_AGE_MS {
This logic could be done over block height; usually a transaction is valid for 300 blocks. During an epoch boundary the first slot may take a few seconds to arrive, in which case the transactions would be dropped.
That would definitely be more precise, but I think there are 2 challenges:
- The transactions are already serialized into bytes at this point, so extracting the blockhash/slot is tricky.
- Even if we're within the 150-block limit, by the time a transaction gets sent/received/included in a block, we might be outside the window, so we would likely need to introduce some fudge factor anyway (see the sketch below).
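For illustration, a hypothetical block-height check with a fudge factor, assuming the batch carried the block height at which it was created (which, as noted above, it currently does not — the constant values are also illustrative):

const MAX_VALID_BLOCK_HEIGHT_AGE: u64 = 150;
// Fudge factor: drop a little early so transactions don't expire while
// they are still in flight.
const SEND_MARGIN_BLOCKS: u64 = 10;

fn is_expired(created_at_block_height: u64, current_block_height: u64) -> bool {
    current_block_height.saturating_sub(created_at_block_height)
        > MAX_VALID_BLOCK_HEIGHT_AGE.saturating_sub(SEND_MARGIN_BLOCKS)
}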
@KirillLykov Do you know if we're hitting this and dumping tx batches during normal operation? I'm assuming it depends on where the bottleneck is between tx generation/scheduling/sending and the back-pressure mechanism.
Discussed these concerns with @godmodegalactus and Groovie|mango, and I'm quite convinced that this must be implemented if we want to use this client for more than benchmarking the cluster.
Probably this filtering should be configurable because it is not always necessary (it might be implemented by the caller).
I would like to add these changes in a follow-up PR because this one is already heavy.
I agree that this should be in the caller somehow
I don't see a simple way to move this check to the caller, because the caller puts transactions into a channel and cannot reach over it to clean them up. But I think a decent solution is to make the age check optional (controlled by a boolean flag): there is only one correct way to check the age of a tx, and there are only two possible options — we want this check or we don't.
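A tiny sketch of the flag-gated check being proposed (names hypothetical):

fn should_drop_batch(
    check_transaction_age: bool, // the proposed opt-in flag
    batch_timestamp_ms: u64,
    now_ms: u64,
    max_processing_age_ms: u64,
) -> bool {
    check_transaction_age
        && now_ms.saturating_sub(batch_timestamp_ms) > max_processing_age_ms
}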
        return;
    }
    let mut measure_send = Measure::start("send transaction batch");
    for data in transactions.into_iter() {
Track how many transactions have already been sent and are being processed. For staked connections there is a way to calculate how many parallel streams a connection can have; try to parallelize only that many transactions at once.
There is no need to parallelize anything. Parallelizing makes things slower by creating fragmentation.
We count how many txs have been sent in the SendTransactionStats::successfully_sent counter. Regarding sending in parallel using the number of streams: it is no longer needed, because the server side will not allow us to send more than it can accept from a client with the given stake.
Some feedback.

Regarding error granularity:
- The current approach to error handling in the send_transactions and create_connection methods works but could benefit from more granular logging, especially in critical error paths. For example, instead of just logging "Unexpected error has happened...", it would be nice if you classified the types of unexpected errors so troubleshooting later on is easier.
- In send_transactions, when a batch fails, perhaps it should log more details about which part of the stream failed and any available details about the connection state that could help with debugging.

It might be beneficial to make some of the constants configurable instead, via an env var or other configuration mechanism.

You might be able to make use of the tokio-metrics crate for further optimization and debugging work, and/or it might be helpful to have it turned on here.
There is no need to do this.
Basic framework looks good to me. Left a few comments.
#[async_trait]
impl LeaderUpdater for LeaderUpdaterService {
    fn get_leaders(&self) -> Vec<SocketAddr> {
Might be nice to call this something like get_next_num_lookahead_slots_leaders. That's a mouthful... so maybe we can come up with something more concise, but the reason I mention this is that when I see the function get_leaders, my assumption is that we're pulling down all of the leaders for the current epoch.
Another option is to add a function header comment to make it clear we only return the next N leaders, based on our estimation of the current leader and the number of lookahead_slots defined when creating the leader updater.
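For the second option, the header comment could look roughly like this (the trait shape and doc text are illustrative):

use std::net::SocketAddr;

pub trait LeaderUpdater {
    /// Returns only the next N upcoming leaders, based on our estimation
    /// of the current leader and the number of lookahead_slots defined
    /// when the leader updater was created. This is NOT the full leader
    /// schedule for the epoch.
    fn next_num_lookahead_slots_leaders(&self) -> Vec<SocketAddr>;
}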
nit: might be time to stop using Java conventions and use name and set_name for getters and setters :P
@bw-solana I agree that get_leaders might be misleading; changed to next_num_lookahead_slots_leaders as proposed.
debug!("Client endpoint bind address: {:?}", endpoint.local_addr()); | ||
let mut workers = WorkersCache::new(num_connections); | ||
loop { | ||
let Some(transaction_batch) = transaction_receiver.recv().await else { |
Would there be performance benefit to pulling all transaction batches off the receiver?
Do you mean take all the batches from transaction_receiver and send them all after that?
The idea is that a bounded channel is used here (of size 2 in the case of transaction-bench), so that we have back-pressure on the sender side of the channel. If we take all the batches from the receiver, the generator part (which puts batches into the sender) will create new transactions even though we may be slow in sending (due to throttling on the server).
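A minimal, self-contained illustration of this back-pressure behavior using tokio's bounded channel (stand-in types, not the PR's code):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Size-2 bounded channel, as in the transaction-bench setup described above.
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(2);

    // Generator side: `send().await` suspends once two batches are queued,
    // so a slow network consumer stalls generation instead of letting
    // batches pile up.
    tokio::spawn(async move {
        for _ in 0..10 {
            let batch = vec![0u8; 1232]; // stand-in for a serialized batch
            if tx.send(batch).await.is_err() {
                break; // receiver dropped
            }
        }
    });

    // Consumer side (the scheduler): drains one batch at a time.
    while let Some(batch) = rx.recv().await {
        let _ = batch; // hand off to a connection worker here
    }
}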
};

/// Size of the channel to transmit transaction batches to the target workers.
const WORKER_CHANNEL_SIZE: usize = 2;
Is the assumption that we expect the tx generator/scheduler to be the bottleneck? Or do we ever expect to block on submitting tx batches to the workers?
The bottleneck was always the sending-over-network part, because the server limits the number of streams and the receive window size to quite restrictive values. As far as I remember, the generator can easily generate 350k TPS, but the sender part will be limited to ~4k TPS in the best scenario (with one connection, one stake, no fanout).
Followup, but this (and possibly others) needs to be an argument to some call. The client shouldn't make assumptions about the rate at which txs are going to be produced.
I will introduce a configuration structure with this and some other parameters to be specified.
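A rough sketch of such a configuration structure; the field names and defaults here are illustrative guesses, not the final API:

struct ConnectionWorkersSchedulerConfig {
    /// Size of the channel feeding each worker; controls back-pressure.
    worker_channel_size: usize,
    /// How many connections to keep open concurrently.
    num_connections: usize,
    /// Whether to drop batches older than the max processing age.
    check_transaction_age: bool,
}

impl Default for ConnectionWorkersSchedulerConfig {
    fn default() -> Self {
        Self {
            worker_channel_size: 2,
            num_connections: 1,
            check_transaction_age: false,
        }
    }
}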
In the case of create_connection errors, we already do:

record_error(connecting_error.clone().into(), &mut self.send_txs_stats);
match connecting_error {
    ConnectError::EndpointStopping => {
        debug!("Endpoint stopping, exit connection worker.");
        self.connection = ConnectionState::Closing;
    }
    ConnectError::InvalidRemoteAddress(_) => {
        warn!("Invalid remote address.");
        self.connection = ConnectionState::Closing;
    }
    e => {
        error!("Unexpected error has happen while trying to create connection {e}");
        self.connection = ConnectionState::Closing;
    }
}

So this classification in a way already happens on the …
Do you mean to save in the logs the full error information returned by …
Not sure it is good to add as part of this PR because it is already quite heavy, but it might be a good idea if this crate is used by client code outside of agave. How is this usually done? Using features?
There are no metrics here, to reduce the complexity of this PR. I will probably have to add another PR with metrics.
Force-pushed from f4997c4 to 5b96db1.
This PR was updated to use quinn v0.11 because agave meanwhile also started using it. This update is in commit 1ed08b1, which is an adaptation of the changes @ilya-bobyr did for the agave fork.
Nah, features don't really have an interface for specifying settings. Usually you would use an env var or other configuration mechanism, but it's not necessary here; just something that could/should be undertaken if people eventually want to customize these values.
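For completeness, the env-var route mentioned earlier could look like this minimal sketch (the variable name is invented):

use std::env;

const DEFAULT_WORKER_CHANNEL_SIZE: usize = 2;

/// Read an override from the environment, falling back to the constant.
fn worker_channel_size() -> usize {
    env::var("TPU_CLIENT_WORKER_CHANNEL_SIZE")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_WORKER_CHANNEL_SIZE)
}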
impl Default for QuicClientCertificate {
    fn default() -> Self {
        QuicClientCertificate::new(&Keypair::new())
Is this necessary? I'm always wary of adding ::default() impls that are convenient but do the wrong thing. What is calling this? Can this be moved to the caller?
This is used only in one place (setup_endpoint), so no problem in moving it.
/// Implementation of [`ServerCertVerifier`] that ignores the server certificate. But still checks
/// the TLS signatures.
///
/// This is a duplicate of `solana_quic_client::nonblocking::quic_client::SkipServerVerification`.
This last line seems unnecessary, hopefully the old quic-client goes away soon
/// to identify next leaders to send transactions to.
#[async_trait]
pub trait LeaderUpdater: Send {
    fn next_num_lookahead_slots_leaders(&self) -> Vec<SocketAddr>;
nit: strange name? I would have called it next_leaders
I renamed it; it was get_leaders: #2905 (comment)
next_leaders(num: usize) ?
While improving the documentation, I decided that it is better to move the number of lookahead slots into the trait, so I applied this change.
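Given the suggestion above was applied, the trait presumably ended up shaped roughly like this (a sketch; the exact signature may differ):

use std::net::SocketAddr;

pub trait LeaderUpdater: Send {
    /// Returns the estimated leaders for the upcoming `lookahead_slots` slots.
    fn next_leaders(&self, lookahead_slots: usize) -> Vec<SocketAddr>;
}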
/// connection to the peer and handles state transitions. It runs
/// indefinitely until the connection is closed or an unrecoverable error
/// occurs.
pub async fn run(&mut self) {
This can be fixed in a followup since it's probably not a small change, but with the API as it is now, it's impossible to tell whether ConnectionWorker::run() succeeded or failed (other than looking at stats, which is not a good API). I think that run() should probably return Result, and whenever there's a fatal un-retriable error it should return the actual error.
}

/// Retrieves the statistics for transactions sent by this worker.
pub fn get_transaction_stats(&self) -> &SendTransactionStats {
nit: the convention in Rust for getters is value() and set_value() for setters. Happy if you want to fix this everywhere in a followup.
let mut res = TransportConfig::default();

let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
res.max_idle_timeout(Some(timeout));
Having max idle at only 2 * KEEP_ALIVE strikes me as wrong in case there's packet loss. This is something we should measure.
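For reference, a sketch of the pairing in question with quinn's TransportConfig; the constant values here are illustrative, not the crate's actual ones:

use quinn::{IdleTimeout, TransportConfig};
use std::time::Duration;

const QUIC_KEEP_ALIVE: Duration = Duration::from_secs(1); // illustrative
// The concern: with max idle at only 2x the keep-alive interval, losing
// a couple of keep-alive packets is enough to tear the connection down.
const QUIC_MAX_TIMEOUT: Duration = Duration::from_secs(2);

fn transport_config() -> TransportConfig {
    let mut res = TransportConfig::default();
    res.keep_alive_interval(Some(QUIC_KEEP_ALIVE));
    let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
    res.max_idle_timeout(Some(timeout));
    res
}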
macro_rules! display_send_transaction_stats_body {
    ($self:ident, $f:ident, $($field:ident),* $(,)?) => {
        write!(
            $f,
what's going on with the indentation in this macro? 😂
tpu-client-next/src/workers_cache.rs (outdated)
    }
}

async fn send_txs(&self, txs_batch: TransactionBatch) -> Result<(), WorkersCacheError> {
nit: send_transactions. Be consistent with the names please; there are autistic people among us.
lru = { workspace = true }
quinn = { workspace = true }
rustls = { workspace = true }
solana-connection-cache = { workspace = true }
lol, it sucks to have to depend on this for

enum Protocol {
    TCP,
    UDP,
}

we should do something about it
In general, I think we should trim down all these deps as much as possible. Break the cycle of violence!
It is needed for the leader estimation service. A separate task on my roadmap is to cover that component with tests, move it out of solana-connection-cache, and give it some love in general.
    peer: SocketAddr,
    transactions_receiver: mpsc::Receiver<TransactionBatch>,
) -> (Self, CancellationToken) {
    let cancel = CancellationToken::new();
We could have created a child token from the one received in the scheduler, and this way we wouldn't need to cancel explicitly during shutdown. Not sure if it's worth the effort.
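For reference, a minimal sketch of the child-token approach with tokio_util (variable names illustrative):

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let scheduler_cancel = CancellationToken::new();
    // A child token is cancelled automatically when its parent is, so
    // shutdown would not need to cancel each worker explicitly.
    let worker_cancel = scheduler_cancel.child_token();

    let worker = tokio::spawn(async move {
        worker_cancel.cancelled().await;
        // graceful worker cleanup here
    });

    scheduler_cancel.cancel(); // stops the scheduler and all child workers
    worker.await.unwrap();
}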
This commit contains changes from the fork of the agave repo used for testing. There, Illia updated tpu-client-next to use quinn v0.11 and added a cancellation token to gracefully cancel ongoing work without having to wait until recv is checked.
Force-pushed from c91d206 to ca56c09.
@alessandrod I've addressed most of the "easy" comments. Also fixed docs formatting here: ca56c09. Other comments I will address in the follow-up.
Force-pushed from ca56c09 to eb1fc76.
LGTM. Can address rest of the feedback in follow-ups
Problem
Although tpu-client, the component currently used for sending transactions over TPU, is suited for bulk transaction sending, it was not designed for handling a stream of transactions. Additionally, the call stack for sending transactions using tpu-client has grown too deep, making the code difficult to maintain. This motivated us to create a new client implementation that is optimized for handling transaction streams and is built on Tokio from the start.

Initially, this implementation was part of the transaction-bench crate in the Invalidator repository, which is a client application for sending transactions that execute a program with a specified number of accounts. We now plan to move the networking component of this implementation to the Agave repository, making it reusable for different client applications and accessible to other parties.

The design was proposed by @alessandrod, who also used the transaction-bench tool for his testing activities. This component has been actively developed over the past several months and has undergone extensive review by @ilya-bobyr and @bw-solana. Documentation PRs were reviewed by @sam0x17.

Test results using transaction-bench will be added soon.

Summary of Changes
The main structure introduced in this PR is ConnectionWorkersScheduler. Most of the other components are private.

Key Components:
- ConnectionWorker: handles a connection to one peer and sends the transactions it receives over that connection.
- WorkerInfo: information about a ConnectionWorker (including the transaction sender and the handle to the worker's task).
- ConnectionWorkersScheduler: receives transaction batches and distributes them to the workers.
- WorkersCache: a cache of the existing workers, keyed by peer.
Workflow
1. ConnectionWorkersScheduler receives transactions from the channel receiver.
2. It checks whether a worker for the target peer already exists in WorkersCache. If not, it creates a new worker.
3. Each worker (ConnectionWorker) handles a connection to a peer and processes transactions received through its channel.
Future work
There are some features that are not included in this PR:
- Transactions are currently sent in portions limited to 512*PACKET_SIZE and 512 at most. These limits don't allow the full potential of the client to be used; this will be addressed in a separate PR.