Return back stale out-of-pool scheduler by timeout #1690
Changes from 4 commits
@@ -30,22 +30,59 @@ use {
        transaction::{Result, SanitizedTransaction, TransactionError},
    },
    std::{
        fmt::Debug,
        fmt::{self, Debug},
        mem,
        ops::Deref,
        sync::{Arc, RwLock},
    },
};
#[cfg(feature = "dev-context-only-utils")]
use {mockall::automock, qualifier_attr::qualifiers};

pub fn initialized_result_with_timings() -> ResultWithTimings {
    (Ok(()), ExecuteTimings::default())
}

pub trait InstalledSchedulerPool: Send + Sync + Debug {
    fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox;
    fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
        self.take_resumed_scheduler(context, initialized_result_with_timings())
    }

    fn take_resumed_scheduler(
        &self,
        context: SchedulingContext,
        result_with_timings: ResultWithTimings,
    ) -> InstalledSchedulerBox;

    fn register_timeout_listener(&self, timeout_listener: TimeoutListener);
}

#[derive(Debug)]
pub struct SchedulerAborted;
pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;

pub struct TimeoutListener {
    callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
}

impl TimeoutListener {
    pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
        Self {
            callback: Box::new(f),
        }
    }

    pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
        (self.callback)(pool);
    }
}

impl Debug for TimeoutListener {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "TimeoutListener({self:p})")
    }
}

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Schedules, executes, and commits transactions under encapsulated implementation
///
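For reference, here is a minimal, self-contained sketch of the two new patterns introduced above: a trait default method so that `take_scheduler` is just `take_resumed_scheduler` seeded with `initialized_result_with_timings()`, and a one-shot `TimeoutListener` built around a boxed `FnOnce`. The `Pool`, `Scheduler`, and `FixedPool` names are simplified stand-ins invented for this sketch, not the real agave types:

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the real InstalledSchedulerBox / ResultWithTimings types.
#[derive(Debug)]
pub struct Scheduler;
pub type ResultWithTimings = (Result<(), String>, u64);

pub fn initialized_result_with_timings() -> ResultWithTimings {
    (Ok(()), 0)
}

// A one-shot listener: the boxed FnOnce can capture owned state (the real code
// captures a Weak reference to the bank) and is consumed when it fires.
pub struct TimeoutListener {
    callback: Box<dyn FnOnce(Arc<dyn Pool>) + Send + Sync>,
}

impl TimeoutListener {
    pub fn new(f: impl FnOnce(Arc<dyn Pool>) + Send + Sync + 'static) -> Self {
        Self { callback: Box::new(f) }
    }

    pub fn trigger(self, pool: Arc<dyn Pool>) {
        (self.callback)(pool); // consumes self, so it can only fire once
    }
}

pub trait Pool: Send + Sync {
    // Default method: taking a fresh scheduler is just "resuming" one from a clean
    // result, so implementors only have to provide take_resumed_scheduler.
    fn take_scheduler(&self) -> Scheduler {
        self.take_resumed_scheduler(initialized_result_with_timings())
    }
    fn take_resumed_scheduler(&self, result_with_timings: ResultWithTimings) -> Scheduler;
    fn register_timeout_listener(&self, listener: TimeoutListener);
}

// Trivial pool used only to exercise the default method and the listener.
struct FixedPool;

impl Pool for FixedPool {
    fn take_resumed_scheduler(&self, result_with_timings: ResultWithTimings) -> Scheduler {
        println!("resuming scheduler with {result_with_timings:?}");
        Scheduler
    }
    fn register_timeout_listener(&self, _listener: TimeoutListener) {}
}

fn main() {
    let pool = Arc::new(FixedPool);
    let _scheduler = pool.take_scheduler(); // goes through the default method
    let listener = TimeoutListener::new(|_pool| println!("timed out"));
    listener.trigger(pool); // fires exactly once
}
```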
@@ -250,6 +287,75 @@ impl WaitReason {
    }
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum SchedulerStatus {
    /// Unified scheduler is disabled or the installed scheduler has been consumed by
    /// wait_for_termination().
    /// Note that the transition to Unavailable from {Active, Stale} is one-way (i.e. one-time).
    Unavailable,
    /// Scheduler is installed into a bank; it could be running or just idling.
    /// This will be transitioned to Stale after a certain time has passed if its bank hasn't been
    /// frozen.
    Active(InstalledSchedulerBox),
    /// Scheduler has been idling for a long time, so it has been returned to the pool.
    /// This will be immediately (i.e. transparently) transitioned back to Active as soon as
    /// there's a new transaction to be executed.
    Stale(InstalledSchedulerPoolArc, ResultWithTimings),
}

impl SchedulerStatus {
    fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
        match scheduler {
            Some(scheduler) => SchedulerStatus::Active(scheduler),
            None => SchedulerStatus::Unavailable,
        }
    }

    fn transition_from_stale_to_active(
        &mut self,
        f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
    ) {
        let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
            panic!("transition to Active failed: {self:?}");
        };
        *self = Self::Active(f(pool, result_with_timings));
    }

    fn maybe_transition_from_active_to_stale(
        &mut self,
        f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
    ) {
        if !matches!(self, Self::Active(_scheduler)) {
            return;
        }
        let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
            unreachable!("not active: {:?}", self);
        };
        let (pool, result_with_timings) = f(scheduler);
        *self = Self::Stale(pool, result_with_timings);
    }

    fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
        let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
            panic!("transition to Unavailable failed: {self:?}");
        };
        scheduler
    }

    fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
        let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
            panic!("transition to Unavailable failed: {self:?}");
        };
        result_with_timings
    }

    fn active_scheduler(&self) -> &InstalledSchedulerBox {
        let SchedulerStatus::Active(active_scheduler) = self else {
            panic!("not active: {self:?}");
        };
        active_scheduler
    }
}

/// Very thin wrapper around Arc<Bank>
///
/// It brings type-safety against accidental mixing of bank and scheduler with different slots,
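All of the transitions above lean on the same idiom: `mem::replace(self, Self::Unavailable)` moves the payload out (briefly parking the slot at `Unavailable`), a let-else asserts the expected source variant, and the new state is written back. A compilable sketch of that idiom with hypothetical, simplified payload types in place of the real scheduler/pool types:

```rust
use std::mem;

// Hypothetical, simplified payloads standing in for the real pool/scheduler types.
#[derive(Debug)]
struct Scheduler(u64);
#[derive(Debug)]
struct PoolHandle;

#[derive(Debug)]
enum Status {
    Unavailable,
    Active(Scheduler),
    Stale(PoolHandle),
}

impl Status {
    // Move the payload out by temporarily parking the slot at Unavailable, then
    // store the freshly built Active state. Panics if the slot wasn't Stale.
    fn transition_from_stale_to_active(&mut self, f: impl FnOnce(PoolHandle) -> Scheduler) {
        let Self::Stale(pool) = mem::replace(self, Self::Unavailable) else {
            panic!("transition to Active failed: {self:?}");
        };
        *self = Self::Active(f(pool));
    }

    // No-op unless currently Active; mirrors maybe_transition_from_active_to_stale.
    fn maybe_transition_from_active_to_stale(&mut self, f: impl FnOnce(Scheduler) -> PoolHandle) {
        if !matches!(self, Self::Active(_)) {
            return;
        }
        let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
            unreachable!();
        };
        *self = Self::Stale(f(scheduler));
    }
}

fn main() {
    let mut status = Status::Active(Scheduler(42));
    status.maybe_transition_from_active_to_stale(|scheduler| {
        println!("returning {scheduler:?} to the pool");
        PoolHandle
    });
    status.transition_from_stale_to_active(|_pool| Scheduler(42));
    println!("{status:?}"); // Active(Scheduler(42))
}
```

The let-else keeps the happy path flat while still panicking loudly on an impossible transition.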
@@ -277,7 +383,7 @@ pub struct BankWithSchedulerInner {
    bank: Arc<Bank>,
    scheduler: InstalledSchedulerRwLock,
}
pub type InstalledSchedulerRwLock = RwLock<Option<InstalledSchedulerBox>>;
pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;

impl BankWithScheduler {
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
@@ -292,7 +398,7 @@ impl BankWithScheduler {
        Self {
            inner: Arc::new(BankWithSchedulerInner {
                bank,
                scheduler: RwLock::new(scheduler),
                scheduler: RwLock::new(SchedulerStatus::new(scheduler)),
            }),
        }
    }
@@ -321,13 +427,19 @@ impl BankWithScheduler {
    }

    pub fn has_installed_scheduler(&self) -> bool {
        self.inner.scheduler.read().unwrap().is_some()
        !matches!(
            &*self.inner.scheduler.read().unwrap(),
            SchedulerStatus::Unavailable
        )
    }

    /// Schedule the transaction as long as the scheduler hasn't been aborted.
    ///
    /// If the scheduler has been aborted, this doesn't schedule the transaction; instead, it just
    /// returns the error of the prior scheduled transaction.
    ///
    /// Calling this will panic if the installed scheduler is Unavailable (the bank has been
    /// wait_for_termination()-ed or the unified scheduler is disabled in the first place).
    // 'a is needed; anonymous_lifetime_in_impl_trait isn't stabilized yet...
    pub fn schedule_transaction_executions<'a>(
        &self,
@@ -338,31 +450,32 @@ impl BankWithScheduler {
            transactions_with_indexes.len()
        );

        let scheduler_guard = self.inner.scheduler.read().unwrap();
        let scheduler = scheduler_guard.as_ref().unwrap();

        for (sanitized_transaction, &index) in transactions_with_indexes {
            if scheduler
                .schedule_execution(&(sanitized_transaction, index))
                .is_err()
            {
                drop(scheduler_guard);
                // This write lock isn't atomic with the above read lock. So, another thread
                // could have called .recover_error_after_abort() while we're literally stuck in
                // the gap between these locks (i.e. right at this comment, source-code-wise)
                // under extreme race conditions. Thus, .recover_error_after_abort() is made
                // idempotent with that consideration in mind.
                //
                // Lastly, this non-atomic nature is intentional for optimizing the fast code-path
                let mut scheduler_guard = self.inner.scheduler.write().unwrap();
                let scheduler = scheduler_guard.as_mut().unwrap();
                return Err(scheduler.recover_error_after_abort());
        let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
            for (sanitized_transaction, &index) in transactions_with_indexes {
                scheduler.schedule_execution(&(sanitized_transaction, index))?;
            }
            Ok(())
        });

        if schedule_result.is_err() {
            // This write lock isn't atomic with the above read lock. So, another thread
            // could have called .recover_error_after_abort() while we're literally stuck in
            // the gap between these locks (i.e. right at this comment, source-code-wise)
            // under extreme race conditions. Thus, .recover_error_after_abort() is made
            // idempotent with that consideration in mind.
            //
            // Lastly, this non-atomic nature is intentional for optimizing the fast code-path
            return Err(self.inner.retrieve_error_after_schedule_failure());
        }

        Ok(())
    }

    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
        self.inner.do_create_timeout_listener()
    }

    // take needless &mut only to communicate its semantic mutability to humans...
    #[cfg(feature = "dev-context-only-utils")]
    pub fn drop_scheduler(&mut self) {
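The rewritten method above is an instance of an optimistic-locking shape: do all scheduling under a single read lock via `with_active_scheduler`, and only if that reports failure take the write lock to dig out the abort error, accepting that the two lock acquisitions are not atomic. A stripped-down sketch of that shape, assuming a plain `RwLock<Result<(), String>>` in place of the real scheduler machinery:

```rust
use std::sync::RwLock;

// Hypothetical stand-in for the scheduler slot: Ok(()) means healthy, Err holds
// the error recorded when the scheduler aborted.
struct Inner {
    scheduler: RwLock<Result<(), String>>,
}

struct Aborted;

impl Inner {
    // Fast path: a single read lock is enough as long as nothing fails.
    fn with_active_scheduler(
        &self,
        f: impl FnOnce() -> Result<(), Aborted>,
    ) -> Result<(), Aborted> {
        let guard = self.scheduler.read().unwrap();
        if guard.is_err() {
            return Err(Aborted);
        }
        f()
    }

    // Slow path, taken only after observing Err(Aborted). The write lock below is
    // *not* atomic with the read lock above, so this recovery step has to tolerate
    // another thread racing us through the same gap.
    fn retrieve_error_after_schedule_failure(&self) -> String {
        let guard = self.scheduler.write().unwrap();
        guard.clone().unwrap_err()
    }

    fn schedule_all(&self, transactions: &[u64]) -> Result<(), String> {
        let schedule_result = self.with_active_scheduler(|| {
            for _transaction in transactions {
                // scheduler.schedule_execution(transaction)? would go here
            }
            Ok(())
        });
        if schedule_result.is_err() {
            return Err(self.retrieve_error_after_schedule_failure());
        }
        Ok(())
    }
}

fn main() {
    let inner = Inner {
        scheduler: RwLock::new(Ok(())),
    };
    assert!(inner.schedule_all(&[1, 2, 3]).is_ok());
}
```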
@@ -391,11 +504,101 @@ impl BankWithScheduler {
    }

    pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
        RwLock::new(None)
        RwLock::new(SchedulerStatus::Unavailable)
    }
}

impl BankWithSchedulerInner {
    fn with_active_scheduler(
        self: &Arc<Self>,
        f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
    ) -> ScheduleResult {
        let scheduler = self.scheduler.read().unwrap();
        match &*scheduler {
            SchedulerStatus::Active(scheduler) => {
                // This is the fast path, needing a single read lock most of the time.
                f(scheduler)
            }
            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
                trace!(
                    "with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...",
                    self.bank.slot(),
                );
                Err(SchedulerAborted)
            }
            SchedulerStatus::Stale(pool, _result_with_timings) => {
                let pool = pool.clone();

Review comment: why do we need to clone the pool?
Reply: because this

                drop(scheduler);

                let context = SchedulingContext::new(self.bank.clone());
                let mut scheduler = self.scheduler.write().unwrap();
                trace!("with_active_scheduler: {:?}", scheduler);
                scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
                    let scheduler = pool.take_resumed_scheduler(context, result_with_timings);
                    info!(
                        "with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: {})",
                        self.bank.slot(),
                        scheduler.id(),
                    );
                    scheduler
                });
                drop(scheduler);

                let scheduler = self.scheduler.read().unwrap();
                // Re-register a new timeout listener only after acquiring the read lock;
                // otherwise, the listener could again put the scheduler into Stale before the
                // read lock is taken, causing the panic below.

Review comment: oops, forgot to mention this is an extremely rare race condition...

                pool.register_timeout_listener(self.do_create_timeout_listener());
                f(scheduler.active_scheduler())
            }
            SchedulerStatus::Unavailable => unreachable!("no installed scheduler"),
        }
    }

    fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
        let weak_bank = Arc::downgrade(self);
        TimeoutListener::new(move |pool| {
            let Some(bank) = weak_bank.upgrade() else {
                return;
            };

            let Ok(mut scheduler) = bank.scheduler.write() else {
                return;
            };

            scheduler.maybe_transition_from_active_to_stale(|scheduler| {
                // The scheduler still hasn't been wait_for_termination()-ed after a while...
                // Return the installed scheduler back to the scheduler pool as soon as the
                // scheduler gets idle after executing all currently-scheduled transactions.

                let id = scheduler.id();
                let (result_with_timings, uninstalled_scheduler) =
                    scheduler.wait_for_termination(false);
                uninstalled_scheduler.return_to_pool();
                info!(
                    "timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})",
                    bank.bank.slot(),
                    id,
                );
                (pool, result_with_timings)
            });
            trace!("timeout_listener: {:?}", scheduler);
        })
    }

    /// This must not be called until `Err(SchedulerAborted)` is observed. Violating this should
    /// `panic!()`.
    fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
        let mut scheduler = self.scheduler.write().unwrap();
        match &mut *scheduler {
            SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
                result.clone().unwrap_err()
            }
            _ => unreachable!("no error in {:?}", self.scheduler),
        }
    }

    #[must_use]
    fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
        Self::wait_for_scheduler_termination(
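One detail worth calling out in `do_create_timeout_listener` above: the closure captures `Arc::downgrade(self)`, so a listener sitting in the pool never keeps a dropped bank alive, and a timeout that fires after the bank is gone degrades to a no-op. A small illustration of that capture pattern (the `BankInner` type and `make_timeout_listener` helper are hypothetical names for this sketch only):

```rust
use std::sync::Arc;

// Hypothetical stand-in for BankWithSchedulerInner.
struct BankInner {
    slot: u64,
}

// Build a callback that holds only a Weak reference, so the pool's pending
// listeners don't extend the bank's lifetime.
fn make_timeout_listener(inner: &Arc<BankInner>) -> impl FnOnce() {
    let weak_bank = Arc::downgrade(inner);
    move || {
        // If the bank was dropped (e.g. already wait_for_termination()-ed and
        // discarded), upgrading fails and the timeout silently becomes a no-op.
        let Some(bank) = weak_bank.upgrade() else {
            return;
        };
        println!("timeout fired for bank at slot {}", bank.slot);
    }
}

fn main() {
    let bank = Arc::new(BankInner { slot: 100 });
    let listener = make_timeout_listener(&bank);

    drop(bank); // the bank goes away before the timeout fires
    listener(); // prints nothing: upgrade() returned None
}
```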
@@ -419,18 +622,24 @@ impl BankWithSchedulerInner {
        );

        let mut scheduler = scheduler.write().unwrap();
        let (was_noop, result_with_timings) =
            if let Some(scheduler) = scheduler.as_mut().filter(|_| reason.is_paused()) {
        let (was_noop, result_with_timings) = match &mut *scheduler {
            SchedulerStatus::Active(scheduler) if reason.is_paused() => {
                scheduler.pause_for_recent_blockhash();
                (false, None)
            } else if let Some(scheduler) = scheduler.take() {
            }
            SchedulerStatus::Active(_scheduler) => {
                let scheduler = scheduler.transition_from_active_to_unavailable();
                let (result_with_timings, uninstalled_scheduler) =
                    scheduler.wait_for_termination(reason.is_dropped());
                uninstalled_scheduler.return_to_pool();
                (false, Some(result_with_timings))
            } else {
                (true, None)
            };
            }
            SchedulerStatus::Stale(_pool, _result_with_timings) => {
                let result_with_timings = scheduler.transition_from_stale_to_unavailable();
                (true, Some(result_with_timings))
            }
            SchedulerStatus::Unavailable => (true, None),
        };
        debug!(
            "wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at {:?}...",
            bank.slot(),
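The match above replaces the old if/else chain and makes the termination outcome explicit per state: a paused Active scheduler stays installed, a live Active one is waited on and uninstalled, a Stale one just hands back the result it saved when it was returned to the pool, and Unavailable is a pure no-op. A condensed, hypothetical mirror of just that mapping (not the real types or signatures):

```rust
// Hypothetical, simplified mirror of the new termination match.
#[derive(Debug)]
enum Status {
    Unavailable,
    Active,
    Stale { saved_result: &'static str },
}

// Returns (was_noop, result): only a live Active scheduler needs to be waited on;
// a Stale one already saved its result when it went back to the pool.
fn terminate(status: Status, paused: bool) -> (bool, Option<&'static str>) {
    match status {
        // Pausing keeps the scheduler installed and produces no result yet.
        Status::Active if paused => (false, None),
        // A live scheduler is waited on, then uninstalled.
        Status::Active => (false, Some("result from wait_for_termination()")),
        // A stale scheduler's result was captured when it was returned to the pool.
        Status::Stale { saved_result } => (true, Some(saved_result)),
        // Nothing installed (or already consumed): pure no-op.
        Status::Unavailable => (true, None),
    }
}

fn main() {
    assert_eq!(
        terminate(Status::Stale { saved_result: "saved" }, false),
        (true, Some("saved"))
    );
    assert_eq!(terminate(Status::Unavailable, false), (true, None));
}
```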
@@ -439,6 +648,10 @@ impl BankWithSchedulerInner {
            result_with_timings.as_ref().map(|(result, _)| result),
            std::thread::current(),
        );
        trace!(
            "wait_for_scheduler_termination(result_with_timings: {:?})",
            result_with_timings,
        );

        result_with_timings
    }
Review comment: this doesn't seem strictly true, because it seems to also be used as an intermediate state when transitioning between active <-> stale?
Reply: wow, nice you're spotting this. I've omitted it for simplicity here. I will do a follow-up for completeness.
Reply: this will be addressed by: #1797