Clean idle pooled schedulers periodically #1575
Changes from all commits: 42eaadb, f4f7192, c3f9959, 645f8bc, 2b90fd1
```diff
@@ -49,8 +49,9 @@ use {
             Arc, Mutex, OnceLock, Weak,
         },
         thread::{self, sleep, JoinHandle},
-        time::Duration,
+        time::{Duration, Instant},
    },
+   vec_extract_if_polyfill::MakeExtractIf,
};

mod sleepless_testing;
```
```diff
@@ -63,6 +64,7 @@ enum CheckPoint {
     NewTask(usize),
     TaskHandled(usize),
     SchedulerThreadAborted,
+    IdleSchedulerCleaned(usize),
     TrashedSchedulerCleaned(usize),
 }
```
```diff
@@ -73,7 +75,7 @@ type AtomicSchedulerId = AtomicU64;
 // TransactionStatusSender; also, PohRecorder in the future)...
 #[derive(Debug)]
 pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
-    scheduler_inners: Mutex<Vec<S::Inner>>,
+    scheduler_inners: Mutex<Vec<(S::Inner, Instant)>>,
     trashed_scheduler_inners: Mutex<Vec<S::Inner>>,
     handler_count: usize,
     handler_context: HandlerContext,
```
```diff
@@ -104,6 +106,7 @@ pub type DefaultSchedulerPool =
     SchedulerPool<PooledScheduler<DefaultTaskHandler>, DefaultTaskHandler>;

 const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(10);
+const DEFAULT_MAX_POOLING_DURATION: Duration = Duration::from_secs(180);

 impl<S, TH> SchedulerPool<S, TH>
 where
```
```diff
@@ -127,6 +130,7 @@ where
             replay_vote_sender,
             prioritization_fee_cache,
             DEFAULT_POOL_CLEANER_INTERVAL,
+            DEFAULT_MAX_POOLING_DURATION,
         )
     }
```
```diff
@@ -137,6 +141,7 @@ where
         replay_vote_sender: Option<ReplayVoteSender>,
         prioritization_fee_cache: Arc<PrioritizationFeeCache>,
         pool_cleaner_interval: Duration,
+        max_pooling_duration: Duration,
     ) -> Arc<Self> {
         let handler_count = handler_count.unwrap_or(Self::default_handler_count());
         assert!(handler_count >= 1);
```
```diff
@@ -166,6 +171,33 @@ where
                     break;
                 };

+                let idle_inner_count = {
+                    let now = Instant::now();
+
+                    // Pre-allocate rather large capacity to avoid reallocation inside the lock.
+                    let mut idle_inners = Vec::with_capacity(128);
+
+                    let Ok(mut scheduler_inners) = scheduler_pool.scheduler_inners.lock() else {
+                        break;
+                    };
+                    // Use the still-unstable Vec::extract_if() even on stable rust toolchain by
+                    // using a polyfill and allowing unstable_name_collisions, because it's
+                    // simplest to code and fastest to run (= O(n); single linear pass and no
+                    // reallocation).
+                    //
+                    // Note that this critical section could block the latency-sensitive replay
+                    // code-path via ::take_scheduler().
+                    #[allow(unstable_name_collisions)]
+                    idle_inners.extend(scheduler_inners.extract_if(|(_inner, pooled_at)| {
+                        now.duration_since(*pooled_at) > max_pooling_duration
+                    }));
+                    drop(scheduler_inners);
+
+                    let idle_inner_count = idle_inners.len();
+                    drop(idle_inners);
+                    idle_inner_count
+                };
+
                 let trashed_inner_count = {
                     let Ok(mut trashed_scheduler_inners) =
                         scheduler_pool.trashed_scheduler_inners.lock()
```
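For illustration, here is a minimal stable-Rust sketch of the same idle-eviction pass, using hypothetical `Pool`/`Inner` names rather than the PR's types. It substitutes `drain(..).partition(...)` for the polyfill's `extract_if()`: that also preserves the survivors' relative order, but pays the extra reallocation that `extract_if()` avoids. The point it demonstrates is moving the expired entries out of the critical section so their potentially slow drops don't block `take_scheduler()`:

```rust
use std::{
    sync::Mutex,
    time::{Duration, Instant},
};

// Hypothetical stand-in for S::Inner; dropping a real scheduler inner may be
// expensive, which is exactly why the drops happen outside the lock.
struct Inner;

fn clean_idle(pool: &Mutex<Vec<(Inner, Instant)>>, max_pooling_duration: Duration) -> usize {
    let now = Instant::now();
    let idle_inners;
    {
        // Critical section: kept as short as possible, because take_scheduler()
        // on the latency-sensitive replay path contends on the same mutex.
        let mut guard = pool.lock().expect("not poisoned");
        let (idle, kept): (Vec<_>, Vec<_>) = guard
            .drain(..)
            .partition(|(_inner, pooled_at)| now.duration_since(*pooled_at) > max_pooling_duration);
        *guard = kept; // survivors keep their relative (FILO) order
        idle_inners = idle;
    } // lock released here
    let count = idle_inners.len();
    drop(idle_inners); // the possibly slow Drop impls run off-lock
    count
}
```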
```diff
@@ -181,9 +213,10 @@ where
                 };

                 info!(
-                    "Scheduler pool cleaner: dropped {} trashed inners",
-                    trashed_inner_count,
+                    "Scheduler pool cleaner: dropped {} idle inners, {} trashed inners",
+                    idle_inner_count, trashed_inner_count,
                 );
+                sleepless_testing::at(CheckPoint::IdleSchedulerCleaned(idle_inner_count));
                 sleepless_testing::at(CheckPoint::TrashedSchedulerCleaned(trashed_inner_count));
             }
         };
```
```diff
@@ -242,14 +275,15 @@ where
             self.scheduler_inners
                 .lock()
                 .expect("not poisoned")
-                .push(scheduler);
+                .push((scheduler, Instant::now()));
         }
     }

     fn do_take_scheduler(&self, context: SchedulingContext) -> S {
         // pop is intentional for filo, expecting relatively warmed-up scheduler due to having been
         // returned recently
-        if let Some(inner) = self.scheduler_inners.lock().expect("not poisoned").pop() {
+        if let Some((inner, _pooled_at)) = self.scheduler_inners.lock().expect("not poisoned").pop()
+        {
             S::from_inner(inner, context)
         } else {
             S::spawn(self.self_arc(), context)
```
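In other words, the pool behaves as a timestamped stack. A minimal sketch of that discipline, with illustrative names rather than the PR's actual API:

```rust
use std::time::Instant;

// Each pooled entry carries the moment it was returned. Pop is LIFO, so the
// most recently returned (and thus warmest) entry is reused first, while older
// entries sink to the bottom of the stack until the cleaner ages them out.
struct Pool<T> {
    inners: Vec<(T, Instant)>,
}

impl<T> Pool<T> {
    fn return_inner(&mut self, inner: T) {
        self.inners.push((inner, Instant::now()));
    }

    fn take_inner(&mut self) -> Option<T> {
        // The timestamp only matters to the cleaner; discard it on reuse.
        self.inners.pop().map(|(inner, _pooled_at)| inner)
    }
}
```

This is also why the cleaner's order-preserving `extract_if()` pass matters: evicting without reshuffling keeps the warmest entries on top of the stack.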
```diff
@@ -1289,6 +1323,9 @@ mod tests {
         BeforeNewTask,
         AfterTaskHandled,
         AfterSchedulerThreadAborted,
+        BeforeIdleSchedulerCleaned,
+        AfterIdleSchedulerCleaned,
+        BeforeTrashedSchedulerCleaned,
         AfterTrashedSchedulerCleaned,
         BeforeThreadManagerDrop,
     }
```
```diff
@@ -1326,6 +1363,62 @@ mod tests {

     const SHORTENED_POOL_CLEANER_INTERVAL: Duration = Duration::from_millis(1);

+    #[test]
+    fn test_scheduler_drop_idle() {
+        solana_logger::setup();
+
+        let _progress = sleepless_testing::setup(&[
+            &TestCheckPoint::BeforeIdleSchedulerCleaned,
+            &CheckPoint::IdleSchedulerCleaned(0),
+            &CheckPoint::IdleSchedulerCleaned(1),
+            &TestCheckPoint::AfterIdleSchedulerCleaned,
+        ]);
+
+        let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
+        let shortened_max_pooling_duration = Duration::from_millis(10);
+        let pool_raw = DefaultSchedulerPool::do_new(
+            None,
+            None,
+            None,
+            None,
+            ignored_prioritization_fee_cache,
+            SHORTENED_POOL_CLEANER_INTERVAL,
+            shortened_max_pooling_duration,
+        );
+        let pool = pool_raw.clone();
+        let bank = Arc::new(Bank::default_for_tests());
+        let context1 = SchedulingContext::new(bank);
+        let context2 = context1.clone();
+
+        let old_scheduler = pool.do_take_scheduler(context1);
+        let new_scheduler = pool.do_take_scheduler(context2);
+        let new_scheduler_id = new_scheduler.id();
+        Box::new(old_scheduler.into_inner().1).return_to_pool();
+
+        // sleepless_testing can't be used; wait a bit here to see real progress of wall time...
+        sleep(shortened_max_pooling_duration * 10);
+        Box::new(new_scheduler.into_inner().1).return_to_pool();
+
+        // Block solScCleaner until we see returned schedulers...
+        assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 2);
+        sleepless_testing::at(TestCheckPoint::BeforeIdleSchedulerCleaned);
+
+        // See the old (= idle) scheduler gone only after solScCleaner did its job...
+        sleepless_testing::at(&TestCheckPoint::AfterIdleSchedulerCleaned);
+        assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1);
+        assert_eq!(
+            pool_raw
+                .scheduler_inners
+                .lock()
+                .unwrap()
+                .first()
+                .as_ref()
+                .map(|(inner, _pooled_at)| inner.id())
+                .unwrap(),
+            new_scheduler_id
+        );
+    }
+
     enum AbortCase {
         Unhandled,
         UnhandledWhilePanicking,
```
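For readers unfamiliar with the `sleepless_testing` helper used above: conceptually, the test thread and the cleaner thread both report named checkpoints, and each call blocks until the pre-declared sequence has advanced to that checkpoint. A rough `Mutex`/`Condvar` sketch of that idea follows; it illustrates the concept only and is not the actual `sleepless_testing` implementation:

```rust
use std::sync::{Condvar, Mutex};

// Threads call at("SomeCheckpoint") and block until that checkpoint is the
// next one expected, serializing otherwise-racy progress across threads.
struct Progress {
    expected: Vec<&'static str>, // the sequence passed to setup()
    cursor: Mutex<usize>,        // index of the next expected checkpoint
    condvar: Condvar,
}

impl Progress {
    fn at(&self, checkpoint: &'static str) {
        let mut cursor = self.cursor.lock().unwrap();
        // Wait until this checkpoint becomes the next expected one...
        while self.expected.get(*cursor) != Some(&checkpoint) {
            cursor = self.condvar.wait(cursor).unwrap();
        }
        // ...then advance past it and wake any threads waiting on later ones.
        *cursor += 1;
        self.condvar.notify_all();
    }
}
```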
```diff
@@ -1658,6 +1751,8 @@ mod tests {
             &TestCheckPoint::AfterTaskHandled,
             &CheckPoint::SchedulerThreadAborted,
             &TestCheckPoint::AfterSchedulerThreadAborted,
+            &TestCheckPoint::BeforeTrashedSchedulerCleaned,
+            &CheckPoint::TrashedSchedulerCleaned(0),
             &CheckPoint::TrashedSchedulerCleaned(1),
             &TestCheckPoint::AfterTrashedSchedulerCleaned,
         ]);
```
```diff
@@ -1678,6 +1773,7 @@ mod tests {
             None,
             ignored_prioritization_fee_cache,
             SHORTENED_POOL_CLEANER_INTERVAL,
+            DEFAULT_MAX_POOLING_DURATION,
         );
         let pool = pool_raw.clone();
         let context = SchedulingContext::new(bank.clone());
```
```diff
@@ -1729,7 +1825,12 @@ mod tests {
             bank.wait_for_completed_scheduler(),
             Some((Err(TransactionError::AccountNotFound), _timings))
         );
+
+        // Block solScCleaner until we see trashed scheduler...
+        assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 1);
+        sleepless_testing::at(TestCheckPoint::BeforeTrashedSchedulerCleaned);
+
         // See the trashed scheduler gone only after solScCleaner did its job...
         sleepless_testing::at(TestCheckPoint::AfterTrashedSchedulerCleaned);
         assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 0);
     }
```

Review comment on lines +1829 to 1835:
It turns out this test is racy to begin with... it seems this test isn't racy unless I add new logic to solScCleaner, so I'm piggybacking the fix into this PR.
Review comment:
According to this page, we're the first consumer of the polyfill crate: https://crates.io/crates/vec_extract_if_polyfill/reverse_dependencies. However, I've confirmed that this crate is an almost verbatim copy of what's gated behind the unstable feature flag in our present rust-nightly std source.
Review comment:
I don't understand why we're adding the dependency instead of just using a nightly feature, if we really need it. The docs even say this polyfill crate uses it.
Review comment:
If we only supported building with nightly, that would work. But Vec::extract_if() isn't available on stable Rust yet...
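For context, the polyfill resolves exactly this tension: it supplies an identically named `extract_if()` method on stable Rust, which is also why the call site above needs `#[allow(unstable_name_collisions)]` (the lint warns that the call will resolve to the inherent std method once it stabilizes). A small self-contained example of its semantics, with illustrative values:

```rust
use vec_extract_if_polyfill::MakeExtractIf;

fn main() {
    let mut ages = vec![5u64, 200, 7, 300, 1];
    // Single linear pass: matching elements are moved out through the returned
    // iterator; the rest are compacted in place with their order preserved.
    #[allow(unstable_name_collisions)]
    let evicted: Vec<_> = ages.extract_if(|age| *age > 100).collect();
    assert_eq!(evicted, [200, 300]);
    assert_eq!(ages, [5, 7, 1]);
}
```

On a nightly toolchain, the same call would compile without the polyfill via `#![feature(extract_if)]`, which is the nightly-only alternative discussed above.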