From 27922d5107f355f6f880b8dfcb41b3dead1235d0 Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Fri, 26 Apr 2024 13:44:52 +0900
Subject: [PATCH 1/2] Make unified scheduler's new task code fallible

---
 core/tests/unified_scheduler.rs         |   4 +-
 ledger/src/blockstore_processor.rs      |  82 ++++++++++++++----
 runtime/src/installed_scheduler_pool.rs | 109 +++++++++++++++++++++---
 unified-scheduler-pool/src/lib.rs       |  49 ++++++++---
 4 files changed, 200 insertions(+), 44 deletions(-)

diff --git a/core/tests/unified_scheduler.rs b/core/tests/unified_scheduler.rs
index fae6f3cccfe698..22f9fab0c05122 100644
--- a/core/tests/unified_scheduler.rs
+++ b/core/tests/unified_scheduler.rs
@@ -107,7 +107,9 @@ fn test_scheduler_waited_by_drop_bank_service() {
     // Delay transaction execution to ensure transaction execution happens after termintion has
     // been started
     let lock_to_stall = LOCK_TO_STALL.lock().unwrap();
-    pruned_bank.schedule_transaction_executions([(&tx, &0)].into_iter());
+    pruned_bank
+        .schedule_transaction_executions([(&tx, &0)].into_iter())
+        .unwrap();
     drop(pruned_bank);
     assert_eq!(pool_raw.pooled_scheduler_count(), 0);
     drop(lock_to_stall);
diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index bc2a60efb2de87..cc49274ae88667 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -338,11 +338,29 @@ fn process_batches(
             "process_batches()/schedule_batches_for_execution({} batches)",
             batches.len()
         );
-        // scheduling always succeeds here without being blocked on actual transaction executions.
-        // The transaction execution errors will be collected via the blocking fn called
-        // BankWithScheduler::wait_for_completed_scheduler(), if any.
-        schedule_batches_for_execution(bank, batches);
-        Ok(())
+        // Scheduling usually succeeds (immediately returns `Ok(())`) here without being blocked on
+        // the actual transaction executions.
+        //
+        // As an exception, this code path could propagate the transaction execution _errors of
+        // previously-scheduled transactions_ to notify the replay stage. The replay stage will then
+        // bail out of any further processing of the malformed (possibly malicious) block
+        // immediately, so as not to waste any system resources. Note that this propagation is only
+        // an early hint. Even if errors aren't propagated in this way, they are guaranteed to be
+        // propagated eventually via the blocking fn called
+        // BankWithScheduler::wait_for_completed_scheduler().
+        //
+        // To reiterate, the returned error is completely unrelated to the `batches` argument at
+        // hand. While awkward, the _async_ unified scheduler is reusing this existing error
+        // propagation code path to the replay stage for compatibility and ease of integration,
+        // exploiting the fact that the replay stage doesn't care _which transaction the returned
+        // error originates from_.
+        //
+        // In the future, a more proper error-propagation mechanism will be introduced once we
+        // fully transition to the unified scheduler for block verification. It would be a
+        // push-based one, from the unified scheduler to the replay stage, to eliminate the current
+        // overhead: 1 read lock per batch in
+        // `BankWithScheduler::schedule_transaction_executions()`.
+ schedule_batches_for_execution(bank, batches) } else { debug!( "process_batches()/rebatch_and_execute_batches({} batches)", @@ -364,7 +382,7 @@ fn process_batches( fn schedule_batches_for_execution( bank: &BankWithScheduler, batches: &[TransactionBatchWithIndexes], -) { +) -> Result<()> { for TransactionBatchWithIndexes { batch, transaction_indexes, @@ -375,8 +393,9 @@ fn schedule_batches_for_execution( .sanitized_transactions() .iter() .zip(transaction_indexes.iter()), - ); + )?; } + Ok(()) } fn rebatch_transactions<'a>( @@ -2139,7 +2158,8 @@ pub mod tests { self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs, }, installed_scheduler_pool::{ - MockInstalledScheduler, MockUninstalledScheduler, SchedulingContext, + MockInstalledScheduler, MockUninstalledScheduler, SchedulerAborted, + SchedulingContext, }, }, solana_sdk::{ @@ -4741,8 +4761,7 @@ pub mod tests { assert_eq!(batch3.transaction_indexes, vec![43, 44]); } - #[test] - fn test_schedule_batches_for_execution() { + fn do_test_schedule_batches_for_execution(should_succeed: bool) { solana_logger::setup(); let dummy_leader_pubkey = solana_sdk::pubkey::new_rand(); let GenesisConfigInfo { @@ -4763,10 +4782,23 @@ pub mod tests { .times(1) .in_sequence(&mut seq.lock().unwrap()) .return_const(context); - mocked_scheduler - .expect_schedule_execution() - .times(txs.len()) - .returning(|_| ()); + if should_succeed { + mocked_scheduler + .expect_schedule_execution() + .times(txs.len()) + .returning(|(_, _)| Ok(())); + } else { + // mocked_scheduler isn't async; so short-circuiting behavior is quite visible in that + // .times(1) is called instead of .times(txs.len()), not like the succeeding case + mocked_scheduler + .expect_schedule_execution() + .times(1) + .returning(|(_, _)| Err(SchedulerAborted)); + mocked_scheduler + .expect_recover_error_after_abort() + .times(1) + .returning(|| TransactionError::InsufficientFundsForFee); + } mocked_scheduler .expect_wait_for_termination() .with(mockall::predicate::eq(true)) @@ -4795,7 +4827,7 @@ pub mod tests { let replay_tx_thread_pool = create_thread_pool(1); let mut batch_execution_timing = BatchExecutionTiming::default(); let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); - assert!(process_batches( + let result = process_batches( &bank, &replay_tx_thread_pool, &[batch_with_indexes], @@ -4803,9 +4835,23 @@ pub mod tests { None, &mut batch_execution_timing, None, - &ignored_prioritization_fee_cache - ) - .is_ok()); + &ignored_prioritization_fee_cache, + ); + if should_succeed { + assert_matches!(result, Ok(())); + } else { + assert_matches!(result, Err(TransactionError::InsufficientFundsForFee)); + } + } + + #[test] + fn test_schedule_batches_for_execution_success() { + do_test_schedule_batches_for_execution(true); + } + + #[test] + fn test_schedule_batches_for_execution_failure() { + do_test_schedule_batches_for_execution(false); } #[test] diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index aaf3ea98f1b9aa..cfbb46f30c38d2 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -27,7 +27,7 @@ use { solana_sdk::{ hash::Hash, slot_history::Slot, - transaction::{Result, SanitizedTransaction}, + transaction::{Result, SanitizedTransaction, TransactionError}, }, std::{ fmt::Debug, @@ -42,6 +42,10 @@ pub trait InstalledSchedulerPool: Send + Sync + Debug { fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox; } +#[derive(Debug)] +pub struct 
SchedulerAborted;
+pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;
+
 #[cfg_attr(doc, aquamarine::aquamarine)]
 /// Schedules, executes, and commits transactions under encapsulated implementation
 ///
@@ -101,17 +105,50 @@ pub trait InstalledScheduler: Send + Sync + Debug + 'static {
     fn id(&self) -> SchedulerId;
     fn context(&self) -> &SchedulingContext;
 
-    // Calling this is illegal as soon as wait_for_termination is called.
+    /// Schedule transaction for execution.
+    ///
+    /// This non-blocking function will return immediately without waiting for actual execution.
+    ///
+    /// Calling this is illegal as soon as `wait_for_termination()` is called. It would result in
+    /// a fatal logic error.
+    ///
+    /// Note that the returned result indicates whether the scheduler has been aborted due to a
+    /// previously-scheduled bad transaction, which terminates further block verification. So,
+    /// almost always, the returned error isn't due to the merely scheduling of the current
+    /// transaction itself. At this point, calling this does nothing anymore while it's still safe
+    /// to do. As soon as notified, callers is expected to stop processing upcoming transactions of
+    /// the same `SchedulingContext` (i.e. same block). Internally, the aborted scheduler will be
+    /// disposed cleanly, not repooled, after `wait_for_termination()` is called, much like
+    /// not-aborted schedulers.
+    ///
+    /// Caller can acquire the error by calling a separate function called
+    /// `recover_error_after_abort()`, which requires `&mut self`, instead of `&self`. This
+    /// separation and convoluted returned value semantics explained above are intentional to
+    /// optimize the fast code-path of normal transaction scheduling to be multi-threaded at the
+    /// cost of far slower error code-path.
     fn schedule_execution<'a>(
         &'a self,
         transaction_with_index: &'a (&'a SanitizedTransaction, usize),
-    );
+    ) -> ScheduleResult;
+
+    /// Return the error which caused the scheduler to abort.
+    ///
+    /// Note that this must not be called until it's observed that `schedule_execution()` has
+    /// returned `Err(SchedulerAborted)`. Violating this will `panic!()`.
+    ///
+    /// That said, calling this multiple times is completely acceptable after the error observation
+    /// from `schedule_execution()`. While it's not guaranteed, the same `.clone()`-ed errors of
+    /// the first bad transaction are usually returned across invocations.
+    fn recover_error_after_abort(&mut self) -> TransactionError;
 
     /// Wait for a scheduler to terminate after processing.
     ///
     /// This function blocks the current thread while waiting for the scheduler to complete all of
     /// the executions for the scheduled transactions and to return the finalized
-    /// `ResultWithTimings`. Along with the result, this function also makes the scheduler itself
+    /// `ResultWithTimings`. This function still blocks for a short period of time even in the case
+    /// of aborted schedulers to gracefully shut down the scheduler (like thread joining).
+    ///
+    /// Along with the result being returned, this function also makes the scheduler itself
     /// uninstalled from the bank by transforming the consumed self.
    ///
     /// If no transaction is scheduled, the result and timing will be `Ok(())` and
@@ -286,11 +323,15 @@ impl BankWithScheduler {
         self.inner.scheduler.read().unwrap().is_some()
     }
 
+    /// Schedule the transaction as long as the scheduler hasn't been aborted.
+    ///
+    /// If the scheduler has been aborted, this doesn't schedule the transaction, instead just
+    /// returns the error of a prior scheduled transaction.
     // 'a is needed; anonymous_lifetime_in_impl_trait isn't stabilized yet...
     pub fn schedule_transaction_executions<'a>(
         &self,
         transactions_with_indexes: impl ExactSizeIterator<Item = (&'a SanitizedTransaction, &'a usize)>,
-    ) {
+    ) -> Result<()> {
         trace!(
             "schedule_transaction_executions(): {} txs",
             transactions_with_indexes.len()
@@ -300,8 +341,25 @@ impl BankWithScheduler {
         let scheduler = scheduler_guard.as_ref().unwrap();
 
         for (sanitized_transaction, &index) in transactions_with_indexes {
-            scheduler.schedule_execution(&(sanitized_transaction, index));
+            if scheduler
+                .schedule_execution(&(sanitized_transaction, index))
+                .is_err()
+            {
+                drop(scheduler_guard);
+                // This write lock isn't atomic with the above read lock. So, another thread
+                // could have called .recover_error_after_abort() while we're literally stuck at
+                // the gap between these locks (i.e. at this very comment, source-code-wise) under
+                // extreme race conditions. Thus, .recover_error_after_abort() is made idempotent
+                // with that consideration in mind.
+                //
+                // Lastly, this non-atomic nature is intentional for optimizing the fast code-path
+                let mut scheduler_guard = self.inner.scheduler.write().unwrap();
+                let scheduler = scheduler_guard.as_mut().unwrap();
+                return Err(scheduler.recover_error_after_abort());
+            }
         }
+
+        Ok(())
     }
 
     // take needless &mut only to communicate its semantic mutability to humans...
@@ -550,8 +608,7 @@ mod tests {
         assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
     }
 
-    #[test]
-    fn test_schedule_executions() {
+    fn do_test_schedule_execution(should_succeed: bool) {
         solana_logger::setup();
 
         let GenesisConfigInfo {
@@ -570,14 +627,40 @@
             bank.clone(),
             [true].into_iter(),
             Some(|mocked: &mut MockInstalledScheduler| {
-                mocked
-                    .expect_schedule_execution()
-                    .times(1)
-                    .returning(|(_, _)| ());
+                if should_succeed {
+                    mocked
+                        .expect_schedule_execution()
+                        .times(1)
+                        .returning(|(_, _)| Ok(()));
+                } else {
+                    mocked
+                        .expect_schedule_execution()
+                        .times(1)
+                        .returning(|(_, _)| Err(SchedulerAborted));
+                    mocked
+                        .expect_recover_error_after_abort()
+                        .times(1)
+                        .returning(|| TransactionError::InsufficientFundsForFee);
+                }
             }),
         );
 
         let bank = BankWithScheduler::new(bank, Some(mocked_scheduler));
-        bank.schedule_transaction_executions([(&tx0, &0)].into_iter());
+        let result = bank.schedule_transaction_executions([(&tx0, &0)].into_iter());
+        if should_succeed {
+            assert_matches!(result, Ok(()));
+        } else {
+            assert_matches!(result, Err(TransactionError::InsufficientFundsForFee));
+        }
+    }
+
+    #[test]
+    fn test_schedule_execution_success() {
+        do_test_schedule_execution(true);
+    }
+
+    #[test]
+    fn test_schedule_execution_failure() {
+        do_test_schedule_execution(false);
     }
 }
diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs
index 0b7c5495b0accc..bffadcb8e50580 100644
--- a/unified-scheduler-pool/src/lib.rs
+++ b/unified-scheduler-pool/src/lib.rs
@@ -29,14 +29,14 @@ use {
         bank::Bank,
         installed_scheduler_pool::{
             InstalledScheduler, InstalledSchedulerBox, InstalledSchedulerPool,
-            InstalledSchedulerPoolArc, ResultWithTimings, SchedulerId, SchedulingContext,
-            UninstalledScheduler, UninstalledSchedulerBox,
+            InstalledSchedulerPoolArc, ResultWithTimings, ScheduleResult, SchedulerId,
+            SchedulingContext, UninstalledScheduler, UninstalledSchedulerBox,
         },
         prioritization_fee_cache::PrioritizationFeeCache,
     },
     solana_sdk::{
         pubkey::Pubkey,
-
transaction::{Result, SanitizedTransaction}, + transaction::{Result, SanitizedTransaction, TransactionError}, }, solana_unified_scheduler_logic::{SchedulingStateMachine, Task, UsageQueue}, solana_vote::vote_sender_types::ReplayVoteSender, @@ -955,11 +955,20 @@ impl InstalledScheduler for PooledScheduler { &self.context } - fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) { + fn schedule_execution( + &self, + &(transaction, index): &(&SanitizedTransaction, usize), + ) -> ScheduleResult { let task = SchedulingStateMachine::create_task(transaction.clone(), index, &mut |pubkey| { self.inner.usage_queue_loader.load(pubkey) }); self.inner.thread_manager.send_task(task); + // Return hard-coded Ok for now; Upcoming pr will implement this properly. + Ok(()) + } + + fn recover_error_after_abort(&mut self) -> TransactionError { + todo!("in later pr..."); } fn wait_for_termination( @@ -1186,7 +1195,7 @@ mod tests { assert_eq!(bank.transaction_count(), 0); let scheduler = pool.take_scheduler(context); - scheduler.schedule_execution(&(tx0, 0)); + scheduler.schedule_execution(&(tx0, 0)).unwrap(); let bank = BankWithScheduler::new(bank, Some(scheduler)); assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); assert_eq!(bank.transaction_count(), 1); @@ -1219,7 +1228,7 @@ mod tests { genesis_config.hash(), )); assert_eq!(bank.transaction_count(), 0); - scheduler.schedule_execution(&(bad_tx, 0)); + scheduler.schedule_execution(&(bad_tx, 0)).unwrap(); // simulate the task-sending thread is stalled for some reason. std::thread::sleep(std::time::Duration::from_secs(1)); assert_eq!(bank.transaction_count(), 0); @@ -1237,7 +1246,9 @@ mod tests { .result, Ok(_) ); - scheduler.schedule_execution(&(good_tx_after_bad_tx, 1)); + scheduler + .schedule_execution(&(good_tx_after_bad_tx, 1)) + .unwrap(); scheduler.pause_for_recent_blockhash(); // transaction_count should remain same as scheduler should be bailing out. // That's because we're testing the serialized failing execution case in this test. 
@@ -1330,8 +1341,12 @@ mod tests { // Stall handling tx0 and tx1 let lock_to_stall = LOCK_TO_STALL.lock().unwrap(); - scheduler.schedule_execution(&(tx0, STALLED_TRANSACTION_INDEX)); - scheduler.schedule_execution(&(tx1, BLOCKED_TRANSACTION_INDEX)); + scheduler + .schedule_execution(&(tx0, STALLED_TRANSACTION_INDEX)) + .unwrap(); + scheduler + .schedule_execution(&(tx1, BLOCKED_TRANSACTION_INDEX)) + .unwrap(); // Wait a bit for the scheduler thread to decide to block tx1 std::thread::sleep(std::time::Duration::from_secs(1)); @@ -1405,7 +1420,7 @@ mod tests { .take(10000) { let scheduler = pool.take_scheduler(context.clone()); - scheduler.schedule_execution(&(dummy_tx, index)); + scheduler.schedule_execution(&(dummy_tx, index)).unwrap(); scheduler.wait_for_termination(false).1.return_to_pool(); } } @@ -1445,7 +1460,10 @@ mod tests { &self.2 } - fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) { + fn schedule_execution( + &self, + &(transaction, index): &(&SanitizedTransaction, usize), + ) -> ScheduleResult { let transaction_and_index = (transaction.clone(), index); let context = self.context().clone(); let pool = self.3.clone(); @@ -1468,6 +1486,12 @@ mod tests { ); (result, timings) })); + + Ok(()) + } + + fn recover_error_after_abort(&mut self) -> TransactionError { + unimplemented!(); } fn wait_for_termination( @@ -1573,7 +1597,8 @@ mod tests { assert_eq!(bank.transaction_count(), 0); // schedule but not immediately execute transaction - bank.schedule_transaction_executions([(&very_old_valid_tx, &0)].into_iter()); + bank.schedule_transaction_executions([(&very_old_valid_tx, &0)].into_iter()) + .unwrap(); // this calls register_recent_blockhash internally bank.fill_bank_with_ticks_for_tests(); From d853f8fc21baa8a99d63cefb967904ccb95be711 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 9 May 2024 14:55:39 +0900 Subject: [PATCH 2/2] Fix typo and improve wording --- runtime/src/installed_scheduler_pool.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index cfbb46f30c38d2..f4371b0efd9284 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -116,16 +116,17 @@ pub trait InstalledScheduler: Send + Sync + Debug + 'static { /// previously-scheduled bad transaction, which terminates further block verification. So, /// almost always, the returned error isn't due to the merely scheduling of the current /// transaction itself. At this point, calling this does nothing anymore while it's still safe - /// to do. As soon as notified, callers is expected to stop processing upcoming transactions of - /// the same `SchedulingContext` (i.e. same block). Internally, the aborted scheduler will be - /// disposed cleanly, not repooled, after `wait_for_termination()` is called, much like + /// to do. As soon as notified, callers are expected to stop processing upcoming transactions + /// of the same `SchedulingContext` (i.e. same block). Internally, the aborted scheduler will + /// be disposed cleanly, not repooled, after `wait_for_termination()` is called like /// not-aborted schedulers. /// /// Caller can acquire the error by calling a separate function called /// `recover_error_after_abort()`, which requires `&mut self`, instead of `&self`. 
This
-    /// separation and convoluted returned value semantics explained above are intentional to
+    /// separation and the convoluted returned value semantics explained above are intentional to
     /// optimize the fast code-path of normal transaction scheduling to be multi-threaded at the
-    /// cost of far slower error code-path.
+    /// cost of far slower error code-path while giving implementors increased flexibility by
+    /// having &mut.
     fn schedule_execution<'a>(
         &'a self,
         transaction_with_index: &'a (&'a SanitizedTransaction, usize),
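
Illustrative sketch (not part of the patches above): a minimal, self-contained Rust model of the abort-and-recover contract documented on `InstalledScheduler::schedule_execution()` and `recover_error_after_abort()`, together with the caller-side protocol that `BankWithScheduler::schedule_transaction_executions()` follows. `ToyScheduler`, `ToyError`, and `Tx` are hypothetical stand-ins for the real solana types; the real scheduler executes transactions asynchronously on worker threads, so the abort surfaces on a *later* `schedule_execution()` call rather than synchronously as modeled here.

use std::sync::Mutex;

#[derive(Debug)]
struct SchedulerAborted;

type ScheduleResult = Result<(), SchedulerAborted>;

#[derive(Debug, Clone)]
struct ToyError(String);

#[derive(Debug)]
struct Tx(&'static str);

#[derive(Default)]
struct ToyScheduler {
    // First error observed by the (conceptually background) execution side. Once set, the
    // scheduler is considered aborted and refuses further tasks.
    abort_error: Mutex<Option<ToyError>>,
}

impl ToyScheduler {
    // Fast path: succeed unless a previously scheduled transaction has already aborted the
    // scheduler. The concrete error is deliberately not returned here; callers fetch it via
    // recover_error_after_abort(), keeping this hot path cheap.
    fn schedule_execution(&self, (tx, index): (&Tx, usize)) -> ScheduleResult {
        if self.abort_error.lock().unwrap().is_some() {
            return Err(SchedulerAborted);
        }
        // A real implementation would hand the task off to worker threads here.
        println!("scheduled {:?} at index {index}", tx);
        // Pretend the transaction named "bad" fails during execution and aborts the scheduler.
        if tx.0 == "bad" {
            *self.abort_error.lock().unwrap() = Some(ToyError("bad tx".to_string()));
        }
        Ok(())
    }

    // Idempotent by design: callers may race to observe the abort, so every call returns a
    // clone of the same first error.
    fn recover_error_after_abort(&mut self) -> ToyError {
        self.abort_error
            .lock()
            .unwrap()
            .clone()
            .expect("must only be called after Err(SchedulerAborted) was observed")
    }
}

fn main() {
    let mut scheduler = ToyScheduler::default();
    let txs = [Tx("good"), Tx("bad"), Tx("good-after-bad")];

    // Caller-side protocol, mirroring BankWithScheduler::schedule_transaction_executions():
    // stop scheduling as soon as an abort is observed and surface the recovered error instead.
    for (index, tx) in txs.iter().enumerate() {
        if scheduler.schedule_execution((tx, index)).is_err() {
            let error = scheduler.recover_error_after_abort();
            println!("aborting block processing: {error:?}");
            break;
        }
    }
}

Note how the error itself is only fetched through the slower `&mut` path after the cheap `Err(SchedulerAborted)` hint, which is the fast-path/slow-path trade-off the doc comment above describes.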