Skip to content

Commit

Permalink
Gracefully abort scheduler on a panic in handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Jun 2, 2024
1 parent 74dcd3b commit 62520f2
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ struct RentMetrics {
pub type BankStatusCache = StatusCache<Result<()>>;
#[cfg_attr(
feature = "frozen-abi",
frozen_abi(digest = "9Pf3NTGr1AEzB4nKaVBY24uNwoQR4aJi8vc96W6kGvNk")
frozen_abi(digest = "CyD3t1HF7U63HhZawzETFFfo8XydftV8X7rnUTKtiN1G")
)]
pub type BankSlotDelta = SlotDelta<Result<()>>;

Expand Down
4 changes: 4 additions & 0 deletions sdk/src/transaction/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ pub enum TransactionError {
/// Program cache hit max limit.
#[error("Program cache hit max limit")]
ProgramCacheHitMaxLimit,

/// Transaction processing is cancelled due to a panic or already aborted scheduler.
#[error("Transaction processing hasn't been completed due to an internal cancellation")]
ProcessingCancelled,
}

impl From<SanitizeError> for TransactionError {
Expand Down
1 change: 1 addition & 0 deletions storage-proto/proto/transaction_by_addr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ enum TransactionErrorType {
PROGRAM_EXECUTION_TEMPORARILY_RESTRICTED = 35;
UNBALANCED_TRANSACTION = 36;
PROGRAM_CACHE_HIT_MAX_LIMIT = 37;
PROCESSING_CANCELLED = 38;
}

message InstructionError {
Expand Down
4 changes: 4 additions & 0 deletions storage-proto/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ impl TryFrom<tx_by_addr::TransactionError> for TransactionError {
34 => TransactionError::ResanitizationNeeded,
36 => TransactionError::UnbalancedTransaction,
37 => TransactionError::ProgramCacheHitMaxLimit,
38 => TransactionError::ProcessingCancelled,
_ => return Err("Invalid TransactionError"),
})
}
Expand Down Expand Up @@ -937,6 +938,9 @@ impl From<TransactionError> for tx_by_addr::TransactionError {
TransactionError::ProgramCacheHitMaxLimit => {
tx_by_addr::TransactionErrorType::ProgramCacheHitMaxLimit
}
TransactionError::ProcessingCancelled => {
tx_by_addr::TransactionErrorType::ProcessingCancelled
}
} as i32,
instruction_error: match transaction_error {
TransactionError::InstructionError(index, ref instruction_error) => {
Expand Down
1 change: 1 addition & 0 deletions unified-scheduler-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dashmap = { workspace = true }
derivative = { workspace = true }
log = { workspace = true }
qualifier_attr = { workspace = true }
scopeguard = { workspace = true }
solana-ledger = { workspace = true }
solana-program-runtime = { workspace = true }
solana-runtime = { workspace = true }
Expand Down
139 changes: 133 additions & 6 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use {
dashmap::DashMap,
derivative::Derivative,
log::*,
scopeguard::defer,
solana_ledger::blockstore_processor::{
execute_batch, TransactionBatchWithIndexes, TransactionStatusSender,
},
Expand Down Expand Up @@ -663,6 +664,9 @@ impl<TH: TaskHandler> PooledScheduler<TH> {
}
}

/// Marker error that a handler thread sends over its finished-task channel, in place of an
/// executed task, when it detects that it is panicking.
struct HandlerPanicked;
/// Item type of the finished-task channels: the executed task on success, or
/// [`HandlerPanicked`] when the handler thread panicked while processing it.
type HandlerResult = std::result::Result<Box<ExecutedTask>, HandlerPanicked>;

impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
fn new(pool: Arc<SchedulerPool<S, TH>>) -> Self {
let (new_task_sender, new_task_receiver) = crossbeam_channel::unbounded();
Expand Down Expand Up @@ -701,8 +705,12 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
#[must_use]
fn accumulate_result_with_timings(
(result, timings): &mut ResultWithTimings,
executed_task: Box<ExecutedTask>,
executed_task: HandlerResult,
) -> Option<Box<ExecutedTask>> {
let Ok(executed_task) = executed_task else {
*result = Err(TransactionError::ProcessingCancelled);
return None;
};
timings.accumulate(&executed_task.result_with_timings.1);
match executed_task.result_with_timings.0 {
Ok(()) => Some(executed_task),
Expand Down Expand Up @@ -810,9 +818,9 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// which should be scheduled while minimizing the delay to clear buffered linearized runs
// as fast as possible.
let (finished_blocked_task_sender, finished_blocked_task_receiver) =
crossbeam_channel::unbounded::<Box<ExecutedTask>>();
crossbeam_channel::unbounded::<HandlerResult>();
let (finished_idle_task_sender, finished_idle_task_receiver) =
crossbeam_channel::unbounded::<Box<ExecutedTask>>();
crossbeam_channel::unbounded::<HandlerResult>();

assert_matches!(self.session_result_with_timings, None);

Expand Down Expand Up @@ -1043,13 +1051,32 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}
},
};
defer! {
if !thread::panicking() {
return;
}

// The scheduler thread can't detect panics in handler threads via
// disconnected channel errors, unless all of them have died. So, send an
// explicit Err promptly.
let current_thread = thread::current();
error!("handler thread is panicking: {:?}", current_thread);
if sender.send(Err(HandlerPanicked)).is_ok() {
info!("notified a panic from {:?}", current_thread);
} else {
// It seems that the scheduler has been aborted...
// This branch is deliberately tested by using 2 transactions with
// different timings in test_scheduler_schedule_execution_panic
warn!("failed to notify a panic from {:?}", current_thread);
}
}
let mut task = ExecutedTask::new_boxed(task);
Self::execute_task_with_handler(
runnable_task_receiver.context().bank(),
&mut task,
&pool.handler_context,
);
if sender.send(task).is_err() {
if sender.send(Ok(task)).is_err() {
warn!("handler_thread: scheduler thread aborted...");
break;
}
Expand Down Expand Up @@ -1085,12 +1112,29 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
fn ensure_join_threads(&mut self, should_receive_session_result: bool) {
trace!("ensure_join_threads() is called");

/// Joins `join_handle` and re-raises any panic of the joined thread on the calling thread.
///
/// Returns `Ok(())` when the joined thread finished normally. When it panicked, this function
/// itself panics with the original message (or `<No panic info>` if the payload is neither a
/// `&str` nor a `String`), followed by ` (From: <thread>)` where `<thread>` is the `Debug`
/// form of the joined thread. Consequently an `Err` is never actually returned.
fn join_with_panic_message(join_handle: JoinHandle<()>) -> thread::Result<()> {
    // Grab the thread handle first; `join()` consumes `join_handle`.
    let thread = join_handle.thread().clone();
    let outcome = join_handle.join();

    if let Err(payload) = &outcome {
        // Both payload types always have to be probed, according to
        // https://doc.rust-lang.org/1.78.0/std/macro.panic.html:
        // a panic can be accessed as a &dyn Any + Send, which contains either a &str or
        // String for regular panic!() invocations. (Whether a particular invocation
        // contains the payload at type &str or String is unspecified and can change.)
        let panic_message = if let Some(literal) = payload.downcast_ref::<&str>() {
            *literal
        } else if let Some(owned) = payload.downcast_ref::<String>() {
            owned.as_str()
        } else {
            "<No panic info>"
        };
        panic!("{} (From: {:?})", panic_message, thread);
    }

    outcome
}

if let Some(scheduler_thread) = self.scheduler_thread.take() {
for thread in self.handler_threads.drain(..) {
debug!("joining...: {:?}", thread);
() = thread.join().unwrap();
() = join_with_panic_message(thread).unwrap();
}
() = scheduler_thread.join().unwrap();
() = join_with_panic_message(scheduler_thread).unwrap();

if should_receive_session_result {
let result_with_timings = self.session_result_receiver.recv().unwrap();
Expand Down Expand Up @@ -1291,6 +1335,7 @@ mod tests {
AfterSchedulerThreadAborted,
AfterTrashedSchedulerCleaned,
BeforeThreadManagerDrop,
BeforeEndSession,
}

#[test]
Expand Down Expand Up @@ -1744,6 +1789,88 @@ mod tests {
do_test_scheduler_schedule_execution_failure(false);
}

/// Checks that a panic inside a task handler surfaces to the test thread.
///
/// Two non-conflicting transactions are scheduled on a pool with two handler threads, and the
/// handler panics for both of them. The test passes only if it ends with a panic whose
/// message starts with the handler's message followed by ` (From: `.
#[test]
#[should_panic(expected = "This panic should be propagated. (From: ")]
fn test_scheduler_schedule_execution_panic() {
    solana_logger::setup();

    // Checkpoints hit by the handler itself, one per transaction index.
    #[derive(Debug)]
    enum PanickingHandlerCheckPoint {
        BeforeNotifiedPanic,
        BeforeIgnoredPanic,
    }

    // Checkpoint sequence handed to sleepless_testing; presumably it enforces this order
    // across threads, so that the first panic is reported before the scheduler thread aborts
    // and the second one happens only afterwards (TODO confirm against sleepless_testing).
    let progress = sleepless_testing::setup(&[
        &TestCheckPoint::BeforeNewTask,
        &CheckPoint::NewTask(0),
        &PanickingHandlerCheckPoint::BeforeNotifiedPanic,
        &CheckPoint::SchedulerThreadAborted,
        &PanickingHandlerCheckPoint::BeforeIgnoredPanic,
        &TestCheckPoint::BeforeEndSession,
    ]);

    // Handler that records a per-index checkpoint and then always panics.
    #[derive(Debug)]
    struct PanickingHandler;
    impl TaskHandler for PanickingHandler {
        fn handle(
            _result: &mut Result<()>,
            _timings: &mut ExecuteTimings,
            _bank: &Arc<Bank>,
            _transaction: &SanitizedTransaction,
            index: usize,
            _handler_context: &HandlerContext,
        ) {
            if index == 0 {
                sleepless_testing::at(PanickingHandlerCheckPoint::BeforeNotifiedPanic);
            } else if index == 1 {
                sleepless_testing::at(PanickingHandlerCheckPoint::BeforeIgnoredPanic);
            } else {
                // Only TX_COUNT (= 2) transactions are ever scheduled below.
                unreachable!();
            }
            panic!("This panic should be propagated.");
        }
    }

    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);

    let bank = Bank::new_for_tests(&genesis_config);
    let bank = setup_dummy_fork_graph(bank);
    const TX_COUNT: usize = 2;
    let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
    let pool = SchedulerPool::<PooledScheduler<PanickingHandler>, _>::new_dyn(
        Some(TX_COUNT), // fix to use exactly 2 handlers
        None,
        None,
        None,
        ignored_prioritization_fee_cache,
    );
    let context = SchedulingContext::new(bank.clone());

    let scheduler = pool.take_scheduler(context);

    for index in 0..TX_COUNT {
        // Use 2 non-conflicting txes to exercise the channel disconnected case as well.
        let tx =
            &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
                &Keypair::new(),
                &solana_sdk::pubkey::new_rand(),
                1,
                genesis_config.hash(),
            ));
        scheduler.schedule_execution(&(tx, index)).unwrap();
    }
    // Finally unblock the scheduler thread; otherwise the above schedule_execution could
    // return SchedulerAborted...
    sleepless_testing::at(TestCheckPoint::BeforeNewTask);

    sleepless_testing::at(TestCheckPoint::BeforeEndSession);
    let bank = BankWithScheduler::new(bank, Some(scheduler));

    // The outer .unwrap() will panic, so drop progress now.
    drop(progress);
    bank.wait_for_completed_scheduler().unwrap().0.unwrap();
}

#[test]
fn test_scheduler_execution_failure_short_circuiting() {
solana_logger::setup();
Expand Down

0 comments on commit 62520f2

Please sign in to comment.