Clean idle pooled schedulers periodically #1575
Changes from 4 commits: 42eaadb, f4f7192, c3f9959, 645f8bc, 2b90fd1
@@ -49,8 +49,9 @@ use {
             Arc, Mutex, OnceLock, Weak,
         },
         thread::{self, sleep, JoinHandle},
-        time::Duration,
+        time::{Duration, Instant},
     },
+    vec_extract_if_polyfill::MakeExtractIf,
 };

 mod sleepless_testing;
@@ -63,6 +64,7 @@ enum CheckPoint {
     NewTask(usize),
     TaskHandled(usize),
     SchedulerThreadAborted,
+    IdleSchedulerCleaned(usize),
     TrashedSchedulerCleaned(usize),
 }
@@ -73,7 +75,7 @@ type AtomicSchedulerId = AtomicU64;
 // TransactionStatusSender; also, PohRecorder in the future)...
 #[derive(Debug)]
 pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
-    scheduler_inners: Mutex<Vec<S::Inner>>,
+    scheduler_inners: Mutex<Vec<(S::Inner, Instant)>>,
     trashed_scheduler_inners: Mutex<Vec<S::Inner>>,
     handler_count: usize,
     handler_context: HandlerContext,
@@ -104,6 +106,7 @@ pub type DefaultSchedulerPool = | |
SchedulerPool<PooledScheduler<DefaultTaskHandler>, DefaultTaskHandler>; | ||
|
||
const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(10); | ||
const DEFAULT_MAX_POOLING_DURATION: Duration = Duration::from_secs(180); | ||
|
||
impl<S, TH> SchedulerPool<S, TH> | ||
where | ||
|
@@ -127,6 +130,7 @@ where
             replay_vote_sender,
             prioritization_fee_cache,
             DEFAULT_POOL_CLEANER_INTERVAL,
+            DEFAULT_MAX_POOLING_DURATION,
         )
     }
@@ -137,6 +141,7 @@ where
         replay_vote_sender: Option<ReplayVoteSender>,
         prioritization_fee_cache: Arc<PrioritizationFeeCache>,
         pool_cleaner_interval: Duration,
+        max_pooling_duration: Duration,
     ) -> Arc<Self> {
         let handler_count = handler_count.unwrap_or(Self::default_handler_count());
         assert!(handler_count >= 1);
@@ -166,6 +171,25 @@ where
                     break;
                 };

+                let idle_inner_count = {
+                    let now = Instant::now();
+                    let Ok(mut scheduler_inners) = scheduler_pool.scheduler_inners.lock() else {
+                        break;
+                    };
+                    // I want to use the fancy ::extract_if() no matter what....
[Review thread on the ::extract_if() comment above]

Reviewer: why though? It seems like we can get something similar with retain and collecting into a vec inside? (A stable-Rust sketch of that alternative follows this hunk.)

Author: well, while waiting for lgtm, I managed to write an in-source commentary: 2b90fd1

Reviewer: yeah, I agree it's a good match; I would just love it if the feature were stable. I'm just concerned with adding a dependency on an otherwise unused crate. The repo has no README either. Kind of curious how you found it.

Author: well, crates.io's own search functionality didn't work... I just googled: https://www.google.com/search?q=rust+extract_if lol

Author: as for upgrades, that's certainly a possibility. However, our dep is pinned to an exact commit hash. Also, as far as I screened the author's profile pages (https://github.com/chyyran and https://crates.io/users/chyyran), I think there's only legitimate activity. Lastly, I'm planning to advertise this crate at Rust's tracking issue, to increase its exposure across the ecosystem and to move the stabilization forward.
+                    #[allow(unstable_name_collisions)]
+                    let idle_inners = scheduler_inners
+                        .extract_if(|(_inner, pooled_at)| {
+                            now.duration_since(*pooled_at) > max_pooling_duration
+                        })
+                        .collect::<Vec<_>>();
+                    drop(scheduler_inners);
+
+                    let idle_inner_count = idle_inners.len();
+                    drop(idle_inners);
+                    idle_inner_count
+                };
+
                 let trashed_inner_count = {
                     let Ok(mut trashed_scheduler_inners) =
                         scheduler_pool.trashed_scheduler_inners.lock()
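For reference, the retain-style alternative raised in the thread above can be written on stable Rust by draining and repartitioning. This is a hypothetical standalone sketch, not code from the PR (the `Inner` type parameter and `split_idle` function are illustrative); like `extract_if`, it preserves order and lets the caller drop the extracted inners outside the lock, as the cleaner does:

```rust
use std::time::{Duration, Instant};

fn split_idle<Inner>(
    pooled: &mut Vec<(Inner, Instant)>,
    now: Instant,
    max_pooling_duration: Duration,
) -> Vec<(Inner, Instant)> {
    // Move every entry out, then push it back into either the kept vec or
    // the idle vec; O(n), order-preserving, no unstable features needed.
    let mut idle = Vec::new();
    let mut kept = Vec::with_capacity(pooled.len());
    for entry in pooled.drain(..) {
        if now.duration_since(entry.1) > max_pooling_duration {
            idle.push(entry);
        } else {
            kept.push(entry);
        }
    }
    *pooled = kept;
    idle // drop these outside the lock, like the PR does
}
```

Note that `Vec::retain` itself only hands the closure a `&T`, so moving non-Clone inners out requires the drain-and-repartition shape above; that limitation is presumably why `extract_if` was called a good match for this job.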
@@ -181,9 +205,10 @@ where
                 };

                 info!(
-                    "Scheduler pool cleaner: dropped {} trashed inners",
-                    trashed_inner_count,
+                    "Scheduler pool cleaner: dropped {} idle inners, {} trashed inners",
+                    idle_inner_count, trashed_inner_count,
                 );
+                sleepless_testing::at(CheckPoint::IdleSchedulerCleaned(idle_inner_count));
                 sleepless_testing::at(CheckPoint::TrashedSchedulerCleaned(trashed_inner_count));
             }
         };
@@ -242,14 +267,15 @@ where
         self.scheduler_inners
             .lock()
             .expect("not poisoned")
-            .push(scheduler);
+            .push((scheduler, Instant::now()));
     }

     fn do_take_scheduler(&self, context: SchedulingContext) -> S {
         // pop is intentional for filo, expecting relatively warmed-up scheduler due to having been
         // returned recently
-        if let Some(inner) = self.scheduler_inners.lock().expect("not poisoned").pop() {
+        if let Some((inner, _pooled_at)) = self.scheduler_inners.lock().expect("not poisoned").pop()
+        {
             S::from_inner(inner, context)
         } else {
             S::spawn(self.self_arc(), context)
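Because returned schedulers are pushed to the back and do_take_scheduler() pops from the back, the pool behaves as a LIFO stack: the warmest inner is reused first, while stale timestamps accumulate toward the front, exactly where the age-based sweep finds them. A tiny hypothetical demonstration of that interaction (the string labels are made up here):

```rust
use std::time::Instant;

fn main() {
    let mut pool: Vec<(&str, Instant)> = Vec::new();
    pool.push(("old scheduler", Instant::now()));
    pool.push(("new scheduler", Instant::now()));

    // do_take_scheduler() pops from the back: the most recently pooled entry.
    let (taken, _pooled_at) = pool.pop().unwrap();
    assert_eq!(taken, "new scheduler");

    // The entry left behind is the oldest; an age-based sweep sees it first.
    assert_eq!(pool[0].0, "old scheduler");
}
```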
@@ -1289,6 +1315,9 @@ mod tests {
         BeforeNewTask,
         AfterTaskHandled,
         AfterSchedulerThreadAborted,
+        BeforeIdleSchedulerCleaned,
+        AfterIdleSchedulerCleaned,
+        BeforeTrashedSchedulerCleaned,
         AfterTrashedSchedulerCleaned,
         BeforeThreadManagerDrop,
     }
@@ -1326,6 +1355,62 @@ mod tests {

     const SHORTENED_POOL_CLEANER_INTERVAL: Duration = Duration::from_millis(1);

+    #[test]
+    fn test_scheduler_drop_idle() {
+        solana_logger::setup();
+
+        let _progress = sleepless_testing::setup(&[
+            &TestCheckPoint::BeforeIdleSchedulerCleaned,
+            &CheckPoint::IdleSchedulerCleaned(0),
+            &CheckPoint::IdleSchedulerCleaned(1),
+            &TestCheckPoint::AfterIdleSchedulerCleaned,
+        ]);
+
+        let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
+        let shortened_max_pooling_duration = Duration::from_millis(10);
+        let pool_raw = DefaultSchedulerPool::do_new(
+            None,
+            None,
+            None,
+            None,
+            ignored_prioritization_fee_cache,
+            SHORTENED_POOL_CLEANER_INTERVAL,
+            shortened_max_pooling_duration,
+        );
+        let pool = pool_raw.clone();
+        let bank = Arc::new(Bank::default_for_tests());
+        let context1 = SchedulingContext::new(bank);
+        let context2 = context1.clone();
+
+        let old_scheduler = pool.do_take_scheduler(context1);
+        let new_scheduler = pool.do_take_scheduler(context2);
+        let new_scheduler_id = new_scheduler.id();
+        Box::new(old_scheduler.into_inner().1).return_to_pool();
+
+        // sleepless_testing can't be used; wait a bit here to see real progress of wall time...
+        sleep(shortened_max_pooling_duration * 10);
+        Box::new(new_scheduler.into_inner().1).return_to_pool();
+
+        // Block solScCleaner until we see the returned schedulers...
+        assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 2);
+        sleepless_testing::at(TestCheckPoint::BeforeIdleSchedulerCleaned);
+
+        // See the old (= idle) scheduler gone only after solScCleaner did its job...
+        sleepless_testing::at(&TestCheckPoint::AfterIdleSchedulerCleaned);
+        assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1);
+        assert_eq!(
+            pool_raw
+                .scheduler_inners
+                .lock()
+                .unwrap()
+                .first()
+                .as_ref()
+                .map(|(inner, _pooled_at)| inner.id())
+                .unwrap(),
+            new_scheduler_id
+        );
+    }
+
     enum AbortCase {
         Unhandled,
         UnhandledWhilePanicking,
@@ -1658,6 +1743,8 @@ mod tests { | |
&TestCheckPoint::AfterTaskHandled, | ||
&CheckPoint::SchedulerThreadAborted, | ||
&TestCheckPoint::AfterSchedulerThreadAborted, | ||
&TestCheckPoint::BeforeTrashedSchedulerCleaned, | ||
&CheckPoint::TrashedSchedulerCleaned(0), | ||
&CheckPoint::TrashedSchedulerCleaned(1), | ||
&TestCheckPoint::AfterTrashedSchedulerCleaned, | ||
]); | ||
|
@@ -1678,6 +1765,7 @@ mod tests { | |
None, | ||
ignored_prioritization_fee_cache, | ||
SHORTENED_POOL_CLEANER_INTERVAL, | ||
DEFAULT_MAX_POOLING_DURATION, | ||
); | ||
let pool = pool_raw.clone(); | ||
let context = SchedulingContext::new(bank.clone()); | ||
|
@@ -1729,7 +1817,12 @@ mod tests {
             bank.wait_for_completed_scheduler(),
             Some((Err(TransactionError::AccountNotFound), _timings))
         );

+        // Block solScCleaner until we see the trashed scheduler...
+        assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 1);
+        sleepless_testing::at(TestCheckPoint::BeforeTrashedSchedulerCleaned);
+
+        // See the trashed scheduler gone only after solScCleaner did its job...
         sleepless_testing::at(TestCheckPoint::AfterTrashedSchedulerCleaned);
         assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 0);
[Review comment on lines +1829 to 1835]

Author: it turns out this test is racy to begin with... actually, it seems this test isn't racy unless I add new logic to solScCleaner, so I'm piggybacking the fix into this PR. (A standalone sketch of the checkpoint idea follows the test.)
     }
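The race referred to above is between the test's assertions and the asynchronous solScCleaner thread. The repo resolves it with sleepless_testing checkpoints; what follows is only a minimal standalone sketch of the same rendezvous idea built from std primitives (the names `before_clean` and `cleaned_tx` are made up here), not the actual sleepless_testing implementation:

```rust
use std::sync::{mpsc, Arc, Barrier};
use std::thread;

fn main() {
    let before_clean = Arc::new(Barrier::new(2));
    let (cleaned_tx, cleaned_rx) = mpsc::channel::<usize>();

    let cleaner = {
        let before_clean = Arc::clone(&before_clean);
        thread::spawn(move || {
            // Like BeforeTrashedSchedulerCleaned: wait for the test to finish
            // staging state before sweeping.
            before_clean.wait();
            let swept = 1; // pretend we dropped one trashed inner
            cleaned_tx.send(swept).unwrap();
        })
    };

    // Test side: stage state, then release the cleaner and block on its result,
    // so neither side can observe the other mid-sweep.
    before_clean.wait();
    assert_eq!(cleaned_rx.recv().unwrap(), 1);
    cleaner.join().unwrap();
}
```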
[Review thread on the vec_extract_if_polyfill dependency]

Author: according to this page, we're the first consumer of the polyfill crate: https://crates.io/crates/vec_extract_if_polyfill/reverse_dependencies
However, I've confirmed that this crate is an almost verbatim copy of what's gated behind the unstable feature flag in our present rust-nightly std source.

Reviewer: I don't understand why we're adding the dependency instead of just using a nightly feature, if we really need it. The docs even say this polyfill crate uses it.

Author: if we only supported building with nightly, that would work. But Vec::extract_if() isn't available for stable Rust yet...
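For readers unfamiliar with how such a polyfill can work on stable: the usual pattern is an extension trait whose method shares the name of the unstable inherent method (the crate's trait is MakeExtractIf, imported in the diff above), which is exactly why call sites need #[allow(unstable_name_collisions)]. The following is a simplified, illustrative sketch with its own trait name, not the crate's actual source; in particular, the real unstable API returns a lazy iterator rather than an eager Vec:

```rust
trait ExtractIfExt<T> {
    fn extract_if<F>(&mut self, pred: F) -> Vec<T>
    where
        F: FnMut(&mut T) -> bool;
}

impl<T> ExtractIfExt<T> for Vec<T> {
    fn extract_if<F>(&mut self, mut pred: F) -> Vec<T>
    where
        F: FnMut(&mut T) -> bool,
    {
        // Simplified eager version: remove matching elements in order and
        // hand them back as a Vec.
        let mut extracted = Vec::new();
        let mut i = 0;
        while i < self.len() {
            if pred(&mut self[i]) {
                extracted.push(self.remove(i));
            } else {
                i += 1;
            }
        }
        extracted
    }
}

// The allow silences rustc's warning that this trait method shadows the
// (still unstable) inherent Vec::extract_if; once std stabilizes it, the
// inherent method takes precedence at these call sites.
#[allow(unstable_name_collisions)]
fn main() {
    let mut v = vec![1, 2, 3, 4];
    let evens: Vec<i32> = v.extract_if(|n| *n % 2 == 0);
    assert_eq!(v, vec![1, 3]);
    assert_eq!(evens, vec![2, 4]);
}
```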