diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index f37b006396f561..69e9c244951fd5 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -97,7 +97,7 @@ type AtomicSchedulerId = AtomicU64; #[derive(Debug)] pub struct SchedulerPool, TH: TaskHandler> { scheduler_inners: Mutex>, - block_production_scheduler_inner: Mutex>, + block_production_scheduler_inner: Mutex>, trashed_scheduler_inners: Mutex>, timeout_listeners: Mutex>, handler_count: usize, @@ -126,14 +126,14 @@ pub struct SchedulerPool, TH: TaskHandler> { /// (= banks) and `banking_packet_receiver` shouldn't be consumed by multiple schedulers at once. /// So, it's managed differently from block-verification schedulers. #[derive(Default, Debug)] -enum BlockProdutionSchedulerInner, TH: TaskHandler> { +enum BlockProductionSchedulerInner, TH: TaskHandler> { #[default] NotSpawned, Pooled(S::Inner), Taken(SchedulerId), } -impl, TH: TaskHandler> BlockProdutionSchedulerInner { +impl, TH: TaskHandler> BlockProductionSchedulerInner { fn can_put(&self, returned: &S::Inner) -> bool { match self { Self::NotSpawned => false, @@ -634,7 +634,7 @@ where fn spawn_block_production_scheduler( &self, - block_production_scheduler_inner: &mut MutexGuard<'_, BlockProdutionSchedulerInner>, + block_production_scheduler_inner: &mut MutexGuard<'_, BlockProductionSchedulerInner>, ) { let scheduler = S::spawn( self.self_arc(), @@ -1586,16 +1586,16 @@ impl, TH: TaskHandler> ThreadManager { } }, recv(handler_context.banking_packet_receiver) -> banking_packet => { + let HandlerContext {banking_packet_handler, banking_stage_helper, ..} = &mut handler_context; + let banking_stage_helper = banking_stage_helper.as_ref().unwrap(); + // See solana_core::banking_stage::unified_scheduler module doc as to // justification of this additional work in the handler thread. let Ok(banking_packet) = banking_packet else { info!("disconnected banking_packet_receiver"); break; }; - (handler_context.banking_packet_handler)( - handler_context.banking_stage_helper.as_ref().unwrap(), - banking_packet - ); + banking_packet_handler(banking_stage_helper, banking_packet); continue; }, }; @@ -3773,7 +3773,7 @@ mod tests { let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded(); pool.register_banking_stage( banking_packet_receiver, - Box::new(|_, _| {}), + Box::new(|_, _| unreachable!()), poh_recorder.read().unwrap().new_recorder(), ); @@ -3878,11 +3878,11 @@ mod tests { ); pool.register_banking_stage( banking_packet_receiver, - Box::new(|_, _| {}), + Box::new(|_, _| unreachable!()), poh_recorder.read().unwrap().new_recorder(), ); - // Make sure the assertion in BlockProdutionSchedulerInner::can_put() doesn't cause false + // Make sure the assertion in BlockProductionSchedulerInner::can_put() doesn't cause false // positives... let context = SchedulingContext::for_verification(bank.clone()); let scheduler = pool.take_scheduler(context);