Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: dirty workaround for known reth mempool issues in v1.1.0 #6

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/node/events/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,8 @@ where
} else if let Some(latest_block) = this.state.latest_block {
let now =
SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
if now - this.state.latest_block_time.unwrap_or(0) > 60 {
// https://github.com/paradigmxyz/reth/commit/523bfb9c81ad7c2493882d83d0c9a6d0bcb2ce52#diff-9b49eb32052d06f5011c679046579e019121ea44f698926d8c407a08c29fa570R507
if now.saturating_sub(this.state.latest_block_time.unwrap_or(0)) > 60 {
// Once we start receiving consensus nodes, don't emit status unless stalled for
// 1 minute
info!(
Expand Down
4 changes: 4 additions & 0 deletions crates/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,10 @@ where
self.pool.pending_transactions()
}

fn pending_transactions_max(&self, max: usize) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.pool.pending_transactions_max(max)
}

fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.pool.queued_transactions()
}
Expand Down
4 changes: 4 additions & 0 deletions crates/transaction-pool/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ impl TransactionPool for NoopTransactionPool {
vec![]
}

fn pending_transactions_max(&self, _max: usize) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
vec![]
}

fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
vec![]
}
Expand Down
2 changes: 2 additions & 0 deletions crates/transaction-pool/src/pool/best.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
self.add_new_transactions();
// Remove the next independent tx with the highest priority
let best = self.independent.pop_last()?;
// https://github.com/paradigmxyz/reth/issues/12340
self.all.remove(best.transaction.id());
let hash = best.transaction.hash();

// skip transactions that were marked as invalid
Expand Down
5 changes: 5 additions & 0 deletions crates/transaction-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,11 @@ where
self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
}

/// Returns all transactions from the pending sub-pool
pub(crate) fn pending_transactions_max(&self, max: usize) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
self.get_pool_data().pending_transactions_iter().take(max).collect()
}

/// Returns all transactions from the pending sub-pool
pub(crate) fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
self.get_pool_data().pending_transactions()
Expand Down
3 changes: 2 additions & 1 deletion crates/transaction-pool/src/pool/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ pub struct PendingPool<T: TransactionOrdering> {
impl<T: TransactionOrdering> PendingPool<T> {
/// Create a new pool instance.
pub fn new(ordering: T) -> Self {
let (new_transaction_notifier, _) = broadcast::channel(200);
// https://github.com/paradigmxyz/reth/issues/12336
let (new_transaction_notifier, _) = broadcast::channel(100000);
Self {
ordering,
submission_id: 0,
Expand Down
34 changes: 27 additions & 7 deletions crates/transaction-pool/src/pool/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,33 @@ impl<T: TransactionOrdering> TxPool<T> {

match self.all_transactions.insert_tx(tx, on_chain_balance, on_chain_nonce) {
Ok(InsertOk { transaction, move_to, replaced_tx, updates, .. }) => {
// replace the new tx and remove the replaced in the subpool(s)
self.add_new_transaction(transaction.clone(), replaced_tx.clone(), move_to);
// Update inserted transactions metric
self.metrics.inserted_transactions.increment(1);
let UpdateOutcome { promoted, discarded } = self.process_updates(updates);

let replaced = replaced_tx.map(|(tx, _)| tx);
// https://github.com/paradigmxyz/reth/issues/12286
let replaced = replaced_tx.clone().map(|(tx, _)| tx);

let (promoted, discarded) = if updates.is_empty() {
// 99% of the cases
self.add_new_transaction(transaction.clone(), replaced_tx, move_to);
self.metrics.inserted_transactions.increment(1);
(Vec::new(), Vec::new())
} else {
let (prev_updates, next_updates): (Vec<_>, Vec<_>) =
updates.into_iter().partition(|update| update.id < *transaction.id());
let prev_processed = self.process_updates(prev_updates);
self.add_new_transaction(transaction.clone(), replaced_tx, move_to);
self.metrics.inserted_transactions.increment(1);
let next_processed = self.process_updates(next_updates);
let promoted: Vec<_> = Iterator::chain(
prev_processed.promoted.into_iter(),
next_processed.promoted.into_iter(),
)
.collect();
let discarded: Vec<_> = Iterator::chain(
prev_processed.discarded.into_iter(),
next_processed.discarded.into_iter(),
)
.collect();
(promoted, discarded)
};

// This transaction was moved to the pending pool.
let res = if move_to.is_pending() {
Expand Down
6 changes: 6 additions & 0 deletions crates/transaction-pool/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,12 @@ pub trait TransactionPool: Send + Sync + Clone {
/// Consumer: RPC
fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

/// Returns first `max` transactions that can be included in the next block.
fn pending_transactions_max(
&self,
max: usize,
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

/// Returns all transactions that can be included in _future_ blocks.
///
/// This and [Self::pending_transactions] are mutually exclusive.
Expand Down
Loading