Skip to content

Commit

Permalink
Small clean-up for block production unified scheduler code (#5146)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun authored Mar 6, 2025
1 parent 877c18f commit 9ae8e4c
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type AtomicSchedulerId = AtomicU64;
#[derive(Debug)]
pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
scheduler_inners: Mutex<Vec<(S::Inner, Instant)>>,
block_production_scheduler_inner: Mutex<BlockProdutionSchedulerInner<S, TH>>,
block_production_scheduler_inner: Mutex<BlockProductionSchedulerInner<S, TH>>,
trashed_scheduler_inners: Mutex<Vec<S::Inner>>,
timeout_listeners: Mutex<Vec<(TimeoutListener, Instant)>>,
handler_count: usize,
Expand Down Expand Up @@ -126,14 +126,14 @@ pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
/// (= banks) and `banking_packet_receiver` shouldn't be consumed by multiple schedulers at once.
/// So, it's managed differently from block-verification schedulers.
#[derive(Default, Debug)]
enum BlockProdutionSchedulerInner<S: SpawnableScheduler<TH>, TH: TaskHandler> {
enum BlockProductionSchedulerInner<S: SpawnableScheduler<TH>, TH: TaskHandler> {
#[default]
NotSpawned,
Pooled(S::Inner),
Taken(SchedulerId),
}

impl<S: SpawnableScheduler<TH>, TH: TaskHandler> BlockProdutionSchedulerInner<S, TH> {
impl<S: SpawnableScheduler<TH>, TH: TaskHandler> BlockProductionSchedulerInner<S, TH> {
fn can_put(&self, returned: &S::Inner) -> bool {
match self {
Self::NotSpawned => false,
Expand Down Expand Up @@ -634,7 +634,7 @@ where

fn spawn_block_production_scheduler(
&self,
block_production_scheduler_inner: &mut MutexGuard<'_, BlockProdutionSchedulerInner<S, TH>>,
block_production_scheduler_inner: &mut MutexGuard<'_, BlockProductionSchedulerInner<S, TH>>,
) {
let scheduler = S::spawn(
self.self_arc(),
Expand Down Expand Up @@ -1586,16 +1586,16 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}
},
recv(handler_context.banking_packet_receiver) -> banking_packet => {
let HandlerContext {banking_packet_handler, banking_stage_helper, ..} = &mut handler_context;
let banking_stage_helper = banking_stage_helper.as_ref().unwrap();

// See solana_core::banking_stage::unified_scheduler module doc as to
// justification of this additional work in the handler thread.
let Ok(banking_packet) = banking_packet else {
info!("disconnected banking_packet_receiver");
break;
};
(handler_context.banking_packet_handler)(
handler_context.banking_stage_helper.as_ref().unwrap(),
banking_packet
);
banking_packet_handler(banking_stage_helper, banking_packet);
continue;
},
};
Expand Down Expand Up @@ -3773,7 +3773,7 @@ mod tests {
let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded();
pool.register_banking_stage(
banking_packet_receiver,
Box::new(|_, _| {}),
Box::new(|_, _| unreachable!()),
poh_recorder.read().unwrap().new_recorder(),
);

Expand Down Expand Up @@ -3878,11 +3878,11 @@ mod tests {
);
pool.register_banking_stage(
banking_packet_receiver,
Box::new(|_, _| {}),
Box::new(|_, _| unreachable!()),
poh_recorder.read().unwrap().new_recorder(),
);

// Make sure the assertion in BlockProdutionSchedulerInner::can_put() doesn't cause false
// Make sure the assertion in BlockProductionSchedulerInner::can_put() doesn't cause false
// positives...
let context = SchedulingContext::for_verification(bank.clone());
let scheduler = pool.take_scheduler(context);
Expand Down

0 comments on commit 9ae8e4c

Please sign in to comment.