diff --git a/Cargo.lock b/Cargo.lock
index 31e61fef7b5acb..9e8854d0eab2d2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7214,6 +7214,7 @@ dependencies = [
  "bs58",
  "bytes",
  "chrono",
+ "criterion",
  "crossbeam-channel",
  "dashmap",
  "etcd-client",
@@ -7303,6 +7304,7 @@ dependencies = [
  "tempfile",
  "test-case",
  "thiserror 2.0.11",
+ "tikv-jemallocator",
  "tokio",
  "trees",
 ]
diff --git a/compute-budget-instruction/src/compute_budget_instruction_details.rs b/compute-budget-instruction/src/compute_budget_instruction_details.rs
index 145172033d0dfb..8e2dbb1bc96820 100644
--- a/compute-budget-instruction/src/compute_budget_instruction_details.rs
+++ b/compute-budget-instruction/src/compute_budget_instruction_details.rs
@@ -50,6 +50,14 @@ pub struct ComputeBudgetInstructionDetails {
     migrating_builtin_feature_counters: MigrationBuiltinFeatureCounter,
 }
 
+// add pub access to raw compute-budget fields
+impl ComputeBudgetInstructionDetails {
+    pub fn requested_loaded_accounts_data_size_limit(&self) -> Option<u32> {
+        self.requested_loaded_accounts_data_size_limit
+            .map(|(_, v)| v)
+    }
+}
+
 impl ComputeBudgetInstructionDetails {
     pub fn try_from<'a>(
         instructions: impl Iterator<Item = (usize, SVMInstruction<'a>)> + Clone,
diff --git a/core/Cargo.toml b/core/Cargo.toml
index b5337373e4f17a..c0e3ef2076ebc3 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -108,7 +108,9 @@ tokio = { workspace = true, features = ["full"] }
 trees = { workspace = true }
 
 [dev-dependencies]
+criterion = { workspace = true }
 fs_extra = { workspace = true }
+jemallocator = { workspace = true }
 serde_json = { workspace = true }
 serial_test = { workspace = true }
 # See order-crates-for-publishing.py for using this unusual `path = "."`
@@ -171,6 +173,10 @@ name = "gen_keys"
 
 [[bench]]
 name = "sigverify_stage"
 
+[[bench]]
+name = "prio_graph_scheduler"
+harness = false
+
 [package.metadata.docs.rs]
 targets = ["x86_64-unknown-linux-gnu"]
diff --git a/core/benches/prio_graph_scheduler.rs b/core/benches/prio_graph_scheduler.rs
new file mode 100644
index 00000000000000..0d78c7fa58ef57
--- /dev/null
+++ b/core/benches/prio_graph_scheduler.rs
@@ -0,0 +1,498 @@
+use {
+    criterion::{black_box, criterion_group, criterion_main, Criterion},
+    crossbeam_channel::{unbounded, Receiver, Sender},
+    jemallocator::Jemalloc,
+    solana_core::banking_stage::{
+        scheduler_messages::{ConsumeWork, FinishedConsumeWork, MaxAge},
+        transaction_scheduler::{
+            greedy_scheduler::{GreedyScheduler, GreedySchedulerConfig},
+            prio_graph_scheduler::{PrioGraphScheduler, PrioGraphSchedulerConfig},
+            scheduler::Scheduler,
+            transaction_state::SanitizedTransactionTTL,
+            transaction_state_container::{StateContainer, TransactionStateContainer},
+        },
+        TOTAL_BUFFERED_PACKETS,
+    },
+    solana_runtime_transaction::{
+        runtime_transaction::RuntimeTransaction, transaction_with_meta::TransactionWithMeta,
+    },
+    solana_sdk::{
+        compute_budget::ComputeBudgetInstruction,
+        hash::Hash,
+        message::Message,
+        pubkey::Pubkey,
+        signature::Keypair,
+        signer::Signer,
+        system_instruction,
+        transaction::{SanitizedTransaction, Transaction},
+    },
+    std::sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+};
+
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
+// A non-contend low-prio tx, aka Tracer, is tagged with this
+// requested_loaded_accounts_data_size_limit value.
+const TAG_NUMBER: u32 = 1234;
+
+fn is_tracer<Tx: TransactionWithMeta>(tx: &Tx) -> bool {
+    matches!(
+        tx.compute_budget_instruction_details()
+            .requested_loaded_accounts_data_size_limit(),
+        Some(TAG_NUMBER)
+    )
+}
+
+// TODO - the goal is to measure and compare the performance of different
+// Schedulers, and to observe each Scheduler's behavior/pattern:
+// - performance: time, throughput
+// - behavior:
+//   - how many times it loops over the container;
+//   - when the non-contend low-prio tx is picked;
+//   - behavior/performance if block limits are reached
+//
+// In order to do that, we need:
+// - a dummy worker that quickly drains the channel, to minimize pressure that could
+//   otherwise impact the Scheduler's `send` work
+// - an identically prefilled container for each bench loop.
+
+// TODO - transaction factory, to build container scenarios
+// - contending/competing TXs with a non-contend low-prio tx at the bottom
+// - prio distribution doesn't matter since "insert" into the container will sort them
+fn build_transactions(
+    count: usize,
+    is_single_payer: bool,
+) -> Vec<RuntimeTransaction<SanitizedTransaction>> {
+    let mut transactions = Vec::with_capacity(count);
+    // the non-contend low-prio tx is received first
+    transactions.push(build_tracer_transaction());
+
+    let compute_unit_price = 1_000;
+    const MAX_TRANSFERS_PER_TX: usize = 16; //58;
+
+    let single_payer = Keypair::new();
+    for _n in 1..count {
+        let payer = if is_single_payer {
+            // recreate keypair from single_payer
+            Keypair::from_bytes(&single_payer.to_bytes()).expect("Failed to create Keypair")
+        } else {
+            Keypair::new()
+        };
+        let to = Vec::from_iter(
+            std::iter::repeat_with(|| (Pubkey::new_unique(), 1)).take(MAX_TRANSFERS_PER_TX),
+        );
+        let mut ixs = system_instruction::transfer_many(&payer.pubkey(), &to);
+        let prioritization = ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price);
+        ixs.push(prioritization);
+        let message = Message::new(&ixs, Some(&payer.pubkey()));
+        let tx = Transaction::new(&[payer], message, Hash::default());
+        let transaction = RuntimeTransaction::from_transaction_for_tests(tx);
+
+        transactions.push(transaction);
+    }
+
+    transactions
+}
+
+fn build_non_contend_transactions(count: usize) -> Vec<RuntimeTransaction<SanitizedTransaction>> {
+    build_transactions(count, false)
+}
+
+fn build_fully_contend_transactions(
+    count: usize,
+) -> Vec<RuntimeTransaction<SanitizedTransaction>> {
+    build_transactions(count, true)
+}
+
+// Tracer is a non-contend low-prio transfer transaction; it would usually be inserted at the
+// bottom of the Container due to its low prio, but it should be scheduled early since it is
+// non-contend, for better UX.
+fn build_tracer_transaction() -> RuntimeTransaction<SanitizedTransaction> {
+    let payer = Keypair::new();
+    let to_pubkey = Pubkey::new_unique();
+    let mut ixs = vec![system_instruction::transfer(&payer.pubkey(), &to_pubkey, 1)];
+    ixs.push(ComputeBudgetInstruction::set_compute_unit_price(4));
+    ixs.push(ComputeBudgetInstruction::set_loaded_accounts_data_size_limit(TAG_NUMBER));
+    let message = Message::new(&ixs, Some(&payer.pubkey()));
+    let tx = Transaction::new(&[payer], message, Hash::default());
+    RuntimeTransaction::from_transaction_for_tests(tx)
+}
+
+struct BenchContainer<Tx: TransactionWithMeta> {
+    container: TransactionStateContainer<Tx>,
+}
+
+impl<Tx: TransactionWithMeta> BenchContainer<Tx> {
+    fn new(capacity: usize) -> Self {
+        Self {
+            container: TransactionStateContainer::with_capacity(capacity),
+        }
+    }
+
+    fn fill_container(&mut self, transactions: impl Iterator<Item = Tx>) {
+        for transaction in transactions {
+            let compute_unit_price = transaction
+                .compute_budget_instruction_details()
+                .sanitize_and_convert_to_compute_budget_limits(
+                    &solana_feature_set::FeatureSet::default(),
+                )
+                .unwrap()
+                .compute_unit_price;
+
+            let transaction_ttl = SanitizedTransactionTTL {
+                transaction,
+                max_age: MaxAge::MAX,
+            };
+            // NOTE - setting transaction cost to `0` for now, so it doesn't count against
+            // block limits during scheduling.
+            const TEST_TRANSACTION_COST: u64 = 0;
+            if self.container.insert_new_transaction(
+                transaction_ttl,
+                compute_unit_price,
+                TEST_TRANSACTION_COST,
+            ) {
+                unreachable!("test is set up to fill the Container to fullness");
+            }
+        }
+    }
+}
+
+#[derive(Debug, Default)]
+struct BenchStats {
+    bench_iter_count: usize,
+    num_of_scheduling: usize,
+    // worker reports:
+    num_works: Arc<AtomicUsize>,
+    num_transaction: Arc<AtomicUsize>, // = bench_iter_count * container_capacity
+    tracer_placement: Arc<AtomicUsize>, // > 0 once a worker has seen the Tracer
+    // from the scheduler's result:
+    num_scheduled: usize, // = num_transaction
+}
+
+impl BenchStats {
+    fn print_and_reset(&mut self) {
+        println!(
+            "== Averaged per-bench stats to empty the Container ==
+            number of scheduling passes: {}
+            number of Works: {}
+            number of transactions scheduled: {}
+            number of transactions per work: {}
+            Tracer placement: {}",
+            self.num_of_scheduling
+                .checked_div(self.bench_iter_count)
+                .unwrap_or(0),
+            self.num_works
+                .load(Ordering::Relaxed)
+                .checked_div(self.bench_iter_count)
+                .unwrap_or(0),
+            self.num_transaction
+                .load(Ordering::Relaxed)
+                .checked_div(self.bench_iter_count)
+                .unwrap_or(0),
+            self.num_transaction
+                .load(Ordering::Relaxed)
+                .checked_div(self.num_works.load(Ordering::Relaxed))
+                .unwrap_or(0),
+            self.tracer_placement.load(Ordering::Relaxed)
+        );
+        self.num_works.swap(0, Ordering::Relaxed);
+        self.num_transaction.swap(0, Ordering::Relaxed);
+        self.tracer_placement.swap(0, Ordering::Relaxed);
+        self.bench_iter_count = 0;
+        self.num_of_scheduling = 0;
+        self.num_scheduled = 0;
+    }
+}
+
+// A bench consumer worker that quickly drains the work channel, then sends an OK back via the
+// completed-work channel.
+// NOTE: Avoid creating PingPong within a bench iteration, since joining its threads at
+// end-of-life would introduce variance into the bench timing.
+#[allow(dead_code)]
+struct PingPong {
+    threads: Vec<std::thread::JoinHandle<()>>,
+}
+
+impl PingPong {
+    fn new<Tx: TransactionWithMeta + Send + 'static>(
+        work_receivers: Vec<Receiver<ConsumeWork<Tx>>>,
+        completed_work_sender: Sender<FinishedConsumeWork<Tx>>,
+        num_works: Arc<AtomicUsize>,
+        num_transaction: Arc<AtomicUsize>,
+        tracer_placement: Arc<AtomicUsize>,
+    ) -> Self {
+        let mut threads = Vec::with_capacity(work_receivers.len());
+
+        for receiver in work_receivers {
+            let completed_work_sender_clone = completed_work_sender.clone();
+            let num_works_clone = num_works.clone();
+            let num_transaction_clone = num_transaction.clone();
+            let tracer_placement_clone = tracer_placement.clone();
+
+            let handle = std::thread::spawn(move || {
+                Self::service_loop(
+                    receiver,
+                    completed_work_sender_clone,
+                    num_works_clone,
+                    num_transaction_clone,
+                    tracer_placement_clone,
+                );
+            });
+            threads.push(handle);
+        }
+
+        Self { threads }
+    }
+
+    fn service_loop<Tx: TransactionWithMeta>(
+        work_receiver: Receiver<ConsumeWork<Tx>>,
+        completed_work_sender: Sender<FinishedConsumeWork<Tx>>,
+        num_works: Arc<AtomicUsize>,
+        num_transaction: Arc<AtomicUsize>,
+        tracer_placement: Arc<AtomicUsize>,
+    ) {
+        // NOTE: will the blocking recv() impact benchmark quality? Perhaps worker threads
+        // should hot-spin instead?
+        while let Ok(work) = work_receiver.recv() {
+            num_works.fetch_add(1, Ordering::Relaxed);
+            let mut tx_count =
+                num_transaction.fetch_add(work.transactions.len(), Ordering::Relaxed);
+            if tracer_placement.load(Ordering::Relaxed) == 0 {
+                work.transactions.iter().for_each(|tx| {
+                    tx_count += 1;
+                    if is_tracer(tx) {
+                        tracer_placement.store(tx_count, Ordering::Relaxed)
+                    }
+                });
+            }
+
+            if completed_work_sender
+                .send(FinishedConsumeWork {
+                    work,
+                    retryable_indexes: vec![],
+                })
+                .is_err()
+            {
+                // kill this worker if the finished-work channel is broken
+                break;
+            }
+        }
+    }
+}
+
+struct BenchEnv<Tx: TransactionWithMeta> {
+    pingpong_worker: PingPong,
+    filter_1: fn(&[&Tx], &mut [bool]),
+    filter_2: fn(&Tx) -> bool,
+    consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
+    finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,
+}
+
+impl<Tx: TransactionWithMeta + Send + 'static> BenchEnv<Tx> {
+    fn new(stats: &mut BenchStats) -> Self {
+        let num_workers = 4;
+
+        let (consume_work_senders, consume_work_receivers) =
+            (0..num_workers).map(|_| unbounded()).unzip();
+        let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded();
+        let pingpong_worker = PingPong::new(
+            consume_work_receivers,
+            finished_consume_work_sender,
+            stats.num_works.clone(),
+            stats.num_transaction.clone(),
+            stats.tracer_placement.clone(),
+        );
+
+        Self {
+            pingpong_worker,
+            filter_1: Self::test_pre_graph_filter,
+            filter_2: Self::test_pre_lock_filter,
+            consume_work_senders,
+            finished_consume_work_receiver,
+        }
+    }
+
+    fn test_pre_graph_filter(_txs: &[&Tx], results: &mut [bool]) {
+        results.fill(true);
+    }
+
+    fn test_pre_lock_filter(_tx: &Tx) -> bool {
+        true
+    }
+
+    fn run(
+        &self,
+        mut scheduler: impl Scheduler<Tx>,
+        mut container: TransactionStateContainer<Tx>,
+        stats: &mut BenchStats,
+    ) {
+        // each bench measurement schedules everything in the container
+        while !container.is_empty() {
+            let result = scheduler
+                .schedule(&mut container, self.filter_1, self.filter_2)
+                .unwrap();
+
+            // do some VERY QUICK stats collecting to print/assert at the end of the bench
+            stats.num_of_scheduling += 1;
+            stats.num_scheduled += result.num_scheduled;
+        }
+
+        stats.bench_iter_count += 1;
+    }
+}
+
+fn bench_empty_container(c: &mut Criterion) {
+    let mut stats = BenchStats::default();
+    let bench_env: BenchEnv<RuntimeTransaction<SanitizedTransaction>> = BenchEnv::new(&mut stats);
+
+    c.benchmark_group("bench_empty_container")
+        .bench_function("sdk_transaction_type", |bencher| {
+            bencher.iter_with_setup(
+                || {
+                    let bench_container = BenchContainer::new(0);
+                    let scheduler = PrioGraphScheduler::new(
+                        bench_env.consume_work_senders.clone(),
+                        bench_env.finished_consume_work_receiver.clone(),
+                        PrioGraphSchedulerConfig::default(),
+                    );
+                    (scheduler, bench_container.container)
+                },
+                |(scheduler, container)| {
+                    black_box(bench_env.run(scheduler, container, &mut stats));
+                    //stats.print_and_reset();
+                },
+            )
+        });
+    stats.print_and_reset();
+}
+
+fn bench_prio_graph_non_contend_transactions(c: &mut Criterion) {
+    let capacity = TOTAL_BUFFERED_PACKETS;
+    let mut stats = BenchStats::default();
+    let bench_env: BenchEnv<RuntimeTransaction<SanitizedTransaction>> = BenchEnv::new(&mut stats);
+
+    c.benchmark_group("bench_prio_graph_non_contend_transactions")
+        .sample_size(10)
+        .bench_function("sdk_transaction_type", |bencher| {
+            bencher.iter_with_setup(
+                || {
+                    let mut bench_container = BenchContainer::new(capacity);
+                    bench_container
+                        .fill_container(build_non_contend_transactions(capacity).into_iter());
+                    let scheduler = PrioGraphScheduler::new(
+                        bench_env.consume_work_senders.clone(),
+                        bench_env.finished_consume_work_receiver.clone(),
+                        PrioGraphSchedulerConfig::default(),
+                    );
+                    (scheduler, bench_container.container)
+                },
+                |(scheduler, container)| {
+                    black_box(bench_env.run(scheduler, container, &mut stats));
+                    //stats.print_and_reset();
+                },
+            )
+        });
+
+    stats.print_and_reset();
+}
+
+fn bench_prio_graph_fully_contend_transactions(c: &mut Criterion) {
+    let capacity = TOTAL_BUFFERED_PACKETS;
+    let mut stats = BenchStats::default();
+    let bench_env: BenchEnv<RuntimeTransaction<SanitizedTransaction>> = BenchEnv::new(&mut stats);
+
+    c.benchmark_group("bench_prio_graph_fully_contend_transactions")
+        .sample_size(10)
+        .bench_function("sdk_transaction_type", |bencher| {
+            bencher.iter_with_setup(
+                || {
+                    let mut bench_container = BenchContainer::new(capacity);
+                    bench_container
+                        .fill_container(build_fully_contend_transactions(capacity).into_iter());
+                    let scheduler = PrioGraphScheduler::new(
+                        bench_env.consume_work_senders.clone(),
+                        bench_env.finished_consume_work_receiver.clone(),
+                        PrioGraphSchedulerConfig::default(),
+                    );
+                    (scheduler, bench_container.container)
+                },
+                |(scheduler, container)| {
+                    black_box(bench_env.run(scheduler, container, &mut stats));
+                    //stats.print_and_reset();
+                },
+            )
+        });
+
+    stats.print_and_reset();
+}
+
+fn bench_greedy_non_contend_transactions(c: &mut Criterion) {
+    let capacity = TOTAL_BUFFERED_PACKETS;
+    let mut stats = BenchStats::default();
+    let bench_env: BenchEnv<RuntimeTransaction<SanitizedTransaction>> = BenchEnv::new(&mut stats);
+
+    c.benchmark_group("bench_greedy_non_contend_transactions")
+        .sample_size(10)
+        .bench_function("sdk_transaction_type", |bencher| {
+            bencher.iter_with_setup(
+                || {
+                    let mut bench_container = BenchContainer::new(capacity);
+                    bench_container
+                        .fill_container(build_non_contend_transactions(capacity).into_iter());
+                    let scheduler = GreedyScheduler::new(
+                        bench_env.consume_work_senders.clone(),
+                        bench_env.finished_consume_work_receiver.clone(),
+                        GreedySchedulerConfig::default(),
+                    );
+                    (scheduler, bench_container.container)
+                },
+                |(scheduler, container)| {
+                    black_box(bench_env.run(scheduler, container, &mut stats));
+                    //stats.print_and_reset();
+                },
+            )
+        });
+
+    stats.print_and_reset();
+}
+
+fn bench_greedy_fully_contend_transactions(c: &mut Criterion) {
+    let capacity = TOTAL_BUFFERED_PACKETS;
+    let mut stats = BenchStats::default();
+    let bench_env: BenchEnv<RuntimeTransaction<SanitizedTransaction>> = BenchEnv::new(&mut stats);
+
+    c.benchmark_group("bench_greedy_fully_contend_transactions")
+        .sample_size(10)
+        .bench_function("sdk_transaction_type", |bencher| {
+            bencher.iter_with_setup(
+                || {
+                    let mut bench_container = BenchContainer::new(capacity);
+                    bench_container
+                        .fill_container(build_fully_contend_transactions(capacity).into_iter());
+                    let scheduler = GreedyScheduler::new(
+                        bench_env.consume_work_senders.clone(),
+                        bench_env.finished_consume_work_receiver.clone(),
+                        GreedySchedulerConfig::default(),
+                    );
+                    (scheduler, bench_container.container)
+                },
+                |(scheduler, container)| {
+                    black_box(bench_env.run(scheduler, container, &mut stats));
+                    //stats.print_and_reset();
+                },
+            )
+        });
+
+    stats.print_and_reset();
+}
+
+criterion_group!(
+    benches,
+    bench_empty_container,
+    bench_prio_graph_non_contend_transactions,
+    bench_prio_graph_fully_contend_transactions,
+    bench_greedy_non_contend_transactions,
+    bench_greedy_fully_contend_transactions,
+);
+criterion_main!(benches);
diff --git a/core/benches/scheduler_controller.rs b/core/benches/scheduler_controller.rs
new file mode 100644
index 00000000000000..f0a84509096d81
--- /dev/null
+++ b/core/benches/scheduler_controller.rs
@@ -0,0 +1,14 @@
+use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
+
+fn bench_dummy(c: &mut Criterion) {
+    c.benchmark_group("bench_dummy")
+        .throughput(Throughput::Elements(1))
+        .bench_function("dummy", |bencher| {
+            bencher.iter(|| {
+                std::thread::sleep(std::time::Duration::from_millis(100));
+            });
+        });
+}
+
+criterion_group!(benches, bench_dummy);
+criterion_main!(benches);
diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index 14176c17cb65a5..3c37d0a89221ac 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -66,10 +66,13 @@ pub mod leader_slot_metrics;
 pub mod qos_service;
 pub mod unprocessed_packet_batches;
 pub mod unprocessed_transaction_storage;
+// made public for benching the scheduler
+pub mod scheduler_messages;
+pub mod transaction_scheduler;
 
 mod consume_worker;
 mod decision_maker;
-mod immutable_deserialized_packet;
+pub mod immutable_deserialized_packet;
 mod latest_unprocessed_votes;
 mod leader_slot_timing_metrics;
 mod multi_iterator_scanner;
@@ -77,8 +80,6 @@ mod packet_deserializer;
 mod packet_filter;
 mod packet_receiver;
 mod read_write_account_set;
-mod scheduler_messages;
-mod transaction_scheduler;
 
 // proc_macro_hygiene needs to be stabilzied to use qualifier_attr...
 // error[E0658]: non-inline modules in proc macro input are unstable
@@ -90,7 +91,7 @@ pub mod unified_scheduler;
 // Fixed thread size seems to be fastest on GCP setup
 pub const NUM_THREADS: u32 = 6;
 
-const TOTAL_BUFFERED_PACKETS: usize = 100_000;
+pub const TOTAL_BUFFERED_PACKETS: usize = 100_000;
 
 const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
 const MIN_THREADS_BANKING: u32 = 1;
diff --git a/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs b/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs
index 767ca1465b0251..6f6eb8a95b95d7 100644
--- a/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs
+++ b/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs
@@ -24,7 +24,7 @@ use {
     solana_sdk::saturating_add_assign,
 };
 
-pub(crate) struct GreedySchedulerConfig {
+pub struct GreedySchedulerConfig {
     pub target_scheduled_cus: u64,
     pub max_scanned_transactions_per_scheduling_pass: usize,
     pub target_transactions_per_batch: usize,
@@ -54,7 +54,7 @@ pub struct GreedyScheduler<Tx: TransactionWithMeta> {
 }
 
 impl<Tx: TransactionWithMeta> GreedyScheduler<Tx> {
-    pub(crate) fn new(
+    pub fn new(
         consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
         finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,
         config: GreedySchedulerConfig,
diff --git a/core/src/banking_stage/transaction_scheduler/mod.rs b/core/src/banking_stage/transaction_scheduler/mod.rs
index c28e3110cf7033..1fa7d176ad403d 100644
--- a/core/src/banking_stage/transaction_scheduler/mod.rs
+++ b/core/src/banking_stage/transaction_scheduler/mod.rs
@@ -1,13 +1,13 @@
 mod batch_id_generator;
-pub(crate) mod greedy_scheduler;
+pub mod greedy_scheduler;
 mod in_flight_tracker;
-pub(crate) mod prio_graph_scheduler;
-pub(crate) mod receive_and_buffer;
-pub(crate) mod scheduler;
+pub mod prio_graph_scheduler;
+pub mod receive_and_buffer;
+pub mod scheduler;
 pub(crate) mod scheduler_controller;
 pub(crate) mod scheduler_error;
 mod scheduler_metrics;
 mod thread_aware_account_locks;
 mod transaction_priority_id;
-mod transaction_state;
-pub(crate) mod transaction_state_container;
+pub mod transaction_state;
+pub mod transaction_state_container;
diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
index 8e2090b4289c62..26bfb07fac1d80 100644
--- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
+++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
@@ -42,7 +42,7 @@ type SchedulerPrioGraph = PrioGraph<
     fn(&TransactionPriorityId, &GraphNode<TransactionPriorityId>) -> TransactionPriorityId,
 >;
 
-pub(crate) struct PrioGraphSchedulerConfig {
+pub struct PrioGraphSchedulerConfig {
     pub max_scheduled_cus: u64,
     pub max_scanned_transactions_per_scheduling_pass: usize,
     pub look_ahead_window_size: usize,
@@ -60,7 +60,7 @@ impl Default for PrioGraphSchedulerConfig {
     }
 }
 
-pub(crate) struct PrioGraphScheduler<Tx: TransactionWithMeta> {
+pub struct PrioGraphScheduler<Tx: TransactionWithMeta> {
     in_flight_tracker: InFlightTracker,
     account_locks: ThreadAwareAccountLocks,
     consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
@@ -70,7 +70,7 @@ pub struct PrioGraphScheduler<Tx: TransactionWithMeta> {
 }
 
 impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
-    pub(crate) fn new(
+    pub fn new(
         consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
         finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,
         config: PrioGraphSchedulerConfig,
diff --git a/core/src/banking_stage/transaction_scheduler/scheduler.rs b/core/src/banking_stage/transaction_scheduler/scheduler.rs
index 0aacb68630df2a..715393406083fb 100644
--- a/core/src/banking_stage/transaction_scheduler/scheduler.rs
+++ b/core/src/banking_stage/transaction_scheduler/scheduler.rs
@@ -3,7 +3,7 @@ use {
     solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
 };
 
-pub(crate) trait Scheduler<Tx: TransactionWithMeta> {
+pub trait Scheduler<Tx: TransactionWithMeta> {
     /// Schedule transactions from `container`.
     /// pre-graph and pre-lock filters may be passed to be applied
     /// before specific actions internally.
@@ -24,7 +24,7 @@ pub(crate) trait Scheduler<Tx: TransactionWithMeta> {
 
 /// Metrics from scheduling transactions.
 #[derive(Default, Debug, PartialEq, Eq)]
-pub(crate) struct SchedulingSummary {
+pub struct SchedulingSummary {
     /// Number of transactions scheduled.
     pub num_scheduled: usize,
     /// Number of transactions that were not scheduled due to conflicts.
diff --git a/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs b/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs
index 7142f3ac09736d..2da8e6b7dbd47f 100644
--- a/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs
+++ b/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs
@@ -6,7 +6,7 @@ use {
 
 /// A unique identifier tied with priority ordering for a transaction/packet:
 #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
-pub(crate) struct TransactionPriorityId {
+pub struct TransactionPriorityId {
     pub(crate) priority: u64,
     pub(crate) id: TransactionId,
 }
diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state.rs b/core/src/banking_stage/transaction_scheduler/transaction_state.rs
index 5311e22c45c3f3..5176c63aaa513a 100644
--- a/core/src/banking_stage/transaction_scheduler/transaction_state.rs
+++ b/core/src/banking_stage/transaction_scheduler/transaction_state.rs
@@ -1,9 +1,9 @@
 use crate::banking_stage::scheduler_messages::MaxAge;
 
 /// Simple wrapper type to tie a sanitized transaction to max age slot.
-pub(crate) struct SanitizedTransactionTTL<Tx> {
-    pub(crate) transaction: Tx,
-    pub(crate) max_age: MaxAge,
+pub struct SanitizedTransactionTTL<Tx> {
+    pub transaction: Tx,
+    pub max_age: MaxAge,
 }
 
 /// TransactionState is used to track the state of a transaction in the transaction scheduler
@@ -26,7 +26,7 @@ pub(crate) struct SanitizedTransactionTTL<Tx> {
 /// to the appropriate thread for processing. This is done to avoid cloning the
 /// `SanitizedTransaction`.
 #[allow(clippy::large_enum_variant)]
-pub(crate) enum TransactionState<Tx: TransactionWithMeta> {
+pub enum TransactionState<Tx: TransactionWithMeta> {
     /// The transaction is available for scheduling.
     Unprocessed {
         transaction_ttl: SanitizedTransactionTTL<Tx>,
diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs
index a11ffb4da97757..1a8408adbb9891 100644
--- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs
+++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs
@@ -40,13 +40,13 @@ use {
 ///
 /// The container maintains a fixed capacity. If the queue is full when pushing
 /// a new transaction, the lowest priority transaction will be dropped.
-pub(crate) struct TransactionStateContainer<Tx: TransactionWithMeta> {
+pub struct TransactionStateContainer<Tx: TransactionWithMeta> {
     capacity: usize,
     priority_queue: MinMaxHeap<TransactionPriorityId>,
     id_to_transaction_state: Slab<TransactionState<Tx>>,
 }
 
-pub(crate) trait StateContainer<Tx: TransactionWithMeta> {
+pub trait StateContainer<Tx: TransactionWithMeta> {
     /// Create a new `TransactionStateContainer` with the given capacity.
     fn with_capacity(capacity: usize) -> Self;
 
@@ -173,7 +173,7 @@ impl<Tx: TransactionWithMeta> StateContainer<Tx> for TransactionStateContainer<Tx>
 impl<Tx: TransactionWithMeta> TransactionStateContainer<Tx> {
     /// Insert a new transaction into the container's queues and maps.
     /// Returns `true` if a packet was dropped due to capacity limits.
-    pub(crate) fn insert_new_transaction(
+    pub fn insert_new_transaction(
         &mut self,
         transaction_ttl: SanitizedTransactionTTL<Tx>,
         priority: u64,
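
Note on running the new benches: given the `[[bench]]` section with `harness = false` registered in core/Cargo.toml above, something like the following should run the Criterion suite (the `solana-core` package name is taken from core/Cargo.toml; adjust filters as needed):

    cargo bench --package solana-core --bench prio_graph_scheduler

Each bench function prints the averaged BenchStats (scheduling passes, Works, transactions per work, and Tracer placement) after its Criterion measurement completes, so the behavioral comparison between PrioGraphScheduler and GreedyScheduler appears alongside the timing numbers.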