diff --git a/Cargo.lock b/Cargo.lock
index a9b06c948b6294..9c8e04b1181781 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7559,6 +7559,7 @@ dependencies = [
  "lazy_static",
  "log",
  "qualifier_attr",
+ "scopeguard",
  "solana-ledger",
  "solana-logger",
  "solana-program-runtime",
diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock
index 0fe3060fd84c80..9644d41a3bbd24 100644
--- a/programs/sbf/Cargo.lock
+++ b/programs/sbf/Cargo.lock
@@ -6474,6 +6474,7 @@ dependencies = [
  "derivative",
  "log",
  "qualifier_attr",
+ "scopeguard",
  "solana-ledger",
  "solana-program-runtime",
  "solana-runtime",
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index e605c1ef605bef..a5b12077cb1ae7 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -249,7 +249,7 @@ struct RentMetrics {
 pub type BankStatusCache = StatusCache<Result<()>>;
 #[cfg_attr(
     feature = "frozen-abi",
-    frozen_abi(digest = "9Pf3NTGr1AEzB4nKaVBY24uNwoQR4aJi8vc96W6kGvNk")
+    frozen_abi(digest = "CyD3t1HF7U63HhZawzETFFfo8XydftV8X7rnUTKtiN1G")
 )]
 pub type BankSlotDelta = SlotDelta<Result<()>>;
 
diff --git a/sdk/src/transaction/error.rs b/sdk/src/transaction/error.rs
index 41d2d94348180c..63d2862494ea2b 100644
--- a/sdk/src/transaction/error.rs
+++ b/sdk/src/transaction/error.rs
@@ -172,6 +172,10 @@ pub enum TransactionError {
     /// Program cache hit max limit.
     #[error("Program cache hit max limit")]
     ProgramCacheHitMaxLimit,
+
+    /// Transaction processing is cancelled due to a panic or already aborted scheduler.
+    #[error("Transaction processing hasn't been completed due to an internal cancellation")]
+    ProcessingCancelled,
 }
 
 impl From<SanitizeError> for TransactionError {
diff --git a/storage-proto/proto/transaction_by_addr.proto b/storage-proto/proto/transaction_by_addr.proto
index d0fa74a2104707..43a5e22effd83a 100644
--- a/storage-proto/proto/transaction_by_addr.proto
+++ b/storage-proto/proto/transaction_by_addr.proto
@@ -63,6 +63,7 @@ enum TransactionErrorType {
   PROGRAM_EXECUTION_TEMPORARILY_RESTRICTED = 35;
   UNBALANCED_TRANSACTION = 36;
   PROGRAM_CACHE_HIT_MAX_LIMIT = 37;
+  PROCESSING_CANCELLED = 38;
 }
 
 message InstructionError {
diff --git a/storage-proto/src/convert.rs b/storage-proto/src/convert.rs
index 8d6669e44b43f1..3c124a20f6b376 100644
--- a/storage-proto/src/convert.rs
+++ b/storage-proto/src/convert.rs
@@ -816,6 +816,7 @@ impl TryFrom<tx_by_addr::TransactionError> for TransactionError {
             34 => TransactionError::ResanitizationNeeded,
             36 => TransactionError::UnbalancedTransaction,
             37 => TransactionError::ProgramCacheHitMaxLimit,
+            38 => TransactionError::ProcessingCancelled,
             _ => return Err("Invalid TransactionError"),
         })
     }
@@ -937,6 +938,9 @@ impl From<TransactionError> for tx_by_addr::TransactionError {
                 TransactionError::ProgramCacheHitMaxLimit => {
                     tx_by_addr::TransactionErrorType::ProgramCacheHitMaxLimit
                 }
+                TransactionError::ProcessingCancelled => {
+                    tx_by_addr::TransactionErrorType::ProcessingCancelled
+                }
             } as i32,
             instruction_error: match transaction_error {
                 TransactionError::InstructionError(index, ref instruction_error) => {
diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml
index cc6b3942e1f951..72ff300c1d0578 100644
--- a/unified-scheduler-pool/Cargo.toml
+++ b/unified-scheduler-pool/Cargo.toml
@@ -16,6 +16,7 @@ dashmap = { workspace = true }
 derivative = { workspace = true }
 log = { workspace = true }
 qualifier_attr = { workspace = true }
+scopeguard = { workspace = true }
 solana-ledger = { workspace = true }
 solana-program-runtime = { workspace = true }
 solana-runtime = { workspace = true }
diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs
index 15e56cfc488a90..5270243e82163e 100644
--- a/unified-scheduler-pool/src/lib.rs
+++ b/unified-scheduler-pool/src/lib.rs
@@ -21,6 +21,7 @@ use {
     dashmap::DashMap,
     derivative::Derivative,
     log::*,
+    scopeguard::defer,
    solana_ledger::blockstore_processor::{
         execute_batch, TransactionBatchWithIndexes, TransactionStatusSender,
     },
@@ -663,6 +664,9 @@ impl<TH: TaskHandler> PooledScheduler<TH> {
     }
 }
 
+struct HandlerPanicked;
+type HandlerResult = std::result::Result<Box<ExecutedTask>, HandlerPanicked>;
+
 impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
     fn new(pool: Arc<SchedulerPool<S, TH>>) -> Self {
         let (new_task_sender, new_task_receiver) = crossbeam_channel::unbounded();
@@ -701,8 +705,12 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
     #[must_use]
     fn accumulate_result_with_timings(
         (result, timings): &mut ResultWithTimings,
-        executed_task: Box<ExecutedTask>,
+        executed_task: HandlerResult,
     ) -> Option<Box<ExecutedTask>> {
+        let Ok(executed_task) = executed_task else {
+            *result = Err(TransactionError::ProcessingCancelled);
+            return None;
+        };
         timings.accumulate(&executed_task.result_with_timings.1);
         match executed_task.result_with_timings.0 {
             Ok(()) => Some(executed_task),
@@ -810,9 +818,9 @@
         // which should be scheduled while minimizing the delay to clear buffered linearized runs
         // as fast as possible.
         let (finished_blocked_task_sender, finished_blocked_task_receiver) =
-            crossbeam_channel::unbounded::<Box<ExecutedTask>>();
+            crossbeam_channel::unbounded::<HandlerResult>();
         let (finished_idle_task_sender, finished_idle_task_receiver) =
-            crossbeam_channel::unbounded::<Box<ExecutedTask>>();
+            crossbeam_channel::unbounded::<HandlerResult>();
 
         assert_matches!(self.session_result_with_timings, None);
@@ -1043,13 +1051,32 @@
                        }
                    },
                };
+                defer! {
+                    if !thread::panicking() {
+                        return;
+                    }
+
+                    // The scheduler thread can't detect panics in handler threads with
+                    // disconnected channel errors, unless all of them have died. So, send an
+                    // explicit Err promptly.
+                    let current_thread = thread::current();
+                    error!("handler thread is panicking: {:?}", current_thread);
+                    if sender.send(Err(HandlerPanicked)).is_ok() {
+                        info!("notified a panic from {:?}", current_thread);
+                    } else {
+                        // It seems that the scheduler has been aborted...
+                        // This branch is deliberately tested by using 2 transactions with
+                        // different timings in test_scheduler_schedule_execution_panic
+                        warn!("failed to notify a panic from {:?}", current_thread);
+                    }
+                }
                let mut task = ExecutedTask::new_boxed(task);
                Self::execute_task_with_handler(
                    runnable_task_receiver.context().bank(),
                    &mut task,
                    &pool.handler_context,
                );
-                if sender.send(task).is_err() {
+                if sender.send(Ok(task)).is_err() {
                    warn!("handler_thread: scheduler thread aborted...");
                    break;
                }
@@ -1085,12 +1112,29 @@
     fn ensure_join_threads(&mut self, should_receive_session_result: bool) {
         trace!("ensure_join_threads() is called");
 
+        fn join_with_panic_message(join_handle: JoinHandle<()>) -> thread::Result<()> {
+            let thread = join_handle.thread().clone();
+            join_handle.join().inspect_err(|e| {
+                // Always needs to try both types for .downcast_ref(), according to
+                // https://doc.rust-lang.org/1.78.0/std/macro.panic.html:
+                //   a panic can be accessed as a &dyn Any + Send, which contains either a &str or
+                //   String for regular panic!() invocations. (Whether a particular invocation
+                //   contains the payload at type &str or String is unspecified and can change.)
+                let panic_message = match (e.downcast_ref::<&str>(), e.downcast_ref::<String>()) {
+                    (Some(&s), _) => s,
+                    (_, Some(s)) => s,
+                    (None, None) => "",
+                };
+                panic!("{} (From: {:?})", panic_message, thread);
+            })
+        }
+
         if let Some(scheduler_thread) = self.scheduler_thread.take() {
             for thread in self.handler_threads.drain(..) {
                 debug!("joining...: {:?}", thread);
-                () = thread.join().unwrap();
+                () = join_with_panic_message(thread).unwrap();
             }
-            () = scheduler_thread.join().unwrap();
+            () = join_with_panic_message(scheduler_thread).unwrap();
 
             if should_receive_session_result {
                 let result_with_timings = self.session_result_receiver.recv().unwrap();
@@ -1291,6 +1335,7 @@ mod tests {
         AfterSchedulerThreadAborted,
         AfterTrashedSchedulerCleaned,
         BeforeThreadManagerDrop,
+        BeforeEndSession,
     }
 
     #[test]
@@ -1744,6 +1789,88 @@ mod tests {
         do_test_scheduler_schedule_execution_failure(false);
     }
 
+    #[test]
+    #[should_panic(expected = "This panic should be propagated. (From: ")]
+    fn test_scheduler_schedule_execution_panic() {
+        solana_logger::setup();
+
+        #[derive(Debug)]
+        enum PanickingHandlerCheckPoint {
+            BeforeNotifiedPanic,
+            BeforeIgnoredPanic,
+        }
+
+        let progress = sleepless_testing::setup(&[
+            &TestCheckPoint::BeforeNewTask,
+            &CheckPoint::NewTask(0),
+            &PanickingHandlerCheckPoint::BeforeNotifiedPanic,
+            &CheckPoint::SchedulerThreadAborted,
+            &PanickingHandlerCheckPoint::BeforeIgnoredPanic,
+            &TestCheckPoint::BeforeEndSession,
+        ]);
+
+        #[derive(Debug)]
+        struct PanickingHandler;
+        impl TaskHandler for PanickingHandler {
+            fn handle(
+                _result: &mut Result<()>,
+                _timings: &mut ExecuteTimings,
+                _bank: &Arc<Bank>,
+                _transaction: &SanitizedTransaction,
+                index: usize,
+                _handler_context: &HandlerContext,
+            ) {
+                if index == 0 {
+                    sleepless_testing::at(PanickingHandlerCheckPoint::BeforeNotifiedPanic);
+                } else if index == 1 {
+                    sleepless_testing::at(PanickingHandlerCheckPoint::BeforeIgnoredPanic);
+                } else {
+                    unreachable!();
+                }
+                panic!("This panic should be propagated.");
+            }
+        }
+
+        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
+
+        let bank = Bank::new_for_tests(&genesis_config);
+        let bank = setup_dummy_fork_graph(bank);
+        const TX_COUNT: usize = 2;
+
+        let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
+        let pool = SchedulerPool::<PooledScheduler<PanickingHandler>, _>::new_dyn(
+            Some(TX_COUNT), // fixed to use exactly 2 handlers
+            None,
+            None,
+            None,
+            ignored_prioritization_fee_cache,
+        );
+        let context = SchedulingContext::new(bank.clone());
+
+        let scheduler = pool.take_scheduler(context);
+
+        for index in 0..TX_COUNT {
+            // Use 2 non-conflicting txes to exercise the channel disconnected case as well.
+            let tx =
+                &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
+                    &Keypair::new(),
+                    &solana_sdk::pubkey::new_rand(),
+                    1,
+                    genesis_config.hash(),
+                ));
+            scheduler.schedule_execution(&(tx, index)).unwrap();
+        }
+        // Finally unblock the scheduler thread; otherwise the above schedule_execution could
+        // return SchedulerAborted...
+        sleepless_testing::at(TestCheckPoint::BeforeNewTask);
+
+        sleepless_testing::at(TestCheckPoint::BeforeEndSession);
+        let bank = BankWithScheduler::new(bank, Some(scheduler));
+
+        // The outer .unwrap() will panic. So, drop progress now.
+        drop(progress);
+        bank.wait_for_completed_scheduler().unwrap().0.unwrap();
+    }
+
     #[test]
     fn test_scheduler_execution_failure_short_circuiting() {
         solana_logger::setup();
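
For readers unfamiliar with the scopeguard-based pattern that the lib.rs hunks above introduce, the following is a minimal standalone sketch of the same idea: a handler thread installs a defer! guard that fires only while the thread is unwinding and sends an explicit HandlerPanicked error to the scheduler side, and the joining side downcasts the panic payload (trying both &str and String) to recover the message. This is an illustration under simplifying assumptions, not the production code from this diff: std::sync::mpsc stands in for crossbeam-channel, and a plain u64 stands in for Box<ExecutedTask>.

// Minimal sketch of the panic-notification pattern, assuming only the
// `scopeguard` crate and the standard library.
use scopeguard::defer;
use std::{sync::mpsc, thread};

#[derive(Debug)]
struct HandlerPanicked;
type HandlerResult = Result<u64, HandlerPanicked>;

fn main() {
    let (sender, receiver) = mpsc::channel::<HandlerResult>();

    let handler = thread::spawn(move || {
        defer! {
            // Only report when this thread is actually unwinding from a panic.
            if !thread::panicking() {
                return;
            }
            // Ignore send errors: the receiver may already be gone.
            let _ = sender.send(Err(HandlerPanicked));
        }
        // Simulate the handler blowing up mid-task.
        panic!("handler failed");
    });

    // The scheduler side learns about the panic promptly, without waiting
    // for every sender to disconnect.
    match receiver.recv() {
        Ok(Err(HandlerPanicked)) => println!("handler reported a panic"),
        other => println!("unexpected: {:?}", other),
    }

    // Joining returns Err(payload); downcast it to recover the panic
    // message, trying both &str and String as the payload type.
    if let Err(e) = handler.join() {
        let panic_message = match (e.downcast_ref::<&str>(), e.downcast_ref::<String>()) {
            (Some(&s), _) => s,
            (_, Some(s)) => s,
            (None, None) => "",
        };
        println!("joined panicked handler: {}", panic_message);
    }
}

The explicit notification matters because, as the in-diff comment notes, a single surviving handler keeps the channel connected, so the scheduler thread would otherwise only observe the panic once every handler thread had died.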