diff --git a/Cargo.lock b/Cargo.lock index 96bbc6f6c980ad..a9b06c948b6294 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7556,6 +7556,7 @@ dependencies = [ "crossbeam-channel", "dashmap", "derivative", + "lazy_static", "log", "qualifier_attr", "solana-ledger", diff --git a/core/tests/unified_scheduler.rs b/core/tests/unified_scheduler.rs index fae6f3cccfe698..22f9fab0c05122 100644 --- a/core/tests/unified_scheduler.rs +++ b/core/tests/unified_scheduler.rs @@ -107,7 +107,9 @@ fn test_scheduler_waited_by_drop_bank_service() { // Delay transaction execution to ensure transaction execution happens after termination has // been started let lock_to_stall = LOCK_TO_STALL.lock().unwrap(); - pruned_bank.schedule_transaction_executions([(&tx, &0)].into_iter()); + pruned_bank + .schedule_transaction_executions([(&tx, &0)].into_iter()) + .unwrap(); drop(pruned_bank); assert_eq!(pool_raw.pooled_scheduler_count(), 0); drop(lock_to_stall); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index d6a4d183a01878..ebaa7a2c65075a 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -338,11 +338,29 @@ fn process_batches( "process_batches()/schedule_batches_for_execution({} batches)", batches.len() ); - // scheduling always succeeds here without being blocked on actual transaction executions. - // The transaction execution errors will be collected via the blocking fn called - // BankWithScheduler::wait_for_completed_scheduler(), if any. - schedule_batches_for_execution(bank, batches); - Ok(()) + // Scheduling usually succeeds (immediately returns `Ok(())`) here without being blocked on + // the actual transaction executions. + // + // As an exception, this code path could propagate the transaction execution _errors of + // previously-scheduled transactions_ to notify the replay stage. Then, the replay stage + // will bail out of further processing of the malformed (possibly malicious) block + // immediately, so as not to waste any system resources. Note that this propagation is only + // an early hint. Even if errors aren't propagated in this way, they are guaranteed to be + // propagated eventually via the blocking fn called + // BankWithScheduler::wait_for_completed_scheduler(). + // + // To reiterate, the returned error is completely unrelated to the `batches` argument at + // hand. While awkward, the _async_ unified scheduler is abusing this existing error + // propagation code path to the replay stage for compatibility and ease of integration, + // exploiting the fact that the replay stage doesn't care _which transaction the returned + // error originates from_. + // + // In the future, a more proper error propagation mechanism will be introduced once we + // fully transition to the unified scheduler for block verification. That would be a + // push-based one, from the unified scheduler to the replay stage, to eliminate the current + // overhead: 1 read lock per batch in + // `BankWithScheduler::schedule_transaction_executions()`. 
+ schedule_batches_for_execution(bank, batches) } else { debug!( "process_batches()/rebatch_and_execute_batches({} batches)", @@ -364,7 +382,7 @@ fn process_batches( fn schedule_batches_for_execution( bank: &BankWithScheduler, batches: &[TransactionBatchWithIndexes], -) { +) -> Result<()> { for TransactionBatchWithIndexes { batch, transaction_indexes, @@ -375,8 +393,9 @@ fn schedule_batches_for_execution( .sanitized_transactions() .iter() .zip(transaction_indexes.iter()), - ); + )?; } + Ok(()) } fn rebatch_transactions<'a>( @@ -2138,7 +2157,8 @@ pub mod tests { self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs, }, installed_scheduler_pool::{ - MockInstalledScheduler, MockUninstalledScheduler, SchedulingContext, + MockInstalledScheduler, MockUninstalledScheduler, SchedulerAborted, + SchedulingContext, }, }, solana_sdk::{ @@ -4740,8 +4760,7 @@ pub mod tests { assert_eq!(batch3.transaction_indexes, vec![43, 44]); } - #[test] - fn test_schedule_batches_for_execution() { + fn do_test_schedule_batches_for_execution(should_succeed: bool) { solana_logger::setup(); let dummy_leader_pubkey = solana_sdk::pubkey::new_rand(); let GenesisConfigInfo { @@ -4762,10 +4781,23 @@ pub mod tests { .times(1) .in_sequence(&mut seq.lock().unwrap()) .return_const(context); - mocked_scheduler - .expect_schedule_execution() - .times(txs.len()) - .returning(|_| ()); + if should_succeed { + mocked_scheduler + .expect_schedule_execution() + .times(txs.len()) + .returning(|(_, _)| Ok(())); + } else { + // mocked_scheduler isn't async; so short-circuiting behavior is quite visible in that + // .times(1) is called instead of .times(txs.len()), not like the succeeding case + mocked_scheduler + .expect_schedule_execution() + .times(1) + .returning(|(_, _)| Err(SchedulerAborted)); + mocked_scheduler + .expect_recover_error_after_abort() + .times(1) + .returning(|| TransactionError::InsufficientFundsForFee); + } mocked_scheduler .expect_wait_for_termination() .with(mockall::predicate::eq(true)) @@ -4794,7 +4826,7 @@ pub mod tests { let replay_tx_thread_pool = create_thread_pool(1); let mut batch_execution_timing = BatchExecutionTiming::default(); let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); - assert!(process_batches( + let result = process_batches( &bank, &replay_tx_thread_pool, &[batch_with_indexes], @@ -4802,9 +4834,23 @@ pub mod tests { None, &mut batch_execution_timing, None, - &ignored_prioritization_fee_cache - ) - .is_ok()); + &ignored_prioritization_fee_cache, + ); + if should_succeed { + assert_matches!(result, Ok(())); + } else { + assert_matches!(result, Err(TransactionError::InsufficientFundsForFee)); + } + } + + #[test] + fn test_schedule_batches_for_execution_success() { + do_test_schedule_batches_for_execution(true); + } + + #[test] + fn test_schedule_batches_for_execution_failure() { + do_test_schedule_batches_for_execution(false); } #[test] diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index aaf3ea98f1b9aa..c30d8fc7596ccd 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -27,7 +27,7 @@ use { solana_sdk::{ hash::Hash, slot_history::Slot, - transaction::{Result, SanitizedTransaction}, + transaction::{Result, SanitizedTransaction, TransactionError}, }, std::{ fmt::Debug, @@ -42,6 +42,10 @@ pub trait InstalledSchedulerPool: Send + Sync + Debug { fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox; } +#[derive(Debug)] +pub struct 
SchedulerAborted; +pub type ScheduleResult = std::result::Result<(), SchedulerAborted>; + #[cfg_attr(doc, aquamarine::aquamarine)] /// Schedules, executes, and commits transactions under encapsulated implementation /// @@ -101,17 +105,51 @@ pub trait InstalledScheduler: Send + Sync + Debug + 'static { fn id(&self) -> SchedulerId; fn context(&self) -> &SchedulingContext; - // Calling this is illegal as soon as wait_for_termination is called. + /// Schedule a transaction for execution. + /// + /// This non-blocking function will return immediately without waiting for actual execution. + /// + /// Calling this is illegal as soon as `wait_for_termination()` is called. It would result in + /// a fatal logic error. + /// + /// Note that the returned result indicates whether the scheduler has been aborted due to a + /// previously-scheduled bad transaction, which terminates further block verification. So, + /// almost always, the returned error isn't due merely to the scheduling of the current + /// transaction itself. At that point, calling this does nothing anymore, while it's still safe + /// to do so. As soon as notified, callers are expected to stop processing upcoming transactions + /// of the same `SchedulingContext` (i.e. same block). Internally, the aborted scheduler will + /// be disposed of cleanly, not repooled, after `wait_for_termination()` is called, just like + /// non-aborted schedulers. + /// + /// The caller can acquire the error by calling a separate function called + /// `recover_error_after_abort()`, which requires `&mut self` instead of `&self`. This + /// separation and the convoluted return value semantics explained above are intentional to + /// optimize the fast code-path of normal transaction scheduling to be multi-threaded at the + /// cost of a far slower error code-path, while giving implementors increased flexibility by + /// having `&mut`. fn schedule_execution<'a>( &'a self, transaction_with_index: &'a (&'a SanitizedTransaction, usize), - ); + ) -> ScheduleResult; + + /// Return the error which caused the scheduler to abort. + /// + /// Note that this must not be called until it's observed that `schedule_execution()` has + /// returned `Err(SchedulerAborted)`. Violating this should result in a `panic!()`. + /// + /// That said, calling this multiple times is completely acceptable after the error observation + /// from `schedule_execution()`. While it's not guaranteed, the same `.clone()`-ed error of + /// the first bad transaction is usually returned across invocations. + fn recover_error_after_abort(&mut self) -> TransactionError; /// Wait for a scheduler to terminate after processing. /// /// This function blocks the current thread while waiting for the scheduler to complete all of /// the executions for the scheduled transactions and to return the finalized - /// `ResultWithTimings`. Along with the result, this function also makes the scheduler itself + /// `ResultWithTimings`. This function still blocks for a short period of time even in the case + /// of aborted schedulers to gracefully shut down the scheduler (like thread joining). + /// + /// Along with the result being returned, this function also makes the scheduler itself /// uninstalled from the bank by transforming the consumed self. /// /// If no transaction is scheduled, the result and timing will be `Ok(())` and @@ -286,11 +324,15 @@ impl BankWithScheduler { self.inner.scheduler.read().unwrap().is_some() } + /// Schedule the transaction as long as the scheduler hasn't been aborted. 
+ /// + /// If the scheduler has been aborted, this doesn't schedule the transaction; instead, it just + /// returns the error of a prior scheduled transaction. // 'a is needed; anonymous_lifetime_in_impl_trait isn't stabilized yet... pub fn schedule_transaction_executions<'a>( &self, transactions_with_indexes: impl ExactSizeIterator, - ) { + ) -> Result<()> { trace!( "schedule_transaction_executions(): {} txs", transactions_with_indexes.len() @@ -300,8 +342,25 @@ impl BankWithScheduler { let scheduler = scheduler_guard.as_ref().unwrap(); for (sanitized_transaction, &index) in transactions_with_indexes { - scheduler.schedule_execution(&(sanitized_transaction, index)); + if scheduler + .schedule_execution(&(sanitized_transaction, index)) + .is_err() + { + drop(scheduler_guard); + // This write lock isn't atomic with the read lock above. So, another thread + // could have called .recover_error_after_abort() while we're literally stuck in + // the gap between these locks (i.e. right at this comment in the source code) under + // extreme race conditions. Thus, .recover_error_after_abort() is made idempotent + // with that consideration in mind. + // + // Lastly, this non-atomic nature is intentional for optimizing the fast code-path + let mut scheduler_guard = self.inner.scheduler.write().unwrap(); + let scheduler = scheduler_guard.as_mut().unwrap(); + return Err(scheduler.recover_error_after_abort()); + } } + + Ok(()) } // take needless &mut only to communicate its semantic mutability to humans... @@ -550,8 +609,7 @@ mod tests { assert_matches!(bank.wait_for_completed_scheduler(), Some(_)); } - #[test] - fn test_schedule_executions() { + fn do_test_schedule_execution(should_succeed: bool) { solana_logger::setup(); let GenesisConfigInfo { @@ -570,14 +628,40 @@ mod tests { bank.clone(), [true].into_iter(), Some(|mocked: &mut MockInstalledScheduler| { - mocked - .expect_schedule_execution() - .times(1) - .returning(|(_, _)| ()); + if should_succeed { + mocked + .expect_schedule_execution() + .times(1) + .returning(|(_, _)| Ok(())); + } else { + mocked + .expect_schedule_execution() + .times(1) + .returning(|(_, _)| Err(SchedulerAborted)); + mocked + .expect_recover_error_after_abort() + .times(1) + .returning(|| TransactionError::InsufficientFundsForFee); + } }), ); let bank = BankWithScheduler::new(bank, Some(mocked_scheduler)); - bank.schedule_transaction_executions([(&tx0, &0)].into_iter()); + let result = bank.schedule_transaction_executions([(&tx0, &0)].into_iter()); + if should_succeed { + assert_matches!(result, Ok(())); + } else { + assert_matches!(result, Err(TransactionError::InsufficientFundsForFee)); + } + } + + #[test] + fn test_schedule_execution_success() { + do_test_schedule_execution(true); + } + + #[test] + fn test_schedule_execution_failure() { + do_test_schedule_execution(false); } } diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml index 1d57a9307f7a47..cc6b3942e1f951 100644 --- a/unified-scheduler-pool/Cargo.toml +++ b/unified-scheduler-pool/Cargo.toml @@ -25,6 +25,7 @@ solana-vote = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } +lazy_static = { workspace = true } solana-logger = { workspace = true } solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 55b90bff51ea11..15e56cfc488a90 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -29,28 +29,43 @@ use { bank::Bank, 
installed_scheduler_pool::{ InstalledScheduler, InstalledSchedulerBox, InstalledSchedulerPool, - InstalledSchedulerPoolArc, ResultWithTimings, SchedulerId, SchedulingContext, - UninstalledScheduler, UninstalledSchedulerBox, + InstalledSchedulerPoolArc, ResultWithTimings, ScheduleResult, SchedulerAborted, + SchedulerId, SchedulingContext, UninstalledScheduler, UninstalledSchedulerBox, }, prioritization_fee_cache::PrioritizationFeeCache, }, solana_sdk::{ pubkey::Pubkey, - transaction::{Result, SanitizedTransaction}, + transaction::{Result, SanitizedTransaction, TransactionError}, }, solana_unified_scheduler_logic::{SchedulingStateMachine, Task, UsageQueue}, solana_vote::vote_sender_types::ReplayVoteSender, std::{ fmt::Debug, marker::PhantomData, + mem, sync::{ atomic::{AtomicU64, Ordering::Relaxed}, Arc, Mutex, OnceLock, Weak, }, - thread::{self, JoinHandle}, + thread::{self, sleep, JoinHandle}, + time::Duration, }, }; +mod sleepless_testing; +use crate::sleepless_testing::BuilderTracked; + +// dead_code is false positive; these tuple fields are used via Debug. +#[allow(dead_code)] +#[derive(Debug)] +enum CheckPoint { + NewTask(usize), + TaskHandled(usize), + SchedulerThreadAborted, + TrashedSchedulerCleaned(usize), +} + type AtomicSchedulerId = AtomicU64; // SchedulerPool must be accessed as a dyn trait from solana-runtime, because SchedulerPool @@ -59,6 +74,7 @@ type AtomicSchedulerId = AtomicU64; #[derive(Debug)] pub struct SchedulerPool, TH: TaskHandler> { scheduler_inners: Mutex>, + trashed_scheduler_inners: Mutex>, handler_count: usize, handler_context: HandlerContext, // weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to @@ -87,6 +103,8 @@ pub struct HandlerContext { pub type DefaultSchedulerPool = SchedulerPool, DefaultTaskHandler>; +const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(10); + impl SchedulerPool where S: SpawnableScheduler, @@ -101,12 +119,31 @@ where transaction_status_sender: Option, replay_vote_sender: Option, prioritization_fee_cache: Arc, + ) -> Arc { + Self::do_new( + handler_count, + log_messages_bytes_limit, + transaction_status_sender, + replay_vote_sender, + prioritization_fee_cache, + DEFAULT_POOL_CLEANER_INTERVAL, + ) + } + + fn do_new( + handler_count: Option, + log_messages_bytes_limit: Option, + transaction_status_sender: Option, + replay_vote_sender: Option, + prioritization_fee_cache: Arc, + pool_cleaner_interval: Duration, ) -> Arc { let handler_count = handler_count.unwrap_or(Self::default_handler_count()); assert!(handler_count >= 1); - Arc::new_cyclic(|weak_self| Self { + let scheduler_pool = Arc::new_cyclic(|weak_self| Self { scheduler_inners: Mutex::default(), + trashed_scheduler_inners: Mutex::default(), handler_count, handler_context: HandlerContext { log_messages_bytes_limit, @@ -117,7 +154,47 @@ where weak_self: weak_self.clone(), next_scheduler_id: AtomicSchedulerId::default(), _phantom: PhantomData, - }) + }); + + let cleaner_main_loop = { + let weak_scheduler_pool = Arc::downgrade(&scheduler_pool); + + move || loop { + sleep(pool_cleaner_interval); + + let Some(scheduler_pool) = weak_scheduler_pool.upgrade() else { + break; + }; + + let trashed_inner_count = { + let Ok(mut trashed_scheduler_inners) = + scheduler_pool.trashed_scheduler_inners.lock() + else { + break; + }; + let trashed_inners: Vec<_> = mem::take(&mut *trashed_scheduler_inners); + drop(trashed_scheduler_inners); + + let trashed_inner_count = trashed_inners.len(); + drop(trashed_inners); + trashed_inner_count + }; + + 
info!( + "Scheduler pool cleaner: dropped {} trashed inners", + trashed_inner_count, + ); + sleepless_testing::at(CheckPoint::TrashedSchedulerCleaned(trashed_inner_count)); + } + }; + + // No need to join; the spawned main loop will gracefully exit. + thread::Builder::new() + .name("solScCleaner".to_owned()) + .spawn_tracked(cleaner_main_loop) + .unwrap(); + + scheduler_pool } // This apparently-meaningless wrapper is handy, because some callers explicitly want @@ -149,11 +226,24 @@ where self.next_scheduler_id.fetch_add(1, Relaxed) } - fn return_scheduler(&self, scheduler: S::Inner) { - self.scheduler_inners - .lock() - .expect("not poisoned") - .push(scheduler); + // This fn needs to return immediately due to being part of the blocking + // `::wait_for_termination()` call. + fn return_scheduler(&self, scheduler: S::Inner, should_trash: bool) { + if should_trash { + // Delay drop()-ing this trashed returned scheduler inner by stashing it in + // self.trashed_scheduler_inners, which is periodically drained by the `solScCleaner` + // thread. Dropping it could take long time (in fact, + // PooledSchedulerInner::usage_queue_loader can contain many entries to drop). + self.trashed_scheduler_inners + .lock() + .expect("not poisoned") + .push(scheduler); + } else { + self.scheduler_inners + .lock() + .expect("not poisoned") + .push(scheduler); + } } fn do_take_scheduler(&self, context: SchedulingContext) -> S { @@ -253,6 +343,7 @@ impl TaskHandler for DefaultTaskHandler { handler_context.log_messages_bytes_limit, &handler_context.prioritization_fee_cache, ); + sleepless_testing::at(CheckPoint::TaskHandled(index)); } } @@ -407,6 +498,10 @@ mod chained_channel { &self.aux_receiver } + pub(super) fn never_receive_from_aux(&mut self) { + self.aux_receiver = never(); + } + pub(super) fn after_select(&mut self, message: ChainedChannel) -> Option

{ match message.0 { ChainedChannelPrivate::Payload(payload) => Some(payload), @@ -473,6 +568,71 @@ pub struct PooledSchedulerInner, TH: TaskHandler> { usage_queue_loader: UsageQueueLoader, } +impl Drop for ThreadManager +where + S: SpawnableScheduler, + TH: TaskHandler, +{ + fn drop(&mut self) { + trace!("ThreadManager::drop() is called..."); + + if self.are_threads_joined() { + return; + } + // If on-stack ThreadManager is being dropped abruptly while panicking, it's likely + // ::into_inner() isn't called, which is a critical runtime invariant for the following + // thread shutdown. Also, the state could be corrupt in other ways too, so just skip it + // altogether. + if thread::panicking() { + error!( + "ThreadManager::drop(): scheduler_id: {} skipping due to already panicking...", + self.scheduler_id, + ); + return; + } + + // assert that this is called after ::into_inner() + assert_matches!(self.session_result_with_timings, None); + + // Ensure thread shutdown is initiated via the disconnected new_task_receiver by replacing + // the current new_task_sender with a random one... + self.new_task_sender = crossbeam_channel::unbounded().0; + + self.ensure_join_threads(true); + assert_matches!(self.session_result_with_timings, Some((Ok(_), _))); + } +} + +impl PooledSchedulerInner +where + S: SpawnableScheduler, + TH: TaskHandler, +{ + fn id(&self) -> SchedulerId { + self.thread_manager.scheduler_id + } + + fn is_trashed(&self) -> bool { + // Schedulers can be regarded as _trashed_ (and thereby will be cleaned up later) if their + // threads are joined. Remember that unified scheduler _doesn't normally join threads_ even + // across different sessions (i.e. different banks) to avoid thread recreation overhead. + // + // This unusual thread joining happens after the blocked thread (= the replay stage) + // detects an aborted scheduler thread, which can be interpreted as an immediate signal + // about the existence of a transaction error. + // + // Note that this detection is done internally every time scheduler operations are run + // (send_task() and end_session(); or schedule_execution() and wait_for_termination() in + // terms of InstalledScheduler). So, it's ensured that the detection is done at least once + // for any scheduler which is taken out of the pool. + // + // Thus, any transaction errors are always handled without loss of information and + // the aborted scheduler itself will always be handled as _trashed_ before the scheduler is + // returned to the pool, considering is_trashed() is checked immediately before that. + self.thread_manager.are_threads_joined() + } +} + // This type manages the OS threads for scheduling and executing transactions. The term // `session` is consistently used to mean a group of Tasks scoped under a single SchedulingContext. // This is equivalent to a particular bank for block verification. 
However, new terms is introduced @@ -483,7 +643,7 @@ struct ThreadManager, TH: TaskHandler> { scheduler_id: SchedulerId, pool: Arc>, new_task_sender: Sender, - new_task_receiver: Receiver, + new_task_receiver: Option>, session_result_sender: Sender, session_result_receiver: Receiver, session_result_with_timings: Option, @@ -513,7 +673,7 @@ impl, TH: TaskHandler> ThreadManager { scheduler_id: pool.new_scheduler_id(), pool, new_task_sender, - new_task_receiver, + new_task_receiver: Some(new_task_receiver), session_result_sender, session_result_receiver, session_result_with_timings: None, @@ -538,20 +698,18 @@ impl, TH: TaskHandler> ThreadManager { ); } + #[must_use] fn accumulate_result_with_timings( (result, timings): &mut ResultWithTimings, executed_task: Box, - ) { + ) -> Option> { timings.accumulate(&executed_task.result_with_timings.1); match executed_task.result_with_timings.0 { - Ok(()) => {} + Ok(()) => Some(executed_task), Err(error) => { error!("error is detected while accumulating....: {error:?}"); - // Override errors intentionally for simplicity, not retaining the - // first error unlike the block verification in the - // blockstore_processor. This will be addressed with more - // full-fledged impl later. *result = Err(error); + None } } } @@ -668,7 +826,13 @@ impl, TH: TaskHandler> ThreadManager { let scheduler_main_loop = || { let handler_count = self.pool.handler_count; let session_result_sender = self.session_result_sender.clone(); - let new_task_receiver = self.new_task_receiver.clone(); + // Taking new_task_receiver here is important to ensure there's a single receiver. In + // this way, the replay stage will get .send() failures reliably, after this scheduler + // thread died along with the single receiver. + let new_task_receiver = self + .new_task_receiver + .take() + .expect("no 2nd start_threads()"); let mut session_ending = false; @@ -725,7 +889,7 @@ impl, TH: TaskHandler> ThreadManager { }; let mut result_with_timings = initialized_result_with_timings(); - loop { + 'nonaborted_main_loop: loop { match new_task_receiver.recv() { Ok(NewTaskPayload::OpenSubchannel(context)) => { // signal about new SchedulingContext to handler threads @@ -733,9 +897,13 @@ impl, TH: TaskHandler> ThreadManager { .send_chained_channel(context, handler_count) .unwrap(); } - _ => { + Ok(_) => { unreachable!(); } + Err(_) => { + // This unusual condition must be triggered by ThreadManager::drop(); + break 'nonaborted_main_loop; + } } let mut is_finished = false; @@ -759,10 +927,13 @@ impl, TH: TaskHandler> ThreadManager { // to measure _actual_ cpu usage easily with the select approach. select_biased! 
{ recv(finished_blocked_task_receiver) -> executed_task => { - let executed_task = executed_task.unwrap(); - + let Some(executed_task) = Self::accumulate_result_with_timings( + &mut result_with_timings, + executed_task.expect("alive handler") + ) else { + break 'nonaborted_main_loop; + }; state_machine.deschedule_task(&executed_task.task); - Self::accumulate_result_with_timings(&mut result_with_timings, executed_task); }, recv(dummy_unblocked_task_receiver) -> dummy => { assert_matches!(dummy, Err(RecvError)); @@ -775,25 +946,35 @@ impl, TH: TaskHandler> ThreadManager { recv(new_task_receiver) -> message => { assert!(!session_ending); - match message.unwrap() { - NewTaskPayload::Payload(task) => { + match message { + Ok(NewTaskPayload::Payload(task)) => { + sleepless_testing::at(CheckPoint::NewTask(task.task_index())); if let Some(task) = state_machine.schedule_task(task) { runnable_task_sender.send_aux_payload(task).unwrap(); } } - NewTaskPayload::CloseSubchannel => { + Ok(NewTaskPayload::CloseSubchannel) => { session_ending = true; } - NewTaskPayload::OpenSubchannel(_context) => { + Ok(NewTaskPayload::OpenSubchannel(_context)) => { unreachable!(); } + Err(RecvError) => { + // Most likely, this scheduler is being dropped for pruned blocks of + // abandoned forks... + // This short-circuiting is tested with test_scheduler_drop_short_circuiting. + break 'nonaborted_main_loop; + } } }, recv(finished_idle_task_receiver) -> executed_task => { - let executed_task = executed_task.unwrap(); - + let Some(executed_task) = Self::accumulate_result_with_timings( + &mut result_with_timings, + executed_task.expect("alive handler") + ) else { + break 'nonaborted_main_loop; + }; state_machine.deschedule_task(&executed_task.task); - Self::accumulate_result_with_timings(&mut result_with_timings, executed_task); }, }; @@ -807,10 +988,31 @@ impl, TH: TaskHandler> ThreadManager { &mut result_with_timings, initialized_result_with_timings(), )) - .unwrap(); + .expect("always outlived receiver"); session_ending = false; } } + + // There are several code paths reaching here out of the preceding unconditional + // `loop { ... }` by the use of `break 'nonaborted_main_loop;`. This scheduler + // thread will now initiate the termination process, indicating an abnormal abortion, + // in order to be handled gracefully by other threads. + + // Firstly, send result_with_timings as-is, because it's expected for us to put the + // last result_with_timings into the channel without exception. Usually, + // result_with_timings will contain the Err variant at this point, indicating the + // occurrence of a transaction error. + session_result_sender + .send(result_with_timings) + .expect("always outlived receiver"); + + // Next, drop `new_task_receiver`. After that, the paired singleton + // `new_task_sender` will start to error when called by external threads, resulting + // in propagation of thread abortion to the external threads. + drop(new_task_receiver); + + // We will now exit this thread finally... Good bye. + sleepless_testing::at(CheckPoint::SchedulerThreadAborted); } }; @@ -823,7 +1025,10 @@ impl, TH: TaskHandler> ThreadManager { move || loop { let (task, sender) = select_biased! 
{ recv(runnable_task_receiver.for_select()) -> message => { - if let Some(task) = runnable_task_receiver.after_select(message.unwrap()) { + let Ok(message) = message else { + break; + }; + if let Some(task) = runnable_task_receiver.after_select(message) { (task, &finished_blocked_task_sender) } else { continue; @@ -833,6 +1038,7 @@ impl, TH: TaskHandler> ThreadManager { if let Ok(task) = task { (task, &finished_idle_task_sender) } else { + runnable_task_receiver.never_receive_from_aux(); continue; } }, @@ -843,14 +1049,17 @@ impl, TH: TaskHandler> ThreadManager { &mut task, &pool.handler_context, ); - sender.send(task).unwrap(); + if sender.send(task).is_err() { + warn!("handler_thread: scheduler thread aborted..."); + break; + } } }; self.scheduler_thread = Some( thread::Builder::new() .name("solScheduler".to_owned()) - .spawn(scheduler_main_loop()) + .spawn_tracked(scheduler_main_loop()) .unwrap(), ); @@ -859,40 +1068,100 @@ impl, TH: TaskHandler> ThreadManager { |thx| { thread::Builder::new() .name(format!("solScHandler{:02}", thx)) - .spawn(handler_main_loop()) + .spawn_tracked(handler_main_loop()) .unwrap() } }) .collect(); } - fn send_task(&self, task: Task) { + fn send_task(&self, task: Task) -> ScheduleResult { debug!("send_task()"); self.new_task_sender .send(NewTaskPayload::Payload(task)) + .map_err(|_| SchedulerAborted) + } + + fn ensure_join_threads(&mut self, should_receive_session_result: bool) { + trace!("ensure_join_threads() is called"); + + if let Some(scheduler_thread) = self.scheduler_thread.take() { + for thread in self.handler_threads.drain(..) { + debug!("joining...: {:?}", thread); + () = thread.join().unwrap(); + } + () = scheduler_thread.join().unwrap(); + + if should_receive_session_result { + let result_with_timings = self.session_result_receiver.recv().unwrap(); + debug!("ensure_join_threads(): err: {:?}", result_with_timings.0); + self.put_session_result_with_timings(result_with_timings); + } + } else { + warn!("ensure_join_threads(): skipping; already joined..."); + }; + } + + fn ensure_join_threads_after_abort( + &mut self, + should_receive_aborted_session_result: bool, + ) -> TransactionError { + self.ensure_join_threads(should_receive_aborted_session_result); + self.session_result_with_timings + .as_mut() .unwrap() + .0 + .clone() + .unwrap_err() + } + + fn are_threads_joined(&self) -> bool { + if self.scheduler_thread.is_none() { + // Emptying handler_threads must be an atomic operation with scheduler_thread being + // taken. + assert!(self.handler_threads.is_empty()); + true + } else { + false + } } fn end_session(&mut self) { - if self.session_result_with_timings.is_some() { - debug!("end_session(): already result resides within thread manager.."); + if self.are_threads_joined() { + assert!(self.session_result_with_timings.is_some()); + debug!("end_session(): skipping; already joined the aborted threads.."); + return; + } else if self.session_result_with_timings.is_some() { + debug!("end_session(): skipping; already result resides within thread manager.."); return; } debug!("end_session(): will end session..."); - self.new_task_sender + let mut abort_detected = self + .new_task_sender .send(NewTaskPayload::CloseSubchannel) - .unwrap(); + .is_err(); + if abort_detected { + self.ensure_join_threads_after_abort(true); + return; + } + + // Even if abort is detected, it's guaranteed that the scheduler thread puts the last + // message into the session_result_sender before terminating. 
let result_with_timings = self.session_result_receiver.recv().unwrap(); + abort_detected = result_with_timings.0.is_err(); self.put_session_result_with_timings(result_with_timings); + if abort_detected { + self.ensure_join_threads_after_abort(false); + } } fn start_session(&mut self, context: &SchedulingContext) { assert_matches!(self.session_result_with_timings, None); self.new_task_sender .send(NewTaskPayload::OpenSubchannel(context.clone())) - .unwrap(); + .expect("no new session after aborted"); } } @@ -937,18 +1206,27 @@ impl SpawnableScheduler for PooledScheduler { impl InstalledScheduler for PooledScheduler { fn id(&self) -> SchedulerId { - self.inner.thread_manager.scheduler_id + self.inner.id() } fn context(&self) -> &SchedulingContext { &self.context } - fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) { + fn schedule_execution( + &self, + &(transaction, index): &(&SanitizedTransaction, usize), + ) -> ScheduleResult { let task = SchedulingStateMachine::create_task(transaction.clone(), index, &mut |pubkey| { self.inner.usage_queue_loader.load(pubkey) }); - self.inner.thread_manager.send_task(task); + self.inner.thread_manager.send_task(task) + } + + fn recover_error_after_abort(&mut self) -> TransactionError { + self.inner + .thread_manager + .ensure_join_threads_after_abort(true) } fn wait_for_termination( @@ -970,7 +1248,16 @@ where TH: TaskHandler, { fn return_to_pool(self: Box) { - self.thread_manager.pool.clone().return_scheduler(*self) + // Refer to the comment in is_trashed() as to the exact definition of the concept of + // _trashed_ and the interaction among different parts of unified scheduler. + let should_trash = self.is_trashed(); + if should_trash { + info!("trashing scheduler (id: {})...", self.id()); + } + self.thread_manager + .pool + .clone() + .return_scheduler(*self, should_trash); } } @@ -978,6 +1265,7 @@ where mod tests { use { super::*, + crate::sleepless_testing, assert_matches::assert_matches, solana_runtime::{ bank::Bank, @@ -996,6 +1284,15 @@ mod tests { std::{sync::Arc, thread::JoinHandle}, }; + #[derive(Debug)] + enum TestCheckPoint { + BeforeNewTask, + AfterTaskHandled, + AfterSchedulerThreadAborted, + AfterTrashedSchedulerCleaned, + BeforeThreadManagerDrop, + } + #[test] fn test_scheduler_pool_new() { solana_logger::setup(); @@ -1006,7 +1303,8 @@ mod tests { // this indirectly proves that there should be circular link because there's only one Arc // at this moment now - assert_eq!((Arc::strong_count(&pool), Arc::weak_count(&pool)), (1, 1)); + // the 2 weaks are for the weak_self field and the pool cleaner thread. 
+ assert_eq!((Arc::strong_count(&pool), Arc::weak_count(&pool)), (1, 2)); let debug = format!("{pool:#?}"); assert!(!debug.is_empty()); } @@ -1026,6 +1324,177 @@ mod tests { assert!(!debug.is_empty()); } + const SHORTENED_POOL_CLEANER_INTERVAL: Duration = Duration::from_millis(1); + + enum AbortCase { + Unhandled, + UnhandledWhilePanicking, + Handled, + } + + fn do_test_scheduler_drop_abort(abort_case: AbortCase) { + solana_logger::setup(); + + let _progress = sleepless_testing::setup(match abort_case { + AbortCase::Unhandled => &[ + &CheckPoint::SchedulerThreadAborted, + &TestCheckPoint::AfterSchedulerThreadAborted, + ], + _ => &[], + }); + + #[derive(Debug)] + struct FaultyHandler; + impl TaskHandler for FaultyHandler { + fn handle( + result: &mut Result<()>, + _timings: &mut ExecuteTimings, + _bank: &Arc, + _transaction: &SanitizedTransaction, + _index: usize, + _handler_context: &HandlerContext, + ) { + *result = Err(TransactionError::AccountNotFound); + } + } + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(10_000); + + let tx = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &solana_sdk::pubkey::new_rand(), + 2, + genesis_config.hash(), + )); + + let bank = Bank::new_for_tests(&genesis_config); + let bank = setup_dummy_fork_graph(bank); + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = SchedulerPool::, _>::new( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); + let context = SchedulingContext::new(bank.clone()); + let scheduler = pool.do_take_scheduler(context); + scheduler.schedule_execution(&(tx, 0)).unwrap(); + + match abort_case { + AbortCase::Unhandled => { + sleepless_testing::at(TestCheckPoint::AfterSchedulerThreadAborted); + // Directly dropping PooledScheduler is illegal unless panicking already, especially + // after being aborted. It must be converted to PooledSchedulerInner via + // ::into_inner(); + drop::>(scheduler); + } + AbortCase::UnhandledWhilePanicking => { + // no sleepless_testing::at(); panicking special-casing isn't racy + panic!("ThreadManager::drop() should be skipped..."); + } + AbortCase::Handled => { + // no sleepless_testing::at(); ::into_inner() isn't racy + let ((result, _), mut scheduler_inner) = scheduler.into_inner(); + assert_matches!(result, Err(TransactionError::AccountNotFound)); + + // Calling ensure_join_threads() repeatedly should be safe. 
+ let dummy_flag = true; // doesn't matter because it's skipped anyway + scheduler_inner + .thread_manager + .ensure_join_threads(dummy_flag); + + drop::>(scheduler_inner); + } + } + } + + #[test] + #[should_panic(expected = "does not match `Some((Ok(_), _))")] + fn test_scheduler_drop_abort_unhandled() { + do_test_scheduler_drop_abort(AbortCase::Unhandled); + } + + #[test] + #[should_panic(expected = "ThreadManager::drop() should be skipped...")] + fn test_scheduler_drop_abort_unhandled_while_panicking() { + do_test_scheduler_drop_abort(AbortCase::UnhandledWhilePanicking); + } + + #[test] + fn test_scheduler_drop_abort_handled() { + do_test_scheduler_drop_abort(AbortCase::Handled); + } + + #[test] + fn test_scheduler_drop_short_circuiting() { + solana_logger::setup(); + + let _progress = sleepless_testing::setup(&[ + &TestCheckPoint::BeforeThreadManagerDrop, + &CheckPoint::NewTask(0), + &CheckPoint::SchedulerThreadAborted, + &TestCheckPoint::AfterSchedulerThreadAborted, + ]); + + static TASK_COUNT: Mutex = Mutex::new(0); + + #[derive(Debug)] + struct CountingHandler; + impl TaskHandler for CountingHandler { + fn handle( + _result: &mut Result<()>, + _timings: &mut ExecuteTimings, + _bank: &Arc, + _transaction: &SanitizedTransaction, + _index: usize, + _handler_context: &HandlerContext, + ) { + *TASK_COUNT.lock().unwrap() += 1; + } + } + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(10_000); + + let bank = Bank::new_for_tests(&genesis_config); + let bank = setup_dummy_fork_graph(bank); + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = SchedulerPool::, _>::new( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); + let context = SchedulingContext::new(bank.clone()); + let scheduler = pool.do_take_scheduler(context); + + for i in 0..10 { + let tx = + &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &solana_sdk::pubkey::new_rand(), + 2, + genesis_config.hash(), + )); + scheduler.schedule_execution(&(tx, i)).unwrap(); + } + + // Make sure ThreadManager::drop() is properly short-circuiting for non-aborting scheduler. 
+ sleepless_testing::at(TestCheckPoint::BeforeThreadManagerDrop); + drop::>(scheduler); + sleepless_testing::at(TestCheckPoint::AfterSchedulerThreadAborted); + assert!(*TASK_COUNT.lock().unwrap() < 10); + } + #[test] fn test_scheduler_pool_filo() { solana_logger::setup(); @@ -1044,10 +1513,10 @@ mod tests { let (result_with_timings, scheduler1) = scheduler1.into_inner(); assert_matches!(result_with_timings, (Ok(()), _)); - pool.return_scheduler(scheduler1); + pool.return_scheduler(scheduler1, false); let (result_with_timings, scheduler2) = scheduler2.into_inner(); assert_matches!(result_with_timings, (Ok(()), _)); - pool.return_scheduler(scheduler2); + pool.return_scheduler(scheduler2, false); let scheduler3 = pool.do_take_scheduler(context.clone()); assert_eq!(scheduler_id2, scheduler3.id()); @@ -1090,7 +1559,7 @@ mod tests { let scheduler = pool.do_take_scheduler(old_context.clone()); let scheduler_id = scheduler.id(); - pool.return_scheduler(scheduler.into_inner().1); + pool.return_scheduler(scheduler.into_inner().1, false); let scheduler = pool.take_scheduler(new_context.clone()); assert_eq!(scheduler_id, scheduler.id()); @@ -1175,16 +1644,24 @@ mod tests { assert_eq!(bank.transaction_count(), 0); let scheduler = pool.take_scheduler(context); - scheduler.schedule_execution(&(tx0, 0)); + scheduler.schedule_execution(&(tx0, 0)).unwrap(); let bank = BankWithScheduler::new(bank, Some(scheduler)); assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); assert_eq!(bank.transaction_count(), 1); } - #[test] - fn test_scheduler_schedule_execution_failure() { + fn do_test_scheduler_schedule_execution_failure(extra_tx_after_failure: bool) { solana_logger::setup(); + let _progress = sleepless_testing::setup(&[ + &CheckPoint::TaskHandled(0), + &TestCheckPoint::AfterTaskHandled, + &CheckPoint::SchedulerThreadAborted, + &TestCheckPoint::AfterSchedulerThreadAborted, + &CheckPoint::TrashedSchedulerCleaned(1), + &TestCheckPoint::AfterTrashedSchedulerCleaned, + ]); + let GenesisConfigInfo { genesis_config, mint_keypair, @@ -1194,10 +1671,17 @@ mod tests { let bank = setup_dummy_fork_graph(bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); + let pool_raw = DefaultSchedulerPool::do_new( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + SHORTENED_POOL_CLEANER_INTERVAL, + ); + let pool = pool_raw.clone(); let context = SchedulingContext::new(bank.clone()); - let mut scheduler = pool.take_scheduler(context); + let scheduler = pool.take_scheduler(context); let unfunded_keypair = Keypair::new(); let bad_tx = @@ -1208,9 +1692,8 @@ mod tests { genesis_config.hash(), )); assert_eq!(bank.transaction_count(), 0); - scheduler.schedule_execution(&(bad_tx, 0)); - // simulate the task-sending thread is stalled for some reason. 
- std::thread::sleep(std::time::Duration::from_secs(1)); + scheduler.schedule_execution(&(bad_tx, 0)).unwrap(); + sleepless_testing::at(TestCheckPoint::AfterTaskHandled); assert_eq!(bank.transaction_count(), 0); let good_tx_after_bad_tx = @@ -1226,25 +1709,115 @@ mod tests { .result, Ok(_) ); - scheduler.schedule_execution(&(good_tx_after_bad_tx, 1)); - scheduler.pause_for_recent_blockhash(); + sleepless_testing::at(TestCheckPoint::AfterSchedulerThreadAborted); + let bank = BankWithScheduler::new(bank, Some(scheduler)); + if extra_tx_after_failure { + assert_matches!( + bank.schedule_transaction_executions([(good_tx_after_bad_tx, &1)].into_iter()), + Err(TransactionError::AccountNotFound) + ); + } // transaction_count should remain same as scheduler should be bailing out. // That's because we're testing the serialized failing execution case in this test. - // However, currently threaded impl can't properly abort in this situation.. - // so, 1 should be observed, instead of 0. // Also note that bank.transaction_count() is generally racy by nature, because // blockstore_processor and unified_scheduler both tend to process non-conflicting batches // in parallel as part of the normal operation. - assert_eq!(bank.transaction_count(), 1); + assert_eq!(bank.transaction_count(), 0); - let bank = BankWithScheduler::new(bank, Some(scheduler)); + assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 0); assert_matches!( bank.wait_for_completed_scheduler(), - Some(( - Err(solana_sdk::transaction::TransactionError::AccountNotFound), - _timings - )) + Some((Err(TransactionError::AccountNotFound), _timings)) + ); + assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 1); + sleepless_testing::at(TestCheckPoint::AfterTrashedSchedulerCleaned); + assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 0); + } + + #[test] + fn test_scheduler_schedule_execution_failure_with_extra_tx() { + do_test_scheduler_schedule_execution_failure(true); + } + + #[test] + fn test_scheduler_schedule_execution_failure_without_extra_tx() { + do_test_scheduler_schedule_execution_failure(false); + } + + #[test] + fn test_scheduler_execution_failure_short_circuiting() { + solana_logger::setup(); + + let _progress = sleepless_testing::setup(&[ + &TestCheckPoint::BeforeNewTask, + &CheckPoint::NewTask(0), + &CheckPoint::TaskHandled(0), + &CheckPoint::SchedulerThreadAborted, + &TestCheckPoint::AfterSchedulerThreadAborted, + ]); + + static TASK_COUNT: Mutex = Mutex::new(0); + + #[derive(Debug)] + struct CountingFaultyHandler; + impl TaskHandler for CountingFaultyHandler { + fn handle( + result: &mut Result<()>, + _timings: &mut ExecuteTimings, + _bank: &Arc, + _transaction: &SanitizedTransaction, + index: usize, + _handler_context: &HandlerContext, + ) { + *TASK_COUNT.lock().unwrap() += 1; + if index == 1 { + *result = Err(TransactionError::AccountNotFound); + } + sleepless_testing::at(CheckPoint::TaskHandled(index)); + } + } + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. 
+ } = create_genesis_config(10_000); + + let bank = Bank::new_for_tests(&genesis_config); + let bank = setup_dummy_fork_graph(bank); + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = SchedulerPool::, _>::new( + None, + None, + None, + None, + ignored_prioritization_fee_cache, ); + let context = SchedulingContext::new(bank.clone()); + let scheduler = pool.do_take_scheduler(context); + + for i in 0..10 { + let tx = + &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &solana_sdk::pubkey::new_rand(), + 2, + genesis_config.hash(), + )); + scheduler.schedule_execution(&(tx, i)).unwrap(); + } + // finally unblock the scheduler thread; otherwise the above schedule_execution could + // return SchedulerAborted... + sleepless_testing::at(TestCheckPoint::BeforeNewTask); + + // Make sure bank.wait_for_completed_scheduler() is properly short-circuiting for aborting scheduler. + let bank = BankWithScheduler::new(bank, Some(Box::new(scheduler))); + assert_matches!( + bank.wait_for_completed_scheduler(), + Some((Err(TransactionError::AccountNotFound), _timings)) + ); + sleepless_testing::at(TestCheckPoint::AfterSchedulerThreadAborted); + assert!(*TASK_COUNT.lock().unwrap() < 10); } #[test] @@ -1319,8 +1892,12 @@ mod tests { // Stall handling tx0 and tx1 let lock_to_stall = LOCK_TO_STALL.lock().unwrap(); - scheduler.schedule_execution(&(tx0, STALLED_TRANSACTION_INDEX)); - scheduler.schedule_execution(&(tx1, BLOCKED_TRANSACTION_INDEX)); + scheduler + .schedule_execution(&(tx0, STALLED_TRANSACTION_INDEX)) + .unwrap(); + scheduler + .schedule_execution(&(tx1, BLOCKED_TRANSACTION_INDEX)) + .unwrap(); // Wait a bit for the scheduler thread to decide to block tx1 std::thread::sleep(std::time::Duration::from_secs(1)); @@ -1394,7 +1971,7 @@ mod tests { .take(10000) { let scheduler = pool.take_scheduler(context.clone()); - scheduler.schedule_execution(&(dummy_tx, index)); + scheduler.schedule_execution(&(dummy_tx, index)).unwrap(); scheduler.wait_for_termination(false).1.return_to_pool(); } } @@ -1434,7 +2011,10 @@ mod tests { &self.2 } - fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) { + fn schedule_execution( + &self, + &(transaction, index): &(&SanitizedTransaction, usize), + ) -> ScheduleResult { let transaction_and_index = (transaction.clone(), index); let context = self.context().clone(); let pool = self.3.clone(); @@ -1457,6 +2037,12 @@ mod tests { ); (result, timings) })); + + Ok(()) + } + + fn recover_error_after_abort(&mut self) -> TransactionError { + unimplemented!(); } fn wait_for_termination( @@ -1485,7 +2071,7 @@ mod tests { for AsyncScheduler { fn return_to_pool(self: Box) { - self.3.clone().return_scheduler(*self) + self.3.clone().return_scheduler(*self, false) } } @@ -1562,7 +2148,8 @@ mod tests { assert_eq!(bank.transaction_count(), 0); // schedule but not immediately execute transaction - bank.schedule_transaction_executions([(&very_old_valid_tx, &0)].into_iter()); + bank.schedule_transaction_executions([(&very_old_valid_tx, &0)].into_iter()) + .unwrap(); // this calls register_recent_blockhash internally bank.fill_bank_with_ticks_for_tests(); diff --git a/unified-scheduler-pool/src/sleepless_testing.rs b/unified-scheduler-pool/src/sleepless_testing.rs new file mode 100644 index 00000000000000..9c2213f657e86a --- /dev/null +++ b/unified-scheduler-pool/src/sleepless_testing.rs @@ -0,0 +1,352 @@ +use std::{ + io, + thread::{JoinHandle, Scope, ScopedJoinHandle}, +}; 
+ +#[allow(dead_code)] +pub(crate) trait ScopeTracked<'scope>: Sized { + fn spawn_tracked( + &'scope self, + f: impl FnOnce() -> T + Send + 'scope, + ) -> ScopedJoinHandle<'scope, T>; +} + +pub(crate) trait BuilderTracked: Sized { + fn spawn_tracked( + self, + f: impl FnOnce() -> T + Send + 'static, + ) -> io::Result>; + + #[allow(dead_code)] + fn spawn_scoped_tracked<'scope, 'env, T: Send + 'scope>( + self, + scope: &'scope Scope<'scope, 'env>, + f: impl FnOnce() -> T + Send + 'scope, + ) -> io::Result>; +} + +#[cfg(not(test))] +pub(crate) use sleepless_testing_dummy::*; +#[cfg(test)] +pub(crate) use sleepless_testing_real::*; + +#[cfg(test)] +mod sleepless_testing_real { + use { + lazy_static::lazy_static, + std::{ + cmp::Ordering::{Equal, Greater, Less}, + collections::{HashMap, HashSet}, + fmt::Debug, + sync::{Arc, Condvar, Mutex}, + thread::{current, JoinHandle, ThreadId}, + }, + }; + + #[derive(Debug)] + struct Progress { + _name: String, + check_points: Vec, + current_check_point: Mutex, + condvar: Condvar, + } + + #[derive(Debug)] + struct JustCreated; + + impl Progress { + fn new(check_points: impl Iterator, name: String) -> Self { + let initial_check_point = format!("{JustCreated:?}"); + let check_points = [initial_check_point.clone()] + .into_iter() + .chain(check_points) + .collect::>(); + let check_points_set = check_points.iter().collect::>(); + assert_eq!(check_points.len(), check_points_set.len()); + + Self { + _name: name, + check_points, + current_check_point: Mutex::new(initial_check_point), + condvar: Condvar::new(), + } + } + + fn change_current_check_point(&self, anchored_check_point: String) { + let Some(anchored_index) = self + .check_points + .iter() + .position(|check_point| check_point == &anchored_check_point) + else { + // Ignore unrecognizable checkpoints... + return; + }; + + let mut current_check_point = self.current_check_point.lock().unwrap(); + + let should_change = + match anchored_index.cmp(&self.expected_next_index(¤t_check_point)) { + Equal => true, + Greater => { + // anchor is one of future check points; block the current thread until + // that happens + current_check_point = self + .condvar + .wait_while(current_check_point, |current_check_point| { + anchored_index != self.expected_next_index(current_check_point) + }) + .unwrap(); + true + } + // anchor is already observed. + Less => false, + }; + + if should_change { + *current_check_point = anchored_check_point; + self.condvar.notify_all(); + } + } + + fn expected_next_index(&self, current_check_point: &String) -> usize { + let current_index = self + .check_points + .iter() + .position(|check_point| check_point == current_check_point) + .unwrap(); + current_index.checked_add(1).unwrap() + } + } + + lazy_static! 
{ + static ref THREAD_REGISTRY: Mutex>> = + Mutex::new(HashMap::new()); + } + + #[must_use] + pub(crate) struct ActiveProgress(Arc, ThreadId); + + impl ActiveProgress { + fn new(progress: Arc) -> Self { + let active_progress = Self(progress, current().id()); + active_progress.activate(); + active_progress + } + + fn activate(&self) { + assert!(THREAD_REGISTRY + .lock() + .unwrap() + .insert(self.1, self.0.clone()) + .is_none()); + } + + fn deactivate(&self) { + assert_eq!( + *self.0.check_points.last().unwrap(), + *self.0.current_check_point.lock().unwrap(), + "unfinished progress" + ); + THREAD_REGISTRY.lock().unwrap().remove(&self.1).unwrap(); + } + } + + impl Drop for ActiveProgress { + fn drop(&mut self) { + self.deactivate(); + } + } + + /// Enable sleepless_testing with given check points being monitored, until the returned value + /// is dropped. This guarantees the check points are linearized in the exact order as + /// specified, among all of tracked threads. + pub(crate) fn setup(check_points: &[&dyn Debug]) -> ActiveProgress { + let progress = Arc::new(Progress::new( + check_points + .iter() + .map(|check_point| format!("{check_point:?}")), + current().name().unwrap_or_default().to_string(), + )); + ActiveProgress::new(progress) + } + + /// Signal about the passage of the given check point. If sleepless_testing is enabled with the + /// check point monitored, this may block the current thread, not to violate the enforced order + /// of monitored check points. + pub(crate) fn at(check_point: T) { + let mut registry = THREAD_REGISTRY.lock().unwrap(); + if let Some(progress) = registry.get_mut(¤t().id()).cloned() { + drop(registry); + progress.change_current_check_point(format!("{check_point:?}")); + } else if current().name().unwrap_or_default().starts_with("test_") { + panic!("seems setup() isn't called yet?"); + } + } + + pub(crate) mod thread { + pub(crate) use crate::sleepless_testing::{BuilderTracked, ScopeTracked}; + use { + super::*, + std::{ + io, + thread::{current, spawn, Builder, Scope, ScopedJoinHandle}, + }, + }; + + struct SpawningThreadTracker(Arc<(Mutex, Condvar)>); + struct SpawnedThreadTracker(Arc<(Mutex, Condvar)>, ThreadId, bool); + + impl SpawningThreadTracker { + fn ensure_spawned_tracked(self) { + let (lock, cvar) = &*self.0; + let lock = lock.lock().unwrap(); + assert!(cvar.wait_while(lock, |&mut tracked| !tracked).is_ok()); + } + } + + impl SpawnedThreadTracker { + fn do_track(&mut self) { + self.2 = { + let mut registry = THREAD_REGISTRY.lock().unwrap(); + if let Some(progress) = registry.get(&self.1).cloned() { + assert!(registry.insert(current().id(), progress).is_none()); + true + } else { + false + } + }; + let (lock, cvar) = &*self.0; + *lock.lock().unwrap() = true; + cvar.notify_one(); + } + + fn do_untrack(self) { + if self.2 { + let mut registry = THREAD_REGISTRY.lock().unwrap(); + registry.remove(¤t().id()).unwrap(); + } + } + + fn with_tracked(mut self, f: impl FnOnce() -> T + Send) -> T { + self.do_track(); + let returned = f(); + self.do_untrack(); + returned + } + } + + fn prepare_tracking() -> (SpawningThreadTracker, SpawnedThreadTracker) { + let lock_and_condvar1 = Arc::new((Mutex::new(false), Condvar::new())); + let lock_and_condvar2 = lock_and_condvar1.clone(); + let spawning_thread_tracker = SpawningThreadTracker(lock_and_condvar1); + let spawning_thread_id = current().id(); + let spawned_thread_tracker = + SpawnedThreadTracker(lock_and_condvar2, spawning_thread_id, false); + (spawning_thread_tracker, spawned_thread_tracker) + } + + 
#[allow(dead_code)] + pub(crate) fn spawn_tracked( + f: impl FnOnce() -> T + Send + 'static, + ) -> JoinHandle { + let (spawning_thread_tracker, spawned_thread_tracker) = prepare_tracking(); + let spawned_thread = spawn(move || spawned_thread_tracker.with_tracked(f)); + spawning_thread_tracker.ensure_spawned_tracked(); + spawned_thread + } + + impl<'scope, 'env> ScopeTracked<'scope> for Scope<'scope, 'env> { + fn spawn_tracked( + &'scope self, + f: impl FnOnce() -> T + Send + 'scope, + ) -> ScopedJoinHandle<'scope, T> { + let (spawning_thread_tracker, spawned_thread_tracker) = prepare_tracking(); + let spawned_thread = self.spawn(move || spawned_thread_tracker.with_tracked(f)); + spawning_thread_tracker.ensure_spawned_tracked(); + spawned_thread + } + } + + impl BuilderTracked for Builder { + fn spawn_tracked( + self, + f: impl FnOnce() -> T + Send + 'static, + ) -> io::Result> { + let (spawning_thread_tracker, spawned_thread_tracker) = prepare_tracking(); + let spawned_thread_result = + self.spawn(move || spawned_thread_tracker.with_tracked(f)); + if spawned_thread_result.is_ok() { + spawning_thread_tracker.ensure_spawned_tracked(); + } + spawned_thread_result + } + + fn spawn_scoped_tracked<'scope, 'env, T: Send + 'scope>( + self, + scope: &'scope Scope<'scope, 'env>, + f: impl FnOnce() -> T + Send + 'scope, + ) -> io::Result> { + let (spawning_thread_tracker, spawned_thread_tracker) = prepare_tracking(); + let spawned_thread_result = + self.spawn_scoped(scope, move || spawned_thread_tracker.with_tracked(f)); + if spawned_thread_result.is_ok() { + spawning_thread_tracker.ensure_spawned_tracked(); + } + spawned_thread_result + } + } + } +} + +#[cfg(not(test))] +mod sleepless_testing_dummy { + use std::fmt::Debug; + + #[inline] + pub(crate) fn at(_check_point: T) {} + + pub(crate) mod thread { + pub(crate) use crate::sleepless_testing::{BuilderTracked, ScopeTracked}; + use std::{ + io, + thread::{spawn, Builder, JoinHandle, Scope, ScopedJoinHandle}, + }; + + #[inline] + #[allow(dead_code)] + pub(crate) fn spawn_tracked( + f: impl FnOnce() -> T + Send + 'static, + ) -> JoinHandle { + spawn(f) + } + + impl<'scope, 'env> ScopeTracked<'scope> for Scope<'scope, 'env> { + #[inline] + fn spawn_tracked( + &'scope self, + f: impl FnOnce() -> T + Send + 'scope, + ) -> ScopedJoinHandle<'scope, T> { + self.spawn(f) + } + } + + impl BuilderTracked for Builder { + #[inline] + fn spawn_tracked( + self, + f: impl FnOnce() -> T + Send + 'static, + ) -> io::Result> { + self.spawn(f) + } + + #[inline] + fn spawn_scoped_tracked<'scope, 'env, T: Send + 'scope>( + self, + scope: &'scope Scope<'scope, 'env>, + f: impl FnOnce() -> T + Send + 'scope, + ) -> io::Result> { + self.spawn_scoped(scope, f) + } + } + } +}
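
The following is a minimal, hypothetical caller-side sketch (not part of the patch) illustrating the error-propagation contract introduced above: `schedule_execution()` is non-blocking and its `Err(SchedulerAborted)` only signals that a previously-scheduled transaction has already aborted the scheduler, while the actual `TransactionError` is recovered separately via `recover_error_after_abort()`. The function and variable names (`schedule_all`, `txs`) are illustrative only.

    // Assumed imports:
    // use solana_runtime::installed_scheduler_pool::InstalledSchedulerBox;
    // use solana_sdk::transaction::{SanitizedTransaction, TransactionError};
    fn schedule_all(
        mut scheduler: InstalledSchedulerBox,
        txs: &[(SanitizedTransaction, usize)],
    ) -> Result<InstalledSchedulerBox, TransactionError> {
        for (tx, index) in txs {
            // Non-blocking; Err(SchedulerAborted) means some *previously scheduled*
            // transaction failed and the scheduler thread has already aborted.
            if scheduler.schedule_execution(&(tx, *index)).is_err() {
                // Recover the actual error on the slower &mut code-path; calling this
                // repeatedly after the abort has been observed is acceptable.
                return Err(scheduler.recover_error_after_abort());
            }
        }
        // Whether aborted or not, the scheduler must still go through
        // wait_for_termination(), which uninstalls it and, if aborted, trashes it
        // instead of repooling it.
        Ok(scheduler)
    }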