From d3c3412e4f64a0df4801e72876081c57ff76253d Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Mon, 28 Jun 2021 18:01:15 -0500 Subject: [PATCH] Banking thredas send batch of transactions to cost_model_thread right after committed to bank, avoiding cloning individual `transaction` when send. --- core/src/banking_stage.rs | 78 ++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 33 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 9a46aabdcfb0a7..a6128001f0b74b 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -263,6 +263,11 @@ pub enum BufferedPacketsDecision { Hold, } +pub struct CommittedTransactionBatch { + pub transactions: Vec, + pub execution_results: Vec, +} + impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. #[allow(clippy::new_ret_no_self)] @@ -341,8 +346,8 @@ impl BankingStage { // start cost model thread let (cost_update_sender, cost_update_receiver): ( - Sender, - Receiver, + Sender, + Receiver, ) = channel(); // TODO TAO - passing `exit` to this loop to allow the thread to end @@ -470,7 +475,7 @@ impl BankingStage { recorder: &TransactionRecorder, cost_model: &Arc>, cost_tracker: &Arc>, - cost_update_sender: Sender, + cost_update_sender: Sender, ) { let mut rebuffered_packets_len = 0; let mut new_tx_count = 0; @@ -621,7 +626,7 @@ impl BankingStage { recorder: &TransactionRecorder, cost_model: &Arc>, cost_tracker: &Arc>, - cost_update_sender: Sender, + cost_update_sender: Sender, ) -> BufferedPacketsDecision { let bank_start; let ( @@ -732,7 +737,7 @@ impl BankingStage { fn cost_update_loop( cost_model: &Arc>, cost_tracker: &Arc>, - cost_update_receiver: Receiver, + cost_update_receiver: Receiver, ) { // TODO TAO - sleep or hot spin? We dont want to delay counting TX too long // even it is not criitcal to track the cost at micro-sec @@ -746,15 +751,19 @@ impl BankingStage { } // */ - for transaction in cost_update_receiver.try_iter() { - cost_model - .read() - .unwrap() - .calculate_cost_no_alloc(&transaction, &mut tx_cost); - cost_tracker.write().unwrap().add_transaction( - &tx_cost.writable_accounts, - &(tx_cost.account_access_cost + tx_cost.execution_cost), - ); + for batch in cost_update_receiver.try_iter() { + for ((result, _), tx) in batch.execution_results.iter().zip(batch.transactions.iter()) { + if result.is_ok() { + cost_model + .read() + .unwrap() + .calculate_cost_no_alloc(&tx, &mut tx_cost); + cost_tracker.write().unwrap().add_transaction( + &tx_cost.writable_accounts, + &(tx_cost.account_access_cost + tx_cost.execution_cost), + ); + } + } } thread::sleep(wait_timer); @@ -776,7 +785,7 @@ impl BankingStage { duplicates: &Arc, PacketHasher)>>, cost_model: &Arc>, cost_tracker: &Arc>, - cost_update_sender: Sender, + cost_update_sender: Sender, ) { let recorder = poh_recorder.lock().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -919,6 +928,7 @@ impl BankingStage { batch: &TransactionBatch, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, + cost_tracker_update_sender: Sender, ) -> (Result, Vec) { let mut load_execute_time = Measure::start("load_execute_time"); // Use a shorter maximum age when adding transactions into the pipeline. This will reduce @@ -1006,6 +1016,15 @@ impl BankingStage { tx_results.rent_debits, ); } + + // track committed transactions' cost + let transactions = batch.transactions_iter().cloned().collect(); + let execution_results = results.to_vec(); + cost_tracker_update_sender.send( + CommittedTransactionBatch { + transactions, + execution_results, + } ).expect("send committed transactions to update cost model"); } commit_time.stop(); @@ -1035,6 +1054,7 @@ impl BankingStage { chunk_offset: usize, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, + cost_tracker_update_sender: Sender, ) -> (Result, Vec) { let mut lock_time = Measure::start("lock_time"); // Once accounts are locked, other threads cannot encode transactions that will modify the @@ -1048,6 +1068,7 @@ impl BankingStage { &batch, transaction_status_sender, gossip_vote_sender, + cost_tracker_update_sender, ); retryable_txs.iter_mut().for_each(|x| *x += chunk_offset); @@ -1078,6 +1099,7 @@ impl BankingStage { poh: &TransactionRecorder, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, + cost_tracker_update_sender: Sender, ) -> (usize, Vec) { let mut chunk_start = 0; let mut unprocessed_txs = vec![]; @@ -1094,6 +1116,7 @@ impl BankingStage { chunk_start, transaction_status_sender.clone(), gossip_vote_sender, + cost_tracker_update_sender.clone(), ); trace!("process_transactions result: {:?}", result); @@ -1297,7 +1320,7 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, cost_model: &Arc>, cost_tracker: &Arc>, - cost_update_sender: Sender, + cost_update_sender: Sender, ) -> (usize, usize, Vec) { let mut packet_conversion_time = Measure::start("packet_conversion"); let (transactions, transaction_to_packet_indexes, retryable_packet_indexes) = @@ -1332,6 +1355,7 @@ impl BankingStage { poh, transaction_status_sender, gossip_vote_sender, + cost_update_sender, ); process_tx_time.stop(); let unprocessed_tx_count = unprocessed_tx_indexes.len(); @@ -1340,32 +1364,20 @@ impl BankingStage { unprocessed_tx_count ); - // applying cost of processed transactions to shared cost_tracker let mut cost_tracking_time = Measure::start("cost_tracking_time"); - /* TODO TAO - replaced - let mut tx_cost = TransactionCost::new_with_capacity(MAX_WRITABLE_ACCOUNTS); - // */ + /* TODO TAO - doing in batch fashino right after committing, to + * save 1) nested for-loop, 2) individual transaction cloning + // applying cost of processed transactions to shared cost_tracker { - //let cost_model_readonly = cost_model.read().unwrap(); - //let mut cost_tracker_mutable = cost_tracker.write().unwrap(); transactions.iter().enumerate().for_each(|(index, tx)| { if !unprocessed_tx_indexes.iter().any(|&i| i == index) { cost_update_sender .send(tx.transaction().clone()) .expect("send transaction to cost_model"); - /* TODO TAO - replaced - cost_model - .read() - .unwrap() - .calculate_cost_no_alloc(tx.transaction(), &mut tx_cost); - cost_tracker.write().unwrap().add_transaction( - &tx_cost.writable_accounts, - &(tx_cost.account_access_cost + tx_cost.execution_cost), - ); - // */ } }); } + // */ cost_tracking_time.stop(); let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time"); @@ -1493,7 +1505,7 @@ impl BankingStage { recorder: &TransactionRecorder, cost_model: &Arc>, cost_tracker: &Arc>, - cost_update_sender: Sender, + cost_update_sender: Sender, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("process_packets_recv"); let mms = verified_receiver.recv_timeout(recv_timeout)?;