Make unified scheduler abort on tx execution errors (#1211)
* Make unified scheduler abort on tx execution errors

* Fix typo and improve wording

* Indicate panic _should_ be enforced by trait impls

* Fix typo

* Avoid closure of closure...

* Rename: is_threads_joined => are_threads_joined

* Add some comments for trashed_scheduler_inners

* Ensure 100% coverage of end_session by more tests

* Document relation of aborted thread and is_trashed

* Replace sleep()s with sleepless_testing

* Fix lints from newer rust
ryoqun authored May 31, 2024
1 parent efc7291 commit 74dcd3b
Showing 7 changed files with 1,181 additions and 108 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion core/tests/unified_scheduler.rs
@@ -107,7 +107,9 @@ fn test_scheduler_waited_by_drop_bank_service() {
// Delay transaction execution to ensure transaction execution happens after termination has
// been started
let lock_to_stall = LOCK_TO_STALL.lock().unwrap();
pruned_bank.schedule_transaction_executions([(&tx, &0)].into_iter());
pruned_bank
.schedule_transaction_executions([(&tx, &0)].into_iter())
.unwrap();
drop(pruned_bank);
assert_eq!(pool_raw.pooled_scheduler_count(), 0);
drop(lock_to_stall);
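The stall trick used in this test boils down to a simple pattern: a worker that must acquire a shared mutex cannot make progress until the holder drops its guard, so anything scheduled while the guard is held is guaranteed to execute only afterwards. A self-contained sketch of the pattern (all names here are illustrative, not from the test):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let stall = Arc::new(Mutex::new(()));
    let guard = stall.lock().unwrap(); // stall workers from here on

    let worker_stall = Arc::clone(&stall);
    let worker = thread::spawn(move || {
        // Blocks until the main thread drops `guard` below.
        let _unstalled = worker_stall.lock().unwrap();
        println!("executed only after the stall was released");
    });

    // ... steps that must happen before the worker runs go here ...
    drop(guard); // now the worker may proceed
    worker.join().unwrap();
}
```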
82 changes: 64 additions & 18 deletions ledger/src/blockstore_processor.rs
@@ -338,11 +338,29 @@ fn process_batches(
"process_batches()/schedule_batches_for_execution({} batches)",
batches.len()
);
// scheduling always succeeds here without being blocked on actual transaction executions.
// The transaction execution errors will be collected via the blocking fn called
// BankWithScheduler::wait_for_completed_scheduler(), if any.
schedule_batches_for_execution(bank, batches);
Ok(())
// Scheduling usually succeeds (immediately returns `Ok(())`) here without being blocked on
// the actual transaction executions.
//
// As an exception, this code path can propagate the transaction execution _errors of
// previously-scheduled transactions_ to notify the replay stage. The replay stage then
// bails out of further processing of the malformed (possibly malicious) block
// immediately, so as not to waste any system resources. Note that this propagation only
// provides early hints. Even if errors aren't propagated this way, they are guaranteed
// to be propagated eventually via the blocking fn
// BankWithScheduler::wait_for_completed_scheduler().
//
// To reiterate: the returned error is completely unrelated to the `batches` argument at
// hand. While awkward, the _async_ unified scheduler abuses this existing
// error-propagation code path to the replay stage for compatibility and ease of
// integration, exploiting the fact that the replay stage doesn't care _which transaction
// the returned error originates from_.
//
// In the future, a more proper error-propagation mechanism will be introduced once we
// fully transition to the unified scheduler for block verification. It would be
// push-based, from the unified scheduler to the replay stage, to eliminate the current
// overhead: 1 read lock per batch in
// `BankWithScheduler::schedule_transaction_executions()`.
schedule_batches_for_execution(bank, batches)
} else {
debug!(
"process_batches()/rebatch_and_execute_batches({} batches)",
@@ -364,7 +382,7 @@ fn process_batches(
fn schedule_batches_for_execution(
bank: &BankWithScheduler,
batches: &[TransactionBatchWithIndexes],
) {
) -> Result<()> {
for TransactionBatchWithIndexes {
batch,
transaction_indexes,
@@ -375,8 +393,9 @@ fn schedule_batches_for_execution(
.sanitized_transactions()
.iter()
.zip(transaction_indexes.iter()),
);
)?;
}
Ok(())
}
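Put together, a bad transaction can now surface to the replay stage at two points. A sketch of the flow, where `verify_block` is hypothetical and only the two functions it calls come from this codebase:

```rust
// Hypothetical caller illustrating the two error surfaces described in the
// comment above.
fn verify_block(
    bank: &BankWithScheduler,
    batches: &[TransactionBatchWithIndexes],
) -> Result<()> {
    // 1. Early hint: scheduling short-circuits with the error of a
    //    previously-scheduled transaction once the scheduler has aborted.
    schedule_batches_for_execution(bank, batches)?;

    // 2. Guarantee: the blocking terminal wait returns the collected result
    //    even when no early hint was delivered.
    if let Some((result, _timings)) = bank.wait_for_completed_scheduler() {
        result?;
    }
    Ok(())
}
```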

fn rebatch_transactions<'a>(
@@ -2138,7 +2157,8 @@ pub mod tests {
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
},
installed_scheduler_pool::{
MockInstalledScheduler, MockUninstalledScheduler, SchedulingContext,
MockInstalledScheduler, MockUninstalledScheduler, SchedulerAborted,
SchedulingContext,
},
},
solana_sdk::{
@@ -4740,8 +4760,7 @@ pub mod tests {
assert_eq!(batch3.transaction_indexes, vec![43, 44]);
}

#[test]
fn test_schedule_batches_for_execution() {
fn do_test_schedule_batches_for_execution(should_succeed: bool) {
solana_logger::setup();
let dummy_leader_pubkey = solana_sdk::pubkey::new_rand();
let GenesisConfigInfo {
@@ -4762,10 +4781,23 @@
.times(1)
.in_sequence(&mut seq.lock().unwrap())
.return_const(context);
mocked_scheduler
.expect_schedule_execution()
.times(txs.len())
.returning(|_| ());
if should_succeed {
mocked_scheduler
.expect_schedule_execution()
.times(txs.len())
.returning(|(_, _)| Ok(()));
} else {
// mocked_scheduler isn't async, so the short-circuiting behavior is quite visible:
// .times(1) is expected instead of .times(txs.len()), unlike the succeeding case.
mocked_scheduler
.expect_schedule_execution()
.times(1)
.returning(|(_, _)| Err(SchedulerAborted));
mocked_scheduler
.expect_recover_error_after_abort()
.times(1)
.returning(|| TransactionError::InsufficientFundsForFee);
}
mocked_scheduler
.expect_wait_for_termination()
.with(mockall::predicate::eq(true))
@@ -4794,17 +4826,31 @@
let replay_tx_thread_pool = create_thread_pool(1);
let mut batch_execution_timing = BatchExecutionTiming::default();
let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
assert!(process_batches(
let result = process_batches(
&bank,
&replay_tx_thread_pool,
&[batch_with_indexes],
None,
None,
&mut batch_execution_timing,
None,
&ignored_prioritization_fee_cache
)
.is_ok());
&ignored_prioritization_fee_cache,
);
if should_succeed {
assert_matches!(result, Ok(()));
} else {
assert_matches!(result, Err(TransactionError::InsufficientFundsForFee));
}
}

#[test]
fn test_schedule_batches_for_execution_success() {
do_test_schedule_batches_for_execution(true);
}

#[test]
fn test_schedule_batches_for_execution_failure() {
do_test_schedule_batches_for_execution(false);
}

#[test]
110 changes: 97 additions & 13 deletions runtime/src/installed_scheduler_pool.rs
@@ -27,7 +27,7 @@ use {
solana_sdk::{
hash::Hash,
slot_history::Slot,
transaction::{Result, SanitizedTransaction},
transaction::{Result, SanitizedTransaction, TransactionError},
},
std::{
fmt::Debug,
@@ -42,6 +42,10 @@ pub trait InstalledSchedulerPool: Send + Sync + Debug {
fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox;
}

#[derive(Debug)]
pub struct SchedulerAborted;
pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Schedules, executes, and commits transactions under encapsulated implementation
///
@@ -101,17 +105,51 @@ pub trait InstalledScheduler: Send + Sync + Debug + 'static {
fn id(&self) -> SchedulerId;
fn context(&self) -> &SchedulingContext;

// Calling this is illegal as soon as wait_for_termination is called.
/// Schedule a transaction for execution.
///
/// This non-blocking function will return immediately without waiting for actual execution.
///
/// Calling this is illegal as soon as `wait_for_termination()` is called. It would result in a
/// fatal logic error.
///
/// Note that the returned result indicates whether the scheduler has been aborted due to a
/// previously-scheduled bad transaction, which terminates further block verification. So,
/// almost always, the returned error isn't due to the scheduling of the current transaction
/// itself. At that point, calling this does nothing anymore, while it's still safe to do so.
/// As soon as notified, callers are expected to stop processing upcoming transactions of the
/// same `SchedulingContext` (i.e. the same block). Internally, the aborted scheduler will be
/// disposed of cleanly, not repooled, after `wait_for_termination()` is called, like
/// non-aborted schedulers.
///
/// The caller can acquire the error by calling a separate function,
/// `recover_error_after_abort()`, which requires `&mut self` instead of `&self`. This
/// separation and the convoluted return-value semantics explained above are intentional: they
/// optimize the fast code-path of normal transaction scheduling to be multi-threaded, at the
/// cost of a far slower error code-path, while giving implementors increased flexibility by
/// taking `&mut`.
fn schedule_execution<'a>(
&'a self,
transaction_with_index: &'a (&'a SanitizedTransaction, usize),
);
) -> ScheduleResult;

/// Return the error which caused the scheduler to abort.
///
/// Note that this must not be called until it's observed that `schedule_execution()` has
/// returned `Err(SchedulerAborted)`. Violating this should result in a `panic!()`.
///
/// That said, calling this multiple times is completely acceptable after the error observation
/// from `schedule_execution()`. While it's not guaranteed, the same `.clone()`-ed error from
/// the first bad transaction is usually returned across invocations.
fn recover_error_after_abort(&mut self) -> TransactionError;
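To make the contract concrete, here is a minimal sketch of a scheduler honoring the two-phase abort protocol. `ToyScheduler` and its internals are illustrative stand-ins, not the real pool implementation:

```rust
use std::sync::Mutex;

// Illustrative stand-ins for the real `TransactionError`, `SchedulerAborted`,
// and `ScheduleResult` above.
#[derive(Debug, Clone)]
struct TransactionError(String);
#[derive(Debug)]
struct SchedulerAborted;
type ScheduleResult = Result<(), SchedulerAborted>;

// schedule_execution() only reports *that* an abort happened (cheap, `&self`);
// recover_error_after_abort() yields the error itself (`&mut self`) and is
// idempotent, so it may be called repeatedly once the abort has been observed.
#[derive(Debug, Default)]
struct ToyScheduler {
    aborted_with: Mutex<Option<TransactionError>>,
}

impl ToyScheduler {
    fn schedule_execution(&self, tx_is_bad: bool) -> ScheduleResult {
        let mut slot = self.aborted_with.lock().unwrap();
        if slot.is_some() {
            return Err(SchedulerAborted); // already aborted: short-circuit
        }
        if tx_is_bad {
            *slot = Some(TransactionError("first bad tx".to_string()));
            return Err(SchedulerAborted);
        }
        Ok(()) // fast path: accepted; execution would happen asynchronously
    }

    fn recover_error_after_abort(&mut self) -> TransactionError {
        self.aborted_with
            .get_mut()
            .unwrap()
            .clone()
            .expect("must only be called after observing Err(SchedulerAborted)")
    }
}
```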

/// Wait for a scheduler to terminate after processing.
///
/// This function blocks the current thread while waiting for the scheduler to complete all of
/// the executions for the scheduled transactions and to return the finalized
/// `ResultWithTimings`. Along with the result, this function also makes the scheduler itself
/// `ResultWithTimings`. This function still blocks for a short period of time even in the
/// case of aborted schedulers, in order to gracefully shut down the scheduler (like thread
/// joining).
///
/// Along with the result being returned, this function also makes the scheduler itself
/// uninstalled from the bank by transforming the consumed self.
///
/// If no transaction is scheduled, the result and timing will be `Ok(())` and
@@ -286,11 +324,15 @@ impl BankWithScheduler {
self.inner.scheduler.read().unwrap().is_some()
}

/// Schedule the transaction as long as the scheduler hasn't been aborted.
///
/// If the scheduler has been aborted, this doesn't schedule the transaction; instead it just
/// returns the error from a previously-scheduled transaction.
// 'a is needed; anonymous_lifetime_in_impl_trait isn't stabilized yet...
pub fn schedule_transaction_executions<'a>(
&self,
transactions_with_indexes: impl ExactSizeIterator<Item = (&'a SanitizedTransaction, &'a usize)>,
) {
) -> Result<()> {
trace!(
"schedule_transaction_executions(): {} txs",
transactions_with_indexes.len()
@@ -300,8 +342,25 @@ impl BankWithScheduler {
let scheduler = scheduler_guard.as_ref().unwrap();

for (sanitized_transaction, &index) in transactions_with_indexes {
scheduler.schedule_execution(&(sanitized_transaction, index));
if scheduler
.schedule_execution(&(sanitized_transaction, index))
.is_err()
{
drop(scheduler_guard);
// This write lock isn't atomic with the above read lock. So, another thread
// could have called .recover_error_after_abort() while we're literally stuck in
// the gap between these locks (i.e., right at this comment, source-code-wise)
// under extreme race conditions. Thus, .recover_error_after_abort() is made
// idempotent with that consideration in mind.
//
// Lastly, this non-atomic nature is intentional, to optimize the fast code-path.
let mut scheduler_guard = self.inner.scheduler.write().unwrap();
let scheduler = scheduler_guard.as_mut().unwrap();
return Err(scheduler.recover_error_after_abort());
}
}

Ok(())
}
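The read-then-write lock dance above distills to the following pattern, reusing the hypothetical `ToyScheduler` from the sketch in the trait docs; `schedule_all` is likewise illustrative:

```rust
use std::sync::RwLock;

// Fast path: all scheduling shares one cheap read lock. Slow path (abort
// only): drop the read lock and re-acquire as a write lock to obtain `&mut`
// for error recovery. Another thread may slip in between the two lock
// acquisitions, which is exactly why recover_error_after_abort() must
// tolerate repeated invocation.
fn schedule_all(
    scheduler: &RwLock<ToyScheduler>,
    txs: &[bool], // `true` marks a bad transaction, per the toy model
) -> Result<(), TransactionError> {
    let guard = scheduler.read().unwrap();
    for &tx_is_bad in txs {
        if guard.schedule_execution(tx_is_bad).is_err() {
            drop(guard); // release the read lock before "upgrading"
            let mut guard = scheduler.write().unwrap();
            return Err(guard.recover_error_after_abort());
        }
    }
    Ok(())
}
```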

// take needless &mut only to communicate its semantic mutability to humans...
@@ -550,8 +609,7 @@ mod tests {
assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
}

#[test]
fn test_schedule_executions() {
fn do_test_schedule_execution(should_succeed: bool) {
solana_logger::setup();

let GenesisConfigInfo {
@@ -570,14 +628,40 @@
bank.clone(),
[true].into_iter(),
Some(|mocked: &mut MockInstalledScheduler| {
mocked
.expect_schedule_execution()
.times(1)
.returning(|(_, _)| ());
if should_succeed {
mocked
.expect_schedule_execution()
.times(1)
.returning(|(_, _)| Ok(()));
} else {
mocked
.expect_schedule_execution()
.times(1)
.returning(|(_, _)| Err(SchedulerAborted));
mocked
.expect_recover_error_after_abort()
.times(1)
.returning(|| TransactionError::InsufficientFundsForFee);
}
}),
);

let bank = BankWithScheduler::new(bank, Some(mocked_scheduler));
bank.schedule_transaction_executions([(&tx0, &0)].into_iter());
let result = bank.schedule_transaction_executions([(&tx0, &0)].into_iter());
if should_succeed {
assert_matches!(result, Ok(()));
} else {
assert_matches!(result, Err(TransactionError::InsufficientFundsForFee));
}
}

#[test]
fn test_schedule_execution_success() {
do_test_schedule_execution(true);
}

#[test]
fn test_schedule_execution_failure() {
do_test_schedule_execution(false);
}
}
1 change: 1 addition & 0 deletions unified-scheduler-pool/Cargo.toml
@@ -25,6 +25,7 @@ solana-vote = { workspace = true }

[dev-dependencies]
assert_matches = { workspace = true }
lazy_static = { workspace = true }
solana-logger = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }

