From 9984a5f036117d641636cf48fdbf839c6f14166f Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sun, 2 Jun 2024 14:21:45 +0900 Subject: [PATCH] Clean idle pooled scheudlers periodically --- unified-scheduler-pool/src/lib.rs | 71 ++++++++++++++++++++++++++++--- 1 file changed, 65 insertions(+), 6 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 15e56cfc488a90..2bfab47e88c223 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -49,7 +49,7 @@ use { Arc, Mutex, OnceLock, Weak, }, thread::{self, sleep, JoinHandle}, - time::Duration, + time::{Duration, Instant}, }, }; @@ -63,6 +63,7 @@ enum CheckPoint { NewTask(usize), TaskHandled(usize), SchedulerThreadAborted, + IdleSchedulerCleaned(usize), TrashedSchedulerCleaned(usize), } @@ -73,7 +74,7 @@ type AtomicSchedulerId = AtomicU64; // TransactionStatusSender; also, PohRecorder in the future)... #[derive(Debug)] pub struct SchedulerPool, TH: TaskHandler> { - scheduler_inners: Mutex>, + scheduler_inners: Mutex>, trashed_scheduler_inners: Mutex>, handler_count: usize, handler_context: HandlerContext, @@ -104,6 +105,7 @@ pub type DefaultSchedulerPool = SchedulerPool, DefaultTaskHandler>; const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(10); +const DEFAULT_MAX_POOLING_DURATION: Duration = Duration::from_secs(180); impl SchedulerPool where @@ -127,6 +129,7 @@ where replay_vote_sender, prioritization_fee_cache, DEFAULT_POOL_CLEANER_INTERVAL, + DEFAULT_MAX_POOLING_DURATION, ) } @@ -137,6 +140,7 @@ where replay_vote_sender: Option, prioritization_fee_cache: Arc, pool_cleaner_interval: Duration, + max_pooling_duration: Duration, ) -> Arc { let handler_count = handler_count.unwrap_or(Self::default_handler_count()); assert!(handler_count >= 1); @@ -166,6 +170,26 @@ where break; }; + let idle_inner_count = { + let Ok(mut scheduler_inners) = scheduler_pool.scheduler_inners.lock() else { + break; + }; + + let mut inners: Vec<_> = mem::take(&mut *scheduler_inners); + let now = Instant::now(); + let old_inner_count = inners.len(); + // this loop should be fast because still the lock is held + inners.retain(|(_inner, pooled_at)| { + now.duration_since(*pooled_at) <= max_pooling_duration + }); + let new_inner_count = inners.len(); + scheduler_inners.extend(inners); + + old_inner_count + .checked_sub(new_inner_count) + .expect("new_inner_count isn't larger") + }; + let trashed_inner_count = { let Ok(mut trashed_scheduler_inners) = scheduler_pool.trashed_scheduler_inners.lock() @@ -181,9 +205,10 @@ where }; info!( - "Scheduler pool cleaner: dropped {} trashed inners", - trashed_inner_count, + "Scheduler pool cleaner: dropped {} idle inners, {} trashed inners", + idle_inner_count, trashed_inner_count, ); + sleepless_testing::at(CheckPoint::IdleSchedulerCleaned(idle_inner_count)); sleepless_testing::at(CheckPoint::TrashedSchedulerCleaned(trashed_inner_count)); } }; @@ -242,14 +267,15 @@ where self.scheduler_inners .lock() .expect("not poisoned") - .push(scheduler); + .push((scheduler, Instant::now())); } } fn do_take_scheduler(&self, context: SchedulingContext) -> S { // pop is intentional for filo, expecting relatively warmed-up scheduler due to having been // returned recently - if let Some(inner) = self.scheduler_inners.lock().expect("not poisoned").pop() { + if let Some((inner, _pooled_at)) = self.scheduler_inners.lock().expect("not poisoned").pop() + { S::from_inner(inner, context) } else { S::spawn(self.self_arc(), context) @@ -1289,6 +1315,7 @@ mod tests { BeforeNewTask, AfterTaskHandled, AfterSchedulerThreadAborted, + AfterIdleSchedulerCleaned, AfterTrashedSchedulerCleaned, BeforeThreadManagerDrop, } @@ -1326,6 +1353,37 @@ mod tests { const SHORTENED_POOL_CLEANER_INTERVAL: Duration = Duration::from_millis(1); + #[test] + fn test_scheduler_drop_idle() { + solana_logger::setup(); + + let _progress = sleepless_testing::setup(&[ + &CheckPoint::IdleSchedulerCleaned(1), + &TestCheckPoint::AfterIdleSchedulerCleaned, + ]); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool_raw = DefaultSchedulerPool::do_new( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + SHORTENED_POOL_CLEANER_INTERVAL, + Duration::from_millis(1), + ); + let pool = pool_raw.clone(); + let bank = Arc::new(Bank::default_for_tests()); + let context = SchedulingContext::new(bank); + + let scheduler = pool.do_take_scheduler(context); + Box::new(scheduler.into_inner().1).return_to_pool(); + + assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1); + sleepless_testing::at(&TestCheckPoint::AfterIdleSchedulerCleaned); + assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 0); + } + enum AbortCase { Unhandled, UnhandledWhilePanicking, @@ -1678,6 +1736,7 @@ mod tests { None, ignored_prioritization_fee_cache, SHORTENED_POOL_CLEANER_INTERVAL, + DEFAULT_MAX_POOLING_DURATION, ); let pool = pool_raw.clone(); let context = SchedulingContext::new(bank.clone());