-
Notifications
You must be signed in to change notification settings - Fork 381
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Return back stale out-of-pool scheduler by timeout #1690
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -290,8 +290,15 @@ impl WaitReason { | |
#[allow(clippy::large_enum_variant)] | ||
#[derive(Debug)] | ||
pub enum SchedulerStatus { | ||
/// Unified scheduler is disabled or installed scheduler is consumed by wait_for_termination(). | ||
/// Note that transition to Unavailable from {Active, Stale} is one-way (i.e. one-time). | ||
Unavailable, | ||
/// Scheduler is installed into a bank; could be running or just be idling. | ||
/// This will be transitioned to Stale after a certain time has passed if its bank hasn't frozen. | ||
Active(InstalledSchedulerBox), | ||
/// Scheduler is idling for a long time, returning the scheduler back to the pool. | ||
/// This will be immediately (i.e. transparently) transitioned to Active as soon as there's | ||
/// new transaction to be executed. | ||
Stale(InstalledSchedulerPoolArc, ResultWithTimings), | ||
} | ||
|
||
|
@@ -308,38 +315,45 @@ impl SchedulerStatus { | |
f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox, | ||
) { | ||
let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else { | ||
panic!(); | ||
panic!("transition to Active failed: {self:?}"); | ||
}; | ||
*self = Self::Active(f(pool, result_with_timings)); | ||
} | ||
|
||
pub(crate) fn maybe_transition_from_active_to_stale( | ||
fn maybe_transition_from_active_to_stale( | ||
&mut self, | ||
f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings), | ||
) { | ||
if !matches!(self, Self::Active(_scheduler)) { | ||
return; | ||
} | ||
let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else { | ||
panic!(); | ||
unreachable!("not active: {:?}", self); | ||
}; | ||
let (pool, result_with_timings) = f(scheduler); | ||
*self = Self::Stale(pool, result_with_timings); | ||
} | ||
|
||
pub(crate) fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox { | ||
fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox { | ||
let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else { | ||
panic!(); | ||
panic!("transition to Unavailable failed: {self:?}"); | ||
}; | ||
scheduler | ||
} | ||
|
||
pub(crate) fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings { | ||
fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings { | ||
let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else { | ||
panic!(); | ||
panic!("transition to Unavailable failed: {self:?}"); | ||
}; | ||
result_with_timings | ||
} | ||
|
||
fn active_scheduler(&self) -> &InstalledSchedulerBox { | ||
let SchedulerStatus::Active(active_scheduler) = self else { | ||
panic!("not active: {self:?}"); | ||
}; | ||
active_scheduler | ||
} | ||
} | ||
|
||
/// Very thin wrapper around Arc<Bank> | ||
|
@@ -423,6 +437,9 @@ impl BankWithScheduler { | |
/// | ||
/// If the scheduler has been aborted, this doesn't schedule the transaction, instead just | ||
/// return the error of prior scheduled transaction. | ||
/// | ||
/// Calling this will panic if the installed scheduler is Unavailable (the bank is | ||
/// wait_for_termination()-ed or the unified scheduler is disabled in the first place). | ||
// 'a is needed; anonymous_lifetime_in_impl_trait isn't stabilized yet... | ||
pub fn schedule_transaction_executions<'a>( | ||
&self, | ||
|
@@ -454,6 +471,7 @@ impl BankWithScheduler { | |
Ok(()) | ||
} | ||
|
||
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))] | ||
pub(crate) fn create_timeout_listener(&self) -> TimeoutListener { | ||
self.inner.do_create_timeout_listener() | ||
} | ||
|
@@ -502,28 +520,44 @@ impl BankWithSchedulerInner { | |
f(scheduler) | ||
} | ||
SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => { | ||
trace!( | ||
"with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...", | ||
self.bank.slot(), | ||
); | ||
Err(SchedulerAborted) | ||
} | ||
SchedulerStatus::Stale(_pool, _result_with_timings) => { | ||
SchedulerStatus::Stale(pool, _result_with_timings) => { | ||
let pool = pool.clone(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need to clone the pool? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. because this
|
||
drop(scheduler); | ||
|
||
let context = SchedulingContext::new(self.bank.clone()); | ||
let mut scheduler = self.scheduler.write().unwrap(); | ||
scheduler.transition_from_stale_to_active(|scheduler_pool, result_with_timings| { | ||
scheduler_pool.register_timeout_listener(self.do_create_timeout_listener()); | ||
scheduler_pool.take_resumed_scheduler(context, result_with_timings) | ||
trace!("with_active_scheduler: {:?}", scheduler); | ||
scheduler.transition_from_stale_to_active(|pool, result_with_timings| { | ||
let scheduler = pool.take_resumed_scheduler(context, result_with_timings); | ||
info!( | ||
"with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: {})", | ||
self.bank.slot(), | ||
scheduler.id(), | ||
); | ||
scheduler | ||
}); | ||
drop(scheduler); | ||
|
||
self.with_active_scheduler(f) | ||
let scheduler = self.scheduler.read().unwrap(); | ||
// Re-register a new timeout listener only after acquiring the read lock; | ||
// otherwise, the listener could put the scheduler back into Stale before the | ||
// read lock is acquired, causing the panic below. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oops forgot to mention this is extremely rare race condition... |
||
pool.register_timeout_listener(self.do_create_timeout_listener()); | ||
f(scheduler.active_scheduler()) | ||
} | ||
SchedulerStatus::Unavailable => panic!(), | ||
SchedulerStatus::Unavailable => unreachable!("no installed scheduler"), | ||
} | ||
} | ||
|
||
fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener { | ||
let weak_bank = Arc::downgrade(self); | ||
TimeoutListener::new(move |scheduler_pool| { | ||
TimeoutListener::new(move |pool| { | ||
let Some(bank) = weak_bank.upgrade() else { | ||
return; | ||
}; | ||
|
@@ -533,22 +567,35 @@ impl BankWithSchedulerInner { | |
}; | ||
|
||
scheduler.maybe_transition_from_active_to_stale(|scheduler| { | ||
// The scheduler still hasn't been wait_for_termination()-ed after a while... | ||
// Return the installed scheduler back to the scheduler pool as soon as the | ||
// scheduler gets idle after executing all currently-scheduled transactions. | ||
|
||
let id = scheduler.id(); | ||
let (result_with_timings, uninstalled_scheduler) = | ||
scheduler.wait_for_termination(false); | ||
uninstalled_scheduler.return_to_pool(); | ||
(scheduler_pool, result_with_timings) | ||
}) | ||
info!( | ||
"timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})", | ||
bank.bank.slot(), | ||
id, | ||
); | ||
(pool, result_with_timings) | ||
}); | ||
trace!("timeout_listener: {:?}", scheduler); | ||
}) | ||
} | ||
|
||
/// This must not be called until `Err(SchedulerAborted)` is observed. Violating this should | ||
/// `panic!()`. | ||
fn retrieve_error_after_schedule_failure(&self) -> TransactionError { | ||
let mut scheduler = self.scheduler.write().unwrap(); | ||
match &mut *scheduler { | ||
SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(), | ||
SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => { | ||
result.clone().unwrap_err() | ||
} | ||
_ => panic!(), | ||
_ => unreachable!("no error in {:?}", self.scheduler), | ||
} | ||
} | ||
|
||
|
@@ -601,6 +648,10 @@ impl BankWithSchedulerInner { | |
result_with_timings.as_ref().map(|(result, _)| result), | ||
std::thread::current(), | ||
); | ||
trace!( | ||
"wait_for_scheduler_termination(result_with_timings: {:?})", | ||
result_with_timings, | ||
); | ||
|
||
result_with_timings | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this doesn't seem strictly true, because it seems to also be used as an intermediate state when transitioning between active <-> stale?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wow, nice you're spotting this. I've omitted for simplicity here. I will do a follow-up for completeness.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will be addressed by: #1797