From dacfa9e2e65030738bac3528e4b0a3ce83282df4 Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Tue, 28 Jan 2025 16:47:49 -0600 Subject: [PATCH 1/6] wip - benchmark Scheduler(s), use global Jemalloc to be in sync with real-thing; add Tracer and print Stats at end of benching. --- Cargo.lock | 2 + .../src/compute_budget_instruction_details.rs | 8 + core/Cargo.toml | 6 + core/benches/prio_graph_scheduler.rs | 421 ++++++++++++++++++ core/benches/scheduler_controller.rs | 14 + core/src/banking_stage.rs | 9 +- .../transaction_scheduler/mod.rs | 6 +- .../prio_graph_scheduler.rs | 10 +- .../transaction_priority_id.rs | 2 +- .../transaction_state.rs | 8 +- .../transaction_state_container.rs | 6 +- 11 files changed, 473 insertions(+), 19 deletions(-) create mode 100644 core/benches/prio_graph_scheduler.rs create mode 100644 core/benches/scheduler_controller.rs diff --git a/Cargo.lock b/Cargo.lock index 31e61fef7b5acb..9e8854d0eab2d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7214,6 +7214,7 @@ dependencies = [ "bs58", "bytes", "chrono", + "criterion", "crossbeam-channel", "dashmap", "etcd-client", @@ -7303,6 +7304,7 @@ dependencies = [ "tempfile", "test-case", "thiserror 2.0.11", + "tikv-jemallocator", "tokio", "trees", ] diff --git a/compute-budget-instruction/src/compute_budget_instruction_details.rs b/compute-budget-instruction/src/compute_budget_instruction_details.rs index 145172033d0dfb..8e2dbb1bc96820 100644 --- a/compute-budget-instruction/src/compute_budget_instruction_details.rs +++ b/compute-budget-instruction/src/compute_budget_instruction_details.rs @@ -50,6 +50,14 @@ pub struct ComputeBudgetInstructionDetails { migrating_builtin_feature_counters: MigrationBuiltinFeatureCounter, } +// add pub access to raw Cb fields +impl ComputeBudgetInstructionDetails { + pub fn requested_loaded_accounts_data_size_limit(&self) -> Option { + self.requested_loaded_accounts_data_size_limit + .map(|(_, v)| v) + } +} + impl ComputeBudgetInstructionDetails { pub fn try_from<'a>( instructions: impl Iterator)> + Clone, diff --git a/core/Cargo.toml b/core/Cargo.toml index b5337373e4f17a..c0e3ef2076ebc3 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -108,7 +108,9 @@ tokio = { workspace = true, features = ["full"] } trees = { workspace = true } [dev-dependencies] +criterion = { workspace = true } fs_extra = { workspace = true } +jemallocator = { workspace = true } serde_json = { workspace = true } serial_test = { workspace = true } # See order-crates-for-publishing.py for using this unusual `path = "."` @@ -171,6 +173,10 @@ name = "gen_keys" [[bench]] name = "sigverify_stage" +[[bench]] +name = "prio_graph_scheduler" +harness = false + [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/core/benches/prio_graph_scheduler.rs b/core/benches/prio_graph_scheduler.rs new file mode 100644 index 00000000000000..89e0fbf9ddbde7 --- /dev/null +++ b/core/benches/prio_graph_scheduler.rs @@ -0,0 +1,421 @@ +use { + criterion::{black_box, criterion_group, criterion_main, Criterion}, + crossbeam_channel::{unbounded, Receiver, Sender}, + jemallocator::Jemalloc, + solana_core::banking_stage::{ + TOTAL_BUFFERED_PACKETS, + immutable_deserialized_packet::ImmutableDeserializedPacket, + scheduler_messages::{ConsumeWork, FinishedConsumeWork, MaxAge}, + transaction_scheduler::{ + prio_graph_scheduler::{PrioGraphScheduler, PrioGraphSchedulerConfig}, + transaction_state::SanitizedTransactionTTL, + transaction_state_container::{StateContainer, TransactionStateContainer}, + }, + }, + solana_runtime_transaction::{ + runtime_transaction::RuntimeTransaction, transaction_with_meta::TransactionWithMeta, + }, + solana_sdk::{ + compute_budget::ComputeBudgetInstruction, + hash::Hash, + message::Message, + packet::Packet, + pubkey::Pubkey, + signature::Keypair, + signer::Signer, + system_instruction, + transaction::{SanitizedTransaction, Transaction}, + }, + std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; + +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + +// A non-contend low-prio tx, aka Tracer, is tag with this requested_loaded_accounts_data_size_limit +const TAG_NUMBER: u32 = 1234; + +fn is_tracer(tx: &Tx) -> bool { + matches!( + tx.compute_budget_instruction_details() + .requested_loaded_accounts_data_size_limit(), + Some(TAG_NUMBER) + ) +} + +// TODO - the goal is to measure the performance, and observe the behavior/pattern, of Scheduler; +// - performance: time, througput +// - behavior: +// - how many time it loops container; +// - when non-contend low-pri is picked; +// - behavior/perform if block limits reached +// +// In order to do that, we need: +// - dummy worker that quickly drain channel to minimize pressure that can potentially impact +// Scheduler `send` works +// - identically prefilled container for each benck loops. + +// TODO - transaction factory, to build container scenarios +// - contending / competing TX with non-contend low prio tx at bottom +// - prio distribution doesn't matter since "insert" to container will sort them +fn build_non_contend_transactions(count: usize) -> Vec> { + let mut transactions = Vec::with_capacity(count); + // non-contend low-prio tx is first received + transactions.push(build_tracer_transaction()); + + let compute_unit_price = 1_000; + const MAX_TRANSFERS_PER_TX: usize = 58; + + for _n in 1..count { + let payer = Keypair::new(); + let to_pubkey = Pubkey::new_unique(); + let mut ixs = system_instruction::transfer_many( + &payer.pubkey(), + &vec![(to_pubkey, 1); MAX_TRANSFERS_PER_TX], + ); + let prioritization = ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price); + ixs.push(prioritization); + let message = Message::new(&ixs, Some(&payer.pubkey())); + let tx = Transaction::new(&[payer], message, Hash::default()); + let transaction = RuntimeTransaction::from_transaction_for_tests(tx); + + transactions.push(transaction); + } + +println!("prep non-contend txs: {:?}", transactions.len()); + + transactions +} + +fn build_fully_contend_transactions(count: usize) -> Vec> { + let mut transactions = Vec::with_capacity(count); + // non-contend low-prio tx is first received + transactions.push(build_tracer_transaction()); + + let compute_unit_price = 1_000; + const MAX_TRANSFERS_PER_TX: usize = 58; + + let to_pubkey = Pubkey::new_unique(); + for _n in 1..count { + let payer = Keypair::new(); + let mut ixs = system_instruction::transfer_many( + &payer.pubkey().clone(), + &vec![(to_pubkey, 1); MAX_TRANSFERS_PER_TX], + ); + let prioritization = ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price); + ixs.push(prioritization); + let message = Message::new(&ixs, Some(&payer.pubkey())); + let tx = Transaction::new(&[payer], message, Hash::default()); + let transaction = RuntimeTransaction::from_transaction_for_tests(tx); + + transactions.push(transaction); + } + +println!("prep full-contend txs: {:?}", transactions.len()); + + transactions +} + +// Tracer is a non-contend low-prio transfer transaction, it'd usually be inserted into the bottom +// of Container ddue to its low prio, but it should be scheduled early since it is non-contend for +// better UX. +fn build_tracer_transaction() -> RuntimeTransaction { + let payer = Keypair::new(); + let to_pubkey = Pubkey::new_unique(); + let mut ixs = vec![system_instruction::transfer(&payer.pubkey(), &to_pubkey, 1)]; + ixs.push(ComputeBudgetInstruction::set_compute_unit_price(4)); + ixs.push(ComputeBudgetInstruction::set_loaded_accounts_data_size_limit(TAG_NUMBER)); + let message = Message::new(&ixs, Some(&payer.pubkey())); + let tx = Transaction::new(&[payer], message, Hash::default()); + RuntimeTransaction::from_transaction_for_tests(tx) +} + +struct BenchContainer { + container: TransactionStateContainer, +} + +impl BenchContainer { + fn new(capacity: usize) -> Self { + Self { + container: TransactionStateContainer::with_capacity(capacity), + } + } + + fn fill_container(&mut self, transactions: impl Iterator) { + let mut n: usize = 0; + for transaction in transactions { + let compute_unit_price = transaction + .compute_budget_instruction_details() + .sanitize_and_convert_to_compute_budget_limits( + &solana_feature_set::FeatureSet::default(), + ) + .unwrap() + .compute_unit_price; + + let packet = Arc::new( + ImmutableDeserializedPacket::new( + Packet::from_data(None, transaction.to_versioned_transaction()).unwrap(), + ) + .unwrap(), + ); + let transaction_ttl = SanitizedTransactionTTL { + transaction, + max_age: MaxAge::MAX, + }; + // NOTE - setting transaction cost to be `0` for now, so it doesn't bother block_limits + // when scheduling. + const TEST_TRANSACTION_COST: u64 = 0; + if self.container.insert_new_transaction( + transaction_ttl, + packet, + compute_unit_price, + TEST_TRANSACTION_COST, + ) { + assert!(false); + println!("fail fill container: remaining cap {:?}, nth {:?}", self.container.remaining_capacity(), n); + } + n += 1; + } + println!("==== inserted {} transactions to container ====", n); + } +} + +#[derive(Debug, Default)] +struct BenchStats { + bench_iter_count: usize, + num_of_scheduling: usize, + // worker reports: + num_works: Arc, + num_transaction: Arc, // = bench_iter_count * container_capacity + tracer_placement: Arc, // > 0 + // from scheduler().result: + num_scheduled: usize, // = num_transaction +} + +// a bench consumer worker that quickly drain work channel, then send a OK back via completed-work +// channel +struct PingPong { + threads: Vec>, +} + +impl PingPong { + fn new( + work_receivers: Vec>>, + completed_work_sender: Sender>, + num_works: Arc, + num_transaction: Arc, + tracer_placement: Arc, + ) -> Self { + let mut threads = Vec::with_capacity(work_receivers.len()); + + for receiver in work_receivers { + let completed_work_sender_clone = completed_work_sender.clone(); + let num_works_clone = num_works.clone(); + let num_transaction_clone = num_transaction.clone(); + let tracer_placement_clone = tracer_placement.clone(); + + let handle = std::thread::spawn(move || { + Self::service_loop( + receiver, + completed_work_sender_clone, + num_works_clone, + num_transaction_clone, + tracer_placement_clone, + ); + }); + threads.push(handle); + } + + Self { threads } + } + + fn service_loop( + work_receiver: Receiver>, + completed_work_sender: Sender>, + num_works: Arc, + num_transaction: Arc, + tracer_placement: Arc, + ) { + // NOTE: will blocking recv() impact benchmark quality? Perhaps making worker threads + // hot spinning? + while let Ok(work) = work_receiver.recv() { + num_works.fetch_add(1, Ordering::Relaxed); + let mut tx_count = + num_transaction.fetch_add(work.transactions.len(), Ordering::Relaxed); + if tracer_placement.load(Ordering::Relaxed) == 0 { + work.transactions.iter().for_each(|tx| { + tx_count += 1; + if is_tracer(tx) { + println!("==== tracer found! {:?}, {:?}, {:?}", num_works.load(Ordering::Relaxed), tx_count, num_transaction.load(Ordering::Relaxed)); + tracer_placement.store(tx_count, Ordering::Relaxed) + } + }); + } + + if completed_work_sender + .send(FinishedConsumeWork { + work, + retryable_indexes: vec![], + }) + .is_err() + { + // kill this worker if finished_work channel is broken + break; + } + } + } + + fn join(self) { + for thread in self.threads { + thread.join().unwrap(); + } + } +} + +// setup Scheduler with bench accessories: pingpong worker, filters and status +struct BenchSetup { + scheduler: PrioGraphScheduler, + pingpong_worker: PingPong, + stats: BenchStats, + filter_1: fn(&[&Tx], &mut [bool]), + filter_2: fn(&Tx) -> bool, +} + +impl BenchSetup { + fn new() -> Self { + let stats = BenchStats::default(); + + let num_workers = 4; + + let (consume_work_senders, consume_work_receivers) = + (0..num_workers).map(|_| unbounded()).unzip(); + let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded(); + let scheduler = PrioGraphScheduler::new( + consume_work_senders, + finished_consume_work_receiver, + PrioGraphSchedulerConfig::default(), + ); + let pingpong_worker = PingPong::new( + consume_work_receivers, + finished_consume_work_sender, + stats.num_works.clone(), + stats.num_transaction.clone(), + stats.tracer_placement.clone(), + ); + + Self { + scheduler, + pingpong_worker, + stats, + filter_1: Self::test_pre_graph_filter, + filter_2: Self::test_pre_lock_filter, + } + } + + fn test_pre_graph_filter(_txs: &[&Tx], results: &mut [bool]) { + results.fill(true); + } + + fn test_pre_lock_filter(_tx: &Tx) -> bool { + true + } + + fn run(&mut self, mut container: TransactionStateContainer) { + // each bench measurement is to schedule everything in the container + while !container.is_empty() { + let result = self + .scheduler + .schedule(&mut container, self.filter_1, self.filter_2) + .unwrap(); + + // do some VERY QUICK stats collecting to print/assert at end of bench + self.stats.num_of_scheduling += 1; + self.stats.num_scheduled += result.num_scheduled; + } + + self.stats.bench_iter_count += 1; + } + + fn print_stats(self) { + drop(self.scheduler); + self.pingpong_worker.join(); + println!("{:?}", self.stats); + } +} + +fn bench_empty_container(c: &mut Criterion) { + let mut bench_setup: BenchSetup> = + BenchSetup::new(); + + c.benchmark_group("bench_empty_container") + .bench_function("sdk_transaction_type", |bencher| { + bencher.iter_with_setup( + || { + let bench_container = BenchContainer::new(0); + bench_container.container + }, + |container| { + black_box(bench_setup.run(container)); + }, + ) + }); + + bench_setup.print_stats(); +} + +fn bench_non_contend_transactions(c: &mut Criterion) { + let capacity = TOTAL_BUFFERED_PACKETS; + let mut bench_setup: BenchSetup> = BenchSetup::new(); + + c.benchmark_group("bench_non_contend_transactions") + .sample_size(10) + .bench_function("sdk_transaction_type", |bencher| { + bencher.iter_with_setup( + || { + let mut bench_container = BenchContainer::new(capacity); + bench_container + .fill_container(build_non_contend_transactions(capacity).into_iter()); + bench_container.container + }, + |container| { + black_box(bench_setup.run(container)); + }, + ) + }); + + bench_setup.print_stats(); +} + +fn bench_fully_contend_transactions(c: &mut Criterion) { + let capacity = 10000; //TOTAL_BUFFERED_PACKETS; + let mut bench_setup: BenchSetup> = BenchSetup::new(); + + c.benchmark_group("bench_fully_contend_transactions") + .sample_size(10) + .bench_function("sdk_transaction_type", |bencher| { + bencher.iter_with_setup( + || { + let mut bench_container = BenchContainer::new(capacity); + bench_container + .fill_container(build_fully_contend_transactions(capacity).into_iter()); + bench_container.container + }, + |container| { + black_box(bench_setup.run(container)); + }, + ) + }); + + bench_setup.print_stats(); +} + +criterion_group!( + benches, +// bench_empty_container, +// bench_non_contend_transactions, + bench_fully_contend_transactions, +); +criterion_main!(benches); diff --git a/core/benches/scheduler_controller.rs b/core/benches/scheduler_controller.rs new file mode 100644 index 00000000000000..f0a84509096d81 --- /dev/null +++ b/core/benches/scheduler_controller.rs @@ -0,0 +1,14 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; + +fn bench_dummy(c: &mut Criterion) { + c.benchmark_group("bench_dummy") + .throughput(Throughput::Elements(1)) + .bench_function("dummy", |bencher| { + bencher.iter(|| { + std::thread::sleep(std::time::Duration::from_millis(100)); + }); + }); +} + +criterion_group!(benches, bench_dummy); +criterion_main!(benches); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 14176c17cb65a5..3c37d0a89221ac 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -66,10 +66,13 @@ pub mod leader_slot_metrics; pub mod qos_service; pub mod unprocessed_packet_batches; pub mod unprocessed_transaction_storage; +// made public for benching scheduler +pub mod scheduler_messages; +pub mod transaction_scheduler; mod consume_worker; mod decision_maker; -mod immutable_deserialized_packet; +pub mod immutable_deserialized_packet; mod latest_unprocessed_votes; mod leader_slot_timing_metrics; mod multi_iterator_scanner; @@ -77,8 +80,6 @@ mod packet_deserializer; mod packet_filter; mod packet_receiver; mod read_write_account_set; -mod scheduler_messages; -mod transaction_scheduler; // proc_macro_hygiene needs to be stabilzied to use qualifier_attr... // error[E0658]: non-inline modules in proc macro input are unstable @@ -90,7 +91,7 @@ pub mod unified_scheduler; // Fixed thread size seems to be fastest on GCP setup pub const NUM_THREADS: u32 = 6; -const TOTAL_BUFFERED_PACKETS: usize = 100_000; +pub const TOTAL_BUFFERED_PACKETS: usize = 100_000; const NUM_VOTE_PROCESSING_THREADS: u32 = 2; const MIN_THREADS_BANKING: u32 = 1; diff --git a/core/src/banking_stage/transaction_scheduler/mod.rs b/core/src/banking_stage/transaction_scheduler/mod.rs index c28e3110cf7033..bcf9d971babb85 100644 --- a/core/src/banking_stage/transaction_scheduler/mod.rs +++ b/core/src/banking_stage/transaction_scheduler/mod.rs @@ -1,7 +1,7 @@ mod batch_id_generator; pub(crate) mod greedy_scheduler; mod in_flight_tracker; -pub(crate) mod prio_graph_scheduler; +pub mod prio_graph_scheduler; pub(crate) mod receive_and_buffer; pub(crate) mod scheduler; pub(crate) mod scheduler_controller; @@ -9,5 +9,5 @@ pub(crate) mod scheduler_error; mod scheduler_metrics; mod thread_aware_account_locks; mod transaction_priority_id; -mod transaction_state; -pub(crate) mod transaction_state_container; +pub mod transaction_state; +pub mod transaction_state_container; diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 8e2090b4289c62..fa67c2843ffc81 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -42,7 +42,7 @@ type SchedulerPrioGraph = PrioGraph< fn(&TransactionPriorityId, &GraphNode) -> TransactionPriorityId, >; -pub(crate) struct PrioGraphSchedulerConfig { +pub struct PrioGraphSchedulerConfig { pub max_scheduled_cus: u64, pub max_scanned_transactions_per_scheduling_pass: usize, pub look_ahead_window_size: usize, @@ -60,7 +60,7 @@ impl Default for PrioGraphSchedulerConfig { } } -pub(crate) struct PrioGraphScheduler { +pub struct PrioGraphScheduler { in_flight_tracker: InFlightTracker, account_locks: ThreadAwareAccountLocks, consume_work_senders: Vec>>, @@ -70,7 +70,7 @@ pub(crate) struct PrioGraphScheduler { } impl PrioGraphScheduler { - pub(crate) fn new( + pub fn new( consume_work_senders: Vec>>, finished_consume_work_receiver: Receiver>, config: PrioGraphSchedulerConfig, @@ -104,7 +104,7 @@ impl Scheduler for PrioGraphScheduler { /// This, combined with internal tracking of threads' in-flight transactions, allows /// for load-balancing while prioritizing scheduling transactions onto threads that will /// not cause conflicts in the near future. - fn schedule>( + pub fn schedule>( &mut self, container: &mut S, pre_graph_filter: impl Fn(&[&Tx], &mut [bool]), @@ -297,6 +297,8 @@ impl Scheduler for PrioGraphScheduler { // Send batches for any remaining transactions saturating_add_assign!(num_sent, self.send_batches(&mut batches)?); +println!("schedule finished; batches sent {}, unscheduleable_ids: {:?}", num_sent, unschedulable_ids); + // Push unschedulable ids back into the container container.push_ids_into_queue(unschedulable_ids.into_iter()); diff --git a/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs b/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs index 7142f3ac09736d..2da8e6b7dbd47f 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs @@ -6,7 +6,7 @@ use { /// A unique identifier tied with priority ordering for a transaction/packet: #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub(crate) struct TransactionPriorityId { +pub struct TransactionPriorityId { pub(crate) priority: u64, pub(crate) id: TransactionId, } diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state.rs b/core/src/banking_stage/transaction_scheduler/transaction_state.rs index 5311e22c45c3f3..5176c63aaa513a 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state.rs @@ -1,9 +1,9 @@ use crate::banking_stage::scheduler_messages::MaxAge; /// Simple wrapper type to tie a sanitized transaction to max age slot. -pub(crate) struct SanitizedTransactionTTL { - pub(crate) transaction: Tx, - pub(crate) max_age: MaxAge, +pub struct SanitizedTransactionTTL { + pub transaction: Tx, + pub max_age: MaxAge, } /// TransactionState is used to track the state of a transaction in the transaction scheduler @@ -26,7 +26,7 @@ pub(crate) struct SanitizedTransactionTTL { /// to the appropriate thread for processing. This is done to avoid cloning the /// `SanitizedTransaction`. #[allow(clippy::large_enum_variant)] -pub(crate) enum TransactionState { +pub enum TransactionState { /// The transaction is available for scheduling. Unprocessed { transaction_ttl: SanitizedTransactionTTL, diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index a11ffb4da97757..1a8408adbb9891 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -40,13 +40,13 @@ use { /// /// The container maintains a fixed capacity. If the queue is full when pushing /// a new transaction, the lowest priority transaction will be dropped. -pub(crate) struct TransactionStateContainer { +pub struct TransactionStateContainer { capacity: usize, priority_queue: MinMaxHeap, id_to_transaction_state: Slab>, } -pub(crate) trait StateContainer { +pub trait StateContainer { /// Create a new `TransactionStateContainer` with the given capacity. fn with_capacity(capacity: usize) -> Self; @@ -173,7 +173,7 @@ impl StateContainer for TransactionStateContainer TransactionStateContainer { /// Insert a new transaction into the container's queues and maps. /// Returns `true` if a packet was dropped due to capacity limits. - pub(crate) fn insert_new_transaction( + pub fn insert_new_transaction( &mut self, transaction_ttl: SanitizedTransactionTTL, priority: u64, From 084032b735eee777a762d56c8c59eed4802affe4 Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Tue, 18 Feb 2025 11:40:25 -0600 Subject: [PATCH 2/6] wip - cleanup after rebase, benches run --- core/benches/prio_graph_scheduler.rs | 13 ++++--------- core/src/banking_stage/transaction_scheduler/mod.rs | 6 +++--- .../transaction_scheduler/prio_graph_scheduler.rs | 4 +--- .../transaction_scheduler/scheduler.rs | 4 ++-- 4 files changed, 10 insertions(+), 17 deletions(-) diff --git a/core/benches/prio_graph_scheduler.rs b/core/benches/prio_graph_scheduler.rs index 89e0fbf9ddbde7..a41f58c105dc5b 100644 --- a/core/benches/prio_graph_scheduler.rs +++ b/core/benches/prio_graph_scheduler.rs @@ -8,6 +8,7 @@ use { scheduler_messages::{ConsumeWork, FinishedConsumeWork, MaxAge}, transaction_scheduler::{ prio_graph_scheduler::{PrioGraphScheduler, PrioGraphSchedulerConfig}, + scheduler::Scheduler, transaction_state::SanitizedTransactionTTL, transaction_state_container::{StateContainer, TransactionStateContainer}, }, @@ -85,8 +86,6 @@ fn build_non_contend_transactions(count: usize) -> Vec Vec BenchContainer { compute_unit_price, TEST_TRANSACTION_COST, ) { - assert!(false); - println!("fail fill container: remaining cap {:?}, nth {:?}", self.container.remaining_capacity(), n); + unreachable!("test is setup to fill the Container to fullness"); } n += 1; } - println!("==== inserted {} transactions to container ====", n); } } @@ -414,8 +409,8 @@ fn bench_fully_contend_transactions(c: &mut Criterion) { criterion_group!( benches, -// bench_empty_container, -// bench_non_contend_transactions, + bench_empty_container, + bench_non_contend_transactions, bench_fully_contend_transactions, ); criterion_main!(benches); diff --git a/core/src/banking_stage/transaction_scheduler/mod.rs b/core/src/banking_stage/transaction_scheduler/mod.rs index bcf9d971babb85..1fa7d176ad403d 100644 --- a/core/src/banking_stage/transaction_scheduler/mod.rs +++ b/core/src/banking_stage/transaction_scheduler/mod.rs @@ -1,9 +1,9 @@ mod batch_id_generator; -pub(crate) mod greedy_scheduler; +pub mod greedy_scheduler; mod in_flight_tracker; pub mod prio_graph_scheduler; -pub(crate) mod receive_and_buffer; -pub(crate) mod scheduler; +pub mod receive_and_buffer; +pub mod scheduler; pub(crate) mod scheduler_controller; pub(crate) mod scheduler_error; mod scheduler_metrics; diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index fa67c2843ffc81..26bfb07fac1d80 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -104,7 +104,7 @@ impl Scheduler for PrioGraphScheduler { /// This, combined with internal tracking of threads' in-flight transactions, allows /// for load-balancing while prioritizing scheduling transactions onto threads that will /// not cause conflicts in the near future. - pub fn schedule>( + fn schedule>( &mut self, container: &mut S, pre_graph_filter: impl Fn(&[&Tx], &mut [bool]), @@ -297,8 +297,6 @@ impl Scheduler for PrioGraphScheduler { // Send batches for any remaining transactions saturating_add_assign!(num_sent, self.send_batches(&mut batches)?); -println!("schedule finished; batches sent {}, unscheduleable_ids: {:?}", num_sent, unschedulable_ids); - // Push unschedulable ids back into the container container.push_ids_into_queue(unschedulable_ids.into_iter()); diff --git a/core/src/banking_stage/transaction_scheduler/scheduler.rs b/core/src/banking_stage/transaction_scheduler/scheduler.rs index 0aacb68630df2a..715393406083fb 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler.rs @@ -3,7 +3,7 @@ use { solana_runtime_transaction::transaction_with_meta::TransactionWithMeta, }; -pub(crate) trait Scheduler { +pub trait Scheduler { /// Schedule transactions from `container`. /// pre-graph and pre-lock filters may be passed to be applied /// before specific actions internally. @@ -24,7 +24,7 @@ pub(crate) trait Scheduler { /// Metrics from scheduling transactions. #[derive(Default, Debug, PartialEq, Eq)] -pub(crate) struct SchedulingSummary { +pub struct SchedulingSummary { /// Number of transactions scheduled. pub num_scheduled: usize, /// Number of transactions that were not scheduled due to conflicts. From c90e49eb6951330410979c425025e54cc72ce0f7 Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Tue, 18 Feb 2025 16:18:10 -0600 Subject: [PATCH 3/6] wip - done with prio --- core/benches/prio_graph_scheduler.rs | 130 +++++++++++++++------------ 1 file changed, 73 insertions(+), 57 deletions(-) diff --git a/core/benches/prio_graph_scheduler.rs b/core/benches/prio_graph_scheduler.rs index a41f58c105dc5b..de7f2cce4fc8dc 100644 --- a/core/benches/prio_graph_scheduler.rs +++ b/core/benches/prio_graph_scheduler.rs @@ -3,7 +3,6 @@ use { crossbeam_channel::{unbounded, Receiver, Sender}, jemallocator::Jemalloc, solana_core::banking_stage::{ - TOTAL_BUFFERED_PACKETS, immutable_deserialized_packet::ImmutableDeserializedPacket, scheduler_messages::{ConsumeWork, FinishedConsumeWork, MaxAge}, transaction_scheduler::{ @@ -12,6 +11,7 @@ use { transaction_state::SanitizedTransactionTTL, transaction_state_container::{StateContainer, TransactionStateContainer}, }, + TOTAL_BUFFERED_PACKETS, }, solana_runtime_transaction::{ runtime_transaction::RuntimeTransaction, transaction_with_meta::TransactionWithMeta, @@ -47,7 +47,8 @@ fn is_tracer(tx: &Tx) -> bool { ) } -// TODO - the goal is to measure the performance, and observe the behavior/pattern, of Scheduler; +// TODO - the goal is to measure and compare performance between different Schedulers, +// and observe the behavior/pattern of Scheduler; // - performance: time, througput // - behavior: // - how many time it loops container; @@ -142,7 +143,6 @@ impl BenchContainer { } fn fill_container(&mut self, transactions: impl Iterator) { - let mut n: usize = 0; for transaction in transactions { let compute_unit_price = transaction .compute_budget_instruction_details() @@ -173,7 +173,6 @@ impl BenchContainer { ) { unreachable!("test is setup to fill the Container to fullness"); } - n += 1; } } } @@ -187,11 +186,26 @@ struct BenchStats { num_transaction: Arc, // = bench_iter_count * container_capacity tracer_placement: Arc, // > 0 // from scheduler().result: - num_scheduled: usize, // = num_transaction + num_scheduled: usize, // = num_transaction +} + +impl BenchStats { + fn print_and_reset(&mut self) { + println!("{:?}", self); + self.num_works.swap(0, Ordering::Relaxed); + self.num_transaction.swap(0, Ordering::Relaxed); + self.tracer_placement.swap(0, Ordering::Relaxed); + self.bench_iter_count = 0; + self.num_of_scheduling = 0; + self.num_scheduled = 0; + } } // a bench consumer worker that quickly drain work channel, then send a OK back via completed-work // channel +// NOTE: Avoid creating PingPong within bench iter since joining threads at its eol would +// introducing variance to bench timing. +#[allow(dead_code)] struct PingPong { threads: Vec>, } @@ -244,7 +258,6 @@ impl PingPong { work.transactions.iter().for_each(|tx| { tx_count += 1; if is_tracer(tx) { - println!("==== tracer found! {:?}, {:?}, {:?}", num_works.load(Ordering::Relaxed), tx_count, num_transaction.load(Ordering::Relaxed)); tracer_placement.store(tx_count, Ordering::Relaxed) } }); @@ -262,37 +275,23 @@ impl PingPong { } } } - - fn join(self) { - for thread in self.threads { - thread.join().unwrap(); - } - } } -// setup Scheduler with bench accessories: pingpong worker, filters and status -struct BenchSetup { - scheduler: PrioGraphScheduler, +struct BenchEnv { pingpong_worker: PingPong, - stats: BenchStats, filter_1: fn(&[&Tx], &mut [bool]), filter_2: fn(&Tx) -> bool, + consume_work_senders: Vec>>, + finished_consume_work_receiver: Receiver>, } -impl BenchSetup { - fn new() -> Self { - let stats = BenchStats::default(); - +impl BenchEnv { + fn new(stats: &mut BenchStats) -> Self { let num_workers = 4; let (consume_work_senders, consume_work_receivers) = (0..num_workers).map(|_| unbounded()).unzip(); let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded(); - let scheduler = PrioGraphScheduler::new( - consume_work_senders, - finished_consume_work_receiver, - PrioGraphSchedulerConfig::default(), - ); let pingpong_worker = PingPong::new( consume_work_receivers, finished_consume_work_sender, @@ -302,11 +301,11 @@ impl BenchSetup { ); Self { - scheduler, pingpong_worker, - stats, filter_1: Self::test_pre_graph_filter, filter_2: Self::test_pre_lock_filter, + consume_work_senders, + finished_consume_work_receiver, } } @@ -318,52 +317,56 @@ impl BenchSetup { true } - fn run(&mut self, mut container: TransactionStateContainer) { + fn run( + &self, + mut scheduler: impl Scheduler, + mut container: TransactionStateContainer, + stats: &mut BenchStats, + ) { // each bench measurement is to schedule everything in the container while !container.is_empty() { - let result = self - .scheduler + let result = scheduler .schedule(&mut container, self.filter_1, self.filter_2) .unwrap(); // do some VERY QUICK stats collecting to print/assert at end of bench - self.stats.num_of_scheduling += 1; - self.stats.num_scheduled += result.num_scheduled; + stats.num_of_scheduling += 1; + stats.num_scheduled += result.num_scheduled; } - self.stats.bench_iter_count += 1; - } - - fn print_stats(self) { - drop(self.scheduler); - self.pingpong_worker.join(); - println!("{:?}", self.stats); + stats.bench_iter_count += 1; } } fn bench_empty_container(c: &mut Criterion) { - let mut bench_setup: BenchSetup> = - BenchSetup::new(); + let mut stats = BenchStats::default(); + let bench_env: BenchEnv> = BenchEnv::new(&mut stats); c.benchmark_group("bench_empty_container") .bench_function("sdk_transaction_type", |bencher| { bencher.iter_with_setup( || { let bench_container = BenchContainer::new(0); - bench_container.container + let scheduler = PrioGraphScheduler::new( + bench_env.consume_work_senders.clone(), + bench_env.finished_consume_work_receiver.clone(), + PrioGraphSchedulerConfig::default(), + ); + (scheduler, bench_container.container) }, - |container| { - black_box(bench_setup.run(container)); + |(scheduler, container)| { + black_box(bench_env.run(scheduler, container, &mut stats)); + //stats.print_and_reset(); }, ) }); - - bench_setup.print_stats(); + stats.print_and_reset(); } fn bench_non_contend_transactions(c: &mut Criterion) { let capacity = TOTAL_BUFFERED_PACKETS; - let mut bench_setup: BenchSetup> = BenchSetup::new(); + let mut stats = BenchStats::default(); + let bench_env: BenchEnv> = BenchEnv::new(&mut stats); c.benchmark_group("bench_non_contend_transactions") .sample_size(10) @@ -373,20 +376,27 @@ fn bench_non_contend_transactions(c: &mut Criterion) { let mut bench_container = BenchContainer::new(capacity); bench_container .fill_container(build_non_contend_transactions(capacity).into_iter()); - bench_container.container + let scheduler = PrioGraphScheduler::new( + bench_env.consume_work_senders.clone(), + bench_env.finished_consume_work_receiver.clone(), + PrioGraphSchedulerConfig::default(), + ); + (scheduler, bench_container.container) }, - |container| { - black_box(bench_setup.run(container)); + |(scheduler, container)| { + black_box(bench_env.run(scheduler, container, &mut stats)); + //stats.print_and_reset(); }, ) }); - bench_setup.print_stats(); + stats.print_and_reset(); } fn bench_fully_contend_transactions(c: &mut Criterion) { - let capacity = 10000; //TOTAL_BUFFERED_PACKETS; - let mut bench_setup: BenchSetup> = BenchSetup::new(); + let capacity = TOTAL_BUFFERED_PACKETS; + let mut stats = BenchStats::default(); + let bench_env: BenchEnv> = BenchEnv::new(&mut stats); c.benchmark_group("bench_fully_contend_transactions") .sample_size(10) @@ -396,15 +406,21 @@ fn bench_fully_contend_transactions(c: &mut Criterion) { let mut bench_container = BenchContainer::new(capacity); bench_container .fill_container(build_fully_contend_transactions(capacity).into_iter()); - bench_container.container + let scheduler = PrioGraphScheduler::new( + bench_env.consume_work_senders.clone(), + bench_env.finished_consume_work_receiver.clone(), + PrioGraphSchedulerConfig::default(), + ); + (scheduler, bench_container.container) }, - |container| { - black_box(bench_setup.run(container)); + |(scheduler, container)| { + black_box(bench_env.run(scheduler, container, &mut stats)); + //stats.print_and_reset(); }, ) }); - bench_setup.print_stats(); + stats.print_and_reset(); } criterion_group!( From 6db9a4bb287117d0eca1b862fcdf6db722395f0a Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Tue, 18 Feb 2025 16:34:28 -0600 Subject: [PATCH 4/6] add bench for greedy scheduler --- core/benches/prio_graph_scheduler.rs | 75 +++++++++++++++++-- .../transaction_scheduler/greedy_scheduler.rs | 4 +- 2 files changed, 71 insertions(+), 8 deletions(-) diff --git a/core/benches/prio_graph_scheduler.rs b/core/benches/prio_graph_scheduler.rs index de7f2cce4fc8dc..1219868f36d90b 100644 --- a/core/benches/prio_graph_scheduler.rs +++ b/core/benches/prio_graph_scheduler.rs @@ -6,6 +6,7 @@ use { immutable_deserialized_packet::ImmutableDeserializedPacket, scheduler_messages::{ConsumeWork, FinishedConsumeWork, MaxAge}, transaction_scheduler::{ + greedy_scheduler::{GreedyScheduler, GreedySchedulerConfig}, prio_graph_scheduler::{PrioGraphScheduler, PrioGraphSchedulerConfig}, scheduler::Scheduler, transaction_state::SanitizedTransactionTTL, @@ -363,12 +364,12 @@ fn bench_empty_container(c: &mut Criterion) { stats.print_and_reset(); } -fn bench_non_contend_transactions(c: &mut Criterion) { +fn bench_prio_graph_non_contend_transactions(c: &mut Criterion) { let capacity = TOTAL_BUFFERED_PACKETS; let mut stats = BenchStats::default(); let bench_env: BenchEnv> = BenchEnv::new(&mut stats); - c.benchmark_group("bench_non_contend_transactions") + c.benchmark_group("bench_prio_graph_non_contend_transactions") .sample_size(10) .bench_function("sdk_transaction_type", |bencher| { bencher.iter_with_setup( @@ -393,12 +394,12 @@ fn bench_non_contend_transactions(c: &mut Criterion) { stats.print_and_reset(); } -fn bench_fully_contend_transactions(c: &mut Criterion) { +fn bench_prio_graph_fully_contend_transactions(c: &mut Criterion) { let capacity = TOTAL_BUFFERED_PACKETS; let mut stats = BenchStats::default(); let bench_env: BenchEnv> = BenchEnv::new(&mut stats); - c.benchmark_group("bench_fully_contend_transactions") + c.benchmark_group("bench_prio_graph_fully_contend_transactions") .sample_size(10) .bench_function("sdk_transaction_type", |bencher| { bencher.iter_with_setup( @@ -423,10 +424,72 @@ fn bench_fully_contend_transactions(c: &mut Criterion) { stats.print_and_reset(); } +fn bench_greedy_non_contend_transactions(c: &mut Criterion) { + let capacity = TOTAL_BUFFERED_PACKETS; + let mut stats = BenchStats::default(); + let bench_env: BenchEnv> = BenchEnv::new(&mut stats); + + c.benchmark_group("bench_greedy_non_contend_transactions") + .sample_size(10) + .bench_function("sdk_transaction_type", |bencher| { + bencher.iter_with_setup( + || { + let mut bench_container = BenchContainer::new(capacity); + bench_container + .fill_container(build_non_contend_transactions(capacity).into_iter()); + let scheduler = GreedyScheduler::new( + bench_env.consume_work_senders.clone(), + bench_env.finished_consume_work_receiver.clone(), + GreedySchedulerConfig::default(), + ); + (scheduler, bench_container.container) + }, + |(scheduler, container)| { + black_box(bench_env.run(scheduler, container, &mut stats)); + //stats.print_and_reset(); + }, + ) + }); + + stats.print_and_reset(); +} + +fn bench_greedy_fully_contend_transactions(c: &mut Criterion) { + let capacity = TOTAL_BUFFERED_PACKETS; + let mut stats = BenchStats::default(); + let bench_env: BenchEnv> = BenchEnv::new(&mut stats); + + c.benchmark_group("bench_greedy_fully_contend_transactions") + .sample_size(10) + .bench_function("sdk_transaction_type", |bencher| { + bencher.iter_with_setup( + || { + let mut bench_container = BenchContainer::new(capacity); + bench_container + .fill_container(build_fully_contend_transactions(capacity).into_iter()); + let scheduler = GreedyScheduler::new( + bench_env.consume_work_senders.clone(), + bench_env.finished_consume_work_receiver.clone(), + GreedySchedulerConfig::default(), + ); + (scheduler, bench_container.container) + }, + |(scheduler, container)| { + black_box(bench_env.run(scheduler, container, &mut stats)); + //stats.print_and_reset(); + }, + ) + }); + + stats.print_and_reset(); +} + criterion_group!( benches, bench_empty_container, - bench_non_contend_transactions, - bench_fully_contend_transactions, + bench_prio_graph_non_contend_transactions, + bench_prio_graph_fully_contend_transactions, + bench_greedy_non_contend_transactions, + bench_greedy_fully_contend_transactions, ); criterion_main!(benches); diff --git a/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs b/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs index 767ca1465b0251..6f6eb8a95b95d7 100644 --- a/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs @@ -24,7 +24,7 @@ use { solana_sdk::saturating_add_assign, }; -pub(crate) struct GreedySchedulerConfig { +pub struct GreedySchedulerConfig { pub target_scheduled_cus: u64, pub max_scanned_transactions_per_scheduling_pass: usize, pub target_transactions_per_batch: usize, @@ -54,7 +54,7 @@ pub struct GreedyScheduler { } impl GreedyScheduler { - pub(crate) fn new( + pub fn new( consume_work_senders: Vec>>, finished_consume_work_receiver: Receiver>, config: GreedySchedulerConfig, From c32fa36a4a33d5fe014571d33ae0c85a33a06792 Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Tue, 4 Mar 2025 11:40:07 -0600 Subject: [PATCH 5/6] fix rebase conflicts --- core/benches/prio_graph_scheduler.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/core/benches/prio_graph_scheduler.rs b/core/benches/prio_graph_scheduler.rs index 1219868f36d90b..e831eb028d7410 100644 --- a/core/benches/prio_graph_scheduler.rs +++ b/core/benches/prio_graph_scheduler.rs @@ -3,7 +3,6 @@ use { crossbeam_channel::{unbounded, Receiver, Sender}, jemallocator::Jemalloc, solana_core::banking_stage::{ - immutable_deserialized_packet::ImmutableDeserializedPacket, scheduler_messages::{ConsumeWork, FinishedConsumeWork, MaxAge}, transaction_scheduler::{ greedy_scheduler::{GreedyScheduler, GreedySchedulerConfig}, @@ -21,7 +20,6 @@ use { compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, - packet::Packet, pubkey::Pubkey, signature::Keypair, signer::Signer, @@ -153,12 +151,6 @@ impl BenchContainer { .unwrap() .compute_unit_price; - let packet = Arc::new( - ImmutableDeserializedPacket::new( - Packet::from_data(None, transaction.to_versioned_transaction()).unwrap(), - ) - .unwrap(), - ); let transaction_ttl = SanitizedTransactionTTL { transaction, max_age: MaxAge::MAX, @@ -168,7 +160,6 @@ impl BenchContainer { const TEST_TRANSACTION_COST: u64 = 0; if self.container.insert_new_transaction( transaction_ttl, - packet, compute_unit_price, TEST_TRANSACTION_COST, ) { From 22d323529b5a799147bd902e7a4af695a1f07c3f Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Tue, 4 Mar 2025 12:22:41 -0600 Subject: [PATCH 6/6] update transactions creation, print fmt --- core/benches/prio_graph_scheduler.rs | 76 ++++++++++++++++------------ 1 file changed, 44 insertions(+), 32 deletions(-) diff --git a/core/benches/prio_graph_scheduler.rs b/core/benches/prio_graph_scheduler.rs index e831eb028d7410..0d78c7fa58ef57 100644 --- a/core/benches/prio_graph_scheduler.rs +++ b/core/benches/prio_graph_scheduler.rs @@ -62,21 +62,29 @@ fn is_tracer(tx: &Tx) -> bool { // TODO - transaction factory, to build container scenarios // - contending / competing TX with non-contend low prio tx at bottom // - prio distribution doesn't matter since "insert" to container will sort them -fn build_non_contend_transactions(count: usize) -> Vec> { +fn build_transactions( + count: usize, + is_single_payer: bool, +) -> Vec> { let mut transactions = Vec::with_capacity(count); // non-contend low-prio tx is first received transactions.push(build_tracer_transaction()); let compute_unit_price = 1_000; - const MAX_TRANSFERS_PER_TX: usize = 58; + const MAX_TRANSFERS_PER_TX: usize = 16; //58; + let single_payer = Keypair::new(); for _n in 1..count { - let payer = Keypair::new(); - let to_pubkey = Pubkey::new_unique(); - let mut ixs = system_instruction::transfer_many( - &payer.pubkey(), - &vec![(to_pubkey, 1); MAX_TRANSFERS_PER_TX], + let payer = if is_single_payer { + // recreate keypair from single_payer + Keypair::from_bytes(&single_payer.to_bytes()).expect("Failed to create Keypair") + } else { + Keypair::new() + }; + let to = Vec::from_iter( + std::iter::repeat_with(|| (Pubkey::new_unique(), 1)).take(MAX_TRANSFERS_PER_TX), ); + let mut ixs = system_instruction::transfer_many(&payer.pubkey(), &to); let prioritization = ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price); ixs.push(prioritization); let message = Message::new(&ixs, Some(&payer.pubkey())); @@ -89,31 +97,12 @@ fn build_non_contend_transactions(count: usize) -> Vec Vec> { - let mut transactions = Vec::with_capacity(count); - // non-contend low-prio tx is first received - transactions.push(build_tracer_transaction()); - - let compute_unit_price = 1_000; - const MAX_TRANSFERS_PER_TX: usize = 58; - - let to_pubkey = Pubkey::new_unique(); - for _n in 1..count { - let payer = Keypair::new(); - let mut ixs = system_instruction::transfer_many( - &payer.pubkey().clone(), - &vec![(to_pubkey, 1); MAX_TRANSFERS_PER_TX], - ); - let prioritization = ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price); - ixs.push(prioritization); - let message = Message::new(&ixs, Some(&payer.pubkey())); - let tx = Transaction::new(&[payer], message, Hash::default()); - let transaction = RuntimeTransaction::from_transaction_for_tests(tx); - - transactions.push(transaction); - } +fn build_non_contend_transactions(count: usize) -> Vec> { + build_transactions(count, false) +} - transactions +fn build_fully_contend_transactions(count: usize) -> Vec> { + build_transactions(count, true) } // Tracer is a non-contend low-prio transfer transaction, it'd usually be inserted into the bottom @@ -183,7 +172,30 @@ struct BenchStats { impl BenchStats { fn print_and_reset(&mut self) { - println!("{:?}", self); + println!( + "== Averaged per bench stats to empty Container == + number of scheduling: {} + number of Works: {} + number of transactions scheduled: {} + number of transactions per work: {} + Tracer placement: {}", + self.num_of_scheduling + .checked_div(self.bench_iter_count) + .unwrap_or(0), + self.num_works + .load(Ordering::Relaxed) + .checked_div(self.bench_iter_count) + .unwrap_or(0), + self.num_transaction + .load(Ordering::Relaxed) + .checked_div(self.bench_iter_count) + .unwrap_or(0), + self.num_transaction + .load(Ordering::Relaxed) + .checked_div(self.num_works.load(Ordering::Relaxed)) + .unwrap_or(0), + self.tracer_placement.load(Ordering::Relaxed) + ); self.num_works.swap(0, Ordering::Relaxed); self.num_transaction.swap(0, Ordering::Relaxed); self.tracer_placement.swap(0, Ordering::Relaxed);