Return back stale out-of-pool scheduler by timeout #1690

Merged · 5 commits · Jun 13, 2024
Changes from 3 commits
85 changes: 68 additions & 17 deletions runtime/src/installed_scheduler_pool.rs
@@ -290,8 +290,15 @@ impl WaitReason {
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum SchedulerStatus {
/// Unified scheduler is disabled or installed scheduler is consumed by wait_for_termination().
/// Note that transition to Unavailable from {Active, Stale} is one-way (i.e. one-time).

Reviewer:
this doesn't seem strictly true, because it seems to also be used as an intermediate state when transitioning between active <-> stale?

Collaborator Author:
wow, nice spotting. I've omitted it here for simplicity; I will do a follow-up for completeness.

Collaborator Author:
this will be addressed by: #1797

Unavailable,
/// Scheduler is installed into a bank; could be running or just be idling.
/// This will be transitioned to Stale after a certain time has passed if its bank hasn't been frozen.
Active(InstalledSchedulerBox),
/// Scheduler has been idling for a long time, so it was returned back to the pool.
/// This will be immediately (i.e. transparently) transitioned to Active as soon as there's
/// a new transaction to be executed.
Stale(InstalledSchedulerPoolArc, ResultWithTimings),
}
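
As a summary of the lifecycle above, here is a minimal standalone sketch (hypothetical simplified types, not the runtime's definitions): Active and Stale flip back and forth, while Unavailable is terminal from either.

#[derive(Debug)]
enum Status {
    Unavailable,
    Active(u64), // u64 stands in for the installed scheduler
    Stale(u64),  // and for the pool + result kept while stale
}

impl Status {
    fn on_timeout(self) -> Self {
        match self {
            // Idled too long without the bank freezing: return to the pool.
            Status::Active(s) => Status::Stale(s),
            other => other,
        }
    }
    fn on_new_transaction(self) -> Self {
        match self {
            // Transparently resume as soon as work arrives.
            Status::Stale(s) => Status::Active(s),
            other => other,
        }
    }
    fn on_termination(self) -> Self {
        Status::Unavailable // one-way, from either Active or Stale
    }
}

fn main() {
    let s = Status::Active(7).on_timeout().on_new_transaction().on_termination();
    assert!(matches!(s, Status::Unavailable));
}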

@@ -308,38 +315,45 @@ impl SchedulerStatus {
f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
) {
let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
panic!();
panic!("transition to Active failed: {self:?}");
};
*self = Self::Active(f(pool, result_with_timings));
}

pub(crate) fn maybe_transition_from_active_to_stale(
fn maybe_transition_from_active_to_stale(
&mut self,
f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
) {
if !matches!(self, Self::Active(_scheduler)) {
return;
}
let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
panic!();
unreachable!("not active: {:?}", self);
};
let (pool, result_with_timings) = f(scheduler);
*self = Self::Stale(pool, result_with_timings);
}

pub(crate) fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
panic!();
panic!("transition to Unavailable failed: {self:?}");
};
scheduler
}

pub(crate) fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
panic!();
panic!("transition to Unavailable failed: {self:?}");
};
result_with_timings
}

fn active_scheduler(&self) -> &InstalledSchedulerBox {
let SchedulerStatus::Active(active_scheduler) = self else {
panic!("not active: {self:?}");
};
active_scheduler
}
}
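
All three transition helpers above rely on the same idiom for moving a value out from behind &mut self; a hypothetical minimal sketch of just that pattern:

use std::mem;

#[derive(Debug)]
enum State {
    Unavailable,
    Active(String), // String stands in for InstalledSchedulerBox
}

impl State {
    fn take_active(&mut self) -> String {
        // Swap in the placeholder variant so the old value can be moved out
        // and destructured; panic if we weren't Active to begin with.
        let State::Active(inner) = mem::replace(self, State::Unavailable) else {
            panic!("not active: {self:?}");
        };
        inner
    }
}

fn main() {
    let mut state = State::Active("scheduler".to_string());
    let inner = state.take_active();
    assert!(matches!(state, State::Unavailable));
    println!("moved out: {inner}");
}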

/// Very thin wrapper around Arc<Bank>
@@ -423,6 +437,9 @@ impl BankWithScheduler {
///
/// If the scheduler has been aborted, this doesn't schedule the transaction, instead just
/// return the error of prior scheduled transaction.
///
/// Calling this will panic if the installed scheduler is Unavailable (the bank is
/// wait_for_termination()-ed or the unified scheduler is disabled in the first place).
// 'a is needed; anonymous_lifetime_in_impl_trait isn't stabilized yet...
pub fn schedule_transaction_executions<'a>(
&self,
@@ -454,6 +471,7 @@
Ok(())
}

#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
self.inner.do_create_timeout_listener()
}
@@ -502,28 +520,44 @@ impl BankWithSchedulerInner {
f(scheduler)
}
SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
trace!(
"with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...",
self.bank.slot(),
);
Err(SchedulerAborted)
}
SchedulerStatus::Stale(_pool, _result_with_timings) => {
SchedulerStatus::Stale(pool, _result_with_timings) => {
let pool = pool.clone();

Reviewer:
why do we need to clone the pool?

Collaborator Author:
because this pool is borrowed, and we need to re-lock the scheduler with write access:

error[E0505]: cannot move out of `scheduler` because it is borrowed
   --> runtime/src/installed_scheduler_pool.rs:532:22
    |
517 |         let scheduler = self.scheduler.read().unwrap();
    |             --------- binding `scheduler` declared here
518 |         match &*scheduler {
    |                 --------- borrow of `scheduler` occurs here
...
532 |                 drop(scheduler);
    |                      ^^^^^^^^^ move out of `scheduler` occurs here
...
552 |                 pool.register_timeout_listener(self.do_create_timeout_listener());
    |                 ---- borrow later used here

For more information about this error, try `rustc --explain E0505`.
error: could not compile `solana-runtime` (lib) due to 1 previous error
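
For illustration, a minimal standalone reproduction of that borrow conflict and the clone fix (hypothetical types, not the runtime's):

use std::sync::{Arc, RwLock};

struct Pool;
struct Status {
    pool: Arc<Pool>,
}

fn relock(lock: &RwLock<Status>) {
    let guard = lock.read().unwrap();
    // Without this clone, `pool` would borrow from `guard`, and the
    // `drop(guard)` below would be rejected with E0505.
    let pool = guard.pool.clone();
    drop(guard); // release the read lock...
    let _write = lock.write().unwrap(); // ...so we can re-acquire it for writing
    let _still_usable = &pool; // the cloned Arc outlives the guard
}

fn main() {
    let lock = RwLock::new(Status { pool: Arc::new(Pool) });
    relock(&lock);
}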

drop(scheduler);

let context = SchedulingContext::new(self.bank.clone());
let mut scheduler = self.scheduler.write().unwrap();
scheduler.transition_from_stale_to_active(|scheduler_pool, result_with_timings| {
scheduler_pool.register_timeout_listener(self.do_create_timeout_listener());
scheduler_pool.take_resumed_scheduler(context, result_with_timings)
trace!("with_active_scheduler: {:?}", scheduler);
scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
let scheduler = pool.take_resumed_scheduler(context, result_with_timings);
info!(
"with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: {})",
self.bank.slot(),
scheduler.id(),
);
scheduler
});
drop(scheduler);

self.with_active_scheduler(f)
let scheduler = self.scheduler.read().unwrap();
// Re-register a new timeout listener only after acquiring the read lock;
// otherwise, the listener could put the scheduler back into Stale before the
// read lock is taken, causing the panic below.
Collaborator Author:
oops, forgot to mention this is an extremely rare race condition...

pool.register_timeout_listener(self.do_create_timeout_listener());
f(scheduler.active_scheduler())
}
SchedulerStatus::Unavailable => panic!(),
SchedulerStatus::Unavailable => unreachable!("no installed scheduler"),
}
}

fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
let weak_bank = Arc::downgrade(self);
TimeoutListener::new(move |scheduler_pool| {
TimeoutListener::new(move |pool| {
let Some(bank) = weak_bank.upgrade() else {
return;
};
@@ -533,22 +567,35 @@
};

scheduler.maybe_transition_from_active_to_stale(|scheduler| {
// The scheduler still hasn't been wait_for_termination()-ed after a while...
// Return the installed scheduler back to the scheduler pool as soon as the
// scheduler gets idle after executing all currently-scheduled transactions.

let id = scheduler.id();
let (result_with_timings, uninstalled_scheduler) =
scheduler.wait_for_termination(false);
uninstalled_scheduler.return_to_pool();
(scheduler_pool, result_with_timings)
})
info!(
"timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})",
bank.bank.slot(),
id,
);
(pool, result_with_timings)
});
trace!("timeout_listener: {:?}", scheduler);
})
}
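
The Weak reference above is what keeps the listener from extending the bank's lifetime; a hypothetical standalone sketch of that upgrade-or-bail pattern:

use std::sync::{Arc, Weak};

struct Bank {
    slot: u64,
}

fn make_listener(bank: &Arc<Bank>) -> impl Fn() {
    let weak_bank: Weak<Bank> = Arc::downgrade(bank);
    move || {
        // If the bank is already gone, the timeout silently becomes a no-op.
        let Some(bank) = weak_bank.upgrade() else {
            return;
        };
        println!("timeout fired for slot {}", bank.slot);
    }
}

fn main() {
    let bank = Arc::new(Bank { slot: 42 });
    let listener = make_listener(&bank);
    listener(); // upgrade succeeds: prints
    drop(bank);
    listener(); // upgrade fails: no-op
}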

/// This must not be called until `Err(SchedulerAborted)` is observed. Violating this will
/// `panic!()`.
fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
let mut scheduler = self.scheduler.write().unwrap();
match &mut *scheduler {
SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
result.clone().unwrap_err()
}
_ => panic!(),
_ => unreachable!("no error in {:?}", self.scheduler),
}
}

@@ -601,6 +648,10 @@ impl BankWithSchedulerInner {
result_with_timings.as_ref().map(|(result, _)| result),
std::thread::current(),
);
trace!(
"wait_for_scheduler_termination(result_with_timings: {:?})",
result_with_timings,
);

result_with_timings
}