From a054e479537db133422296a54b0bb6d91bc48fb6 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Mon, 10 Jun 2024 16:05:10 +0900 Subject: [PATCH] Clean idle pooled schedulers periodically (#1575) * Clean idle pooled schedulers periodically * Fix race in failure_{with,without}_extra_tx * Improve test * Use extract_if polyfill * Write justfication of extract_if with minor opt. --- Cargo.lock | 7 ++ Cargo.toml | 1 + programs/sbf/Cargo.lock | 7 ++ unified-scheduler-pool/Cargo.toml | 1 + unified-scheduler-pool/src/lib.rs | 113 ++++++++++++++++++++++++++++-- 5 files changed, 123 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 67bc056297e1bf..c26ef2d1f71bcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7614,6 +7614,7 @@ dependencies = [ "solana-runtime", "solana-sdk", "solana-unified-scheduler-logic", + "vec_extract_if_polyfill", ] [[package]] @@ -8984,6 +8985,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "vec_extract_if_polyfill" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c9cb5fb67c2692310b6eb3fce7dd4b6e4c9a75be4f2f46b27f0b2b7799759c" + [[package]] name = "vec_map" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index 9f749422730117..c131bbc972335a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -438,6 +438,7 @@ trees = "0.4.2" tungstenite = "0.20.1" uriparse = "0.6.4" url = "2.5.0" +vec_extract_if_polyfill = "0.1.0" wasm-bindgen = "0.2" winapi = "0.3.8" winreg = "0.50" diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 7f7f33a445ac2b..9e2178df2d13ca 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6500,6 +6500,7 @@ dependencies = [ "solana-runtime", "solana-sdk", "solana-unified-scheduler-logic", + "vec_extract_if_polyfill", ] [[package]] @@ -7698,6 +7699,12 @@ version = "0.2.15" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "vec_extract_if_polyfill" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c9cb5fb67c2692310b6eb3fce7dd4b6e4c9a75be4f2f46b27f0b2b7799759c" + [[package]] name = "vec_map" version = "0.8.2" diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml index 69bf63e2be3806..6a46a32b271eac 100644 --- a/unified-scheduler-pool/Cargo.toml +++ b/unified-scheduler-pool/Cargo.toml @@ -21,6 +21,7 @@ solana-program-runtime = { workspace = true } solana-runtime = { workspace = true } solana-sdk = { workspace = true } solana-unified-scheduler-logic = { workspace = true } +vec_extract_if_polyfill = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 029b7800b821ec..645a69e84f70bc 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -49,8 +49,9 @@ use { Arc, Mutex, OnceLock, Weak, }, thread::{self, sleep, JoinHandle}, - time::Duration, + time::{Duration, Instant}, }, + vec_extract_if_polyfill::MakeExtractIf, }; mod sleepless_testing; @@ -63,6 +64,7 @@ enum CheckPoint { NewTask(usize), TaskHandled(usize), SchedulerThreadAborted, + IdleSchedulerCleaned(usize), TrashedSchedulerCleaned(usize), } @@ -73,7 +75,7 @@ type AtomicSchedulerId = AtomicU64; // TransactionStatusSender; also, PohRecorder in the future)... 
#[derive(Debug)] pub struct SchedulerPool, TH: TaskHandler> { - scheduler_inners: Mutex>, + scheduler_inners: Mutex>, trashed_scheduler_inners: Mutex>, handler_count: usize, handler_context: HandlerContext, @@ -104,6 +106,7 @@ pub type DefaultSchedulerPool = SchedulerPool, DefaultTaskHandler>; const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(10); +const DEFAULT_MAX_POOLING_DURATION: Duration = Duration::from_secs(180); impl SchedulerPool where @@ -127,6 +130,7 @@ where replay_vote_sender, prioritization_fee_cache, DEFAULT_POOL_CLEANER_INTERVAL, + DEFAULT_MAX_POOLING_DURATION, ) } @@ -137,6 +141,7 @@ where replay_vote_sender: Option, prioritization_fee_cache: Arc, pool_cleaner_interval: Duration, + max_pooling_duration: Duration, ) -> Arc { let handler_count = handler_count.unwrap_or(Self::default_handler_count()); assert!(handler_count >= 1); @@ -166,6 +171,33 @@ where break; }; + let idle_inner_count = { + let now = Instant::now(); + + // Pre-allocate rather large capacity to avoid reallocation inside the lock. + let mut idle_inners = Vec::with_capacity(128); + + let Ok(mut scheduler_inners) = scheduler_pool.scheduler_inners.lock() else { + break; + }; + // Use the still-unstable Vec::extract_if() even on stable rust toolchain by + // using a polyfill and allowing unstable_name_collisions, because it's + // simplest to code and fastest to run (= O(n); single linear pass and no + // reallocation). + // + // Note that this critical section could block the latency-sensitive replay + // code-path via ::take_scheduler(). 
+ #[allow(unstable_name_collisions)] + idle_inners.extend(scheduler_inners.extract_if(|(_inner, pooled_at)| { + now.duration_since(*pooled_at) > max_pooling_duration + })); + drop(scheduler_inners); + + let idle_inner_count = idle_inners.len(); + drop(idle_inners); + idle_inner_count + }; + let trashed_inner_count = { let Ok(mut trashed_scheduler_inners) = scheduler_pool.trashed_scheduler_inners.lock() @@ -181,9 +213,10 @@ where }; info!( - "Scheduler pool cleaner: dropped {} trashed inners", - trashed_inner_count, + "Scheduler pool cleaner: dropped {} idle inners, {} trashed inners", + idle_inner_count, trashed_inner_count, ); + sleepless_testing::at(CheckPoint::IdleSchedulerCleaned(idle_inner_count)); sleepless_testing::at(CheckPoint::TrashedSchedulerCleaned(trashed_inner_count)); } }; @@ -242,14 +275,15 @@ where self.scheduler_inners .lock() .expect("not poisoned") - .push(scheduler); + .push((scheduler, Instant::now())); } } fn do_take_scheduler(&self, context: SchedulingContext) -> S { // pop is intentional for filo, expecting relatively warmed-up scheduler due to having been // returned recently - if let Some(inner) = self.scheduler_inners.lock().expect("not poisoned").pop() { + if let Some((inner, _pooled_at)) = self.scheduler_inners.lock().expect("not poisoned").pop() + { S::from_inner(inner, context) } else { S::spawn(self.self_arc(), context) @@ -1289,6 +1323,9 @@ mod tests { BeforeNewTask, AfterTaskHandled, AfterSchedulerThreadAborted, + BeforeIdleSchedulerCleaned, + AfterIdleSchedulerCleaned, + BeforeTrashedSchedulerCleaned, AfterTrashedSchedulerCleaned, BeforeThreadManagerDrop, } @@ -1326,6 +1363,62 @@ mod tests { const SHORTENED_POOL_CLEANER_INTERVAL: Duration = Duration::from_millis(1); + #[test] + fn test_scheduler_drop_idle() { + solana_logger::setup(); + + let _progress = sleepless_testing::setup(&[ + &TestCheckPoint::BeforeIdleSchedulerCleaned, + &CheckPoint::IdleSchedulerCleaned(0), + &CheckPoint::IdleSchedulerCleaned(1), + 
&TestCheckPoint::AfterIdleSchedulerCleaned, + ]); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let shortened_max_pooling_duration = Duration::from_millis(10); + let pool_raw = DefaultSchedulerPool::do_new( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + SHORTENED_POOL_CLEANER_INTERVAL, + shortened_max_pooling_duration, + ); + let pool = pool_raw.clone(); + let bank = Arc::new(Bank::default_for_tests()); + let context1 = SchedulingContext::new(bank); + let context2 = context1.clone(); + + let old_scheduler = pool.do_take_scheduler(context1); + let new_scheduler = pool.do_take_scheduler(context2); + let new_scheduler_id = new_scheduler.id(); + Box::new(old_scheduler.into_inner().1).return_to_pool(); + + // sleepless_testing can't be used; wait a bit here to see real progress of wall time... + sleep(shortened_max_pooling_duration * 10); + Box::new(new_scheduler.into_inner().1).return_to_pool(); + + // Block solScCleaner until we see returned schedulers... + assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 2); + sleepless_testing::at(TestCheckPoint::BeforeIdleSchedulerCleaned); + + // See the old (= idle) scheduler gone only after solScCleaner did its job... 
+ sleepless_testing::at(&TestCheckPoint::AfterIdleSchedulerCleaned); + assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1); + assert_eq!( + pool_raw + .scheduler_inners + .lock() + .unwrap() + .first() + .as_ref() + .map(|(inner, _pooled_at)| inner.id()) + .unwrap(), + new_scheduler_id + ); + } + enum AbortCase { Unhandled, UnhandledWhilePanicking, @@ -1658,6 +1751,8 @@ mod tests { &TestCheckPoint::AfterTaskHandled, &CheckPoint::SchedulerThreadAborted, &TestCheckPoint::AfterSchedulerThreadAborted, + &TestCheckPoint::BeforeTrashedSchedulerCleaned, + &CheckPoint::TrashedSchedulerCleaned(0), &CheckPoint::TrashedSchedulerCleaned(1), &TestCheckPoint::AfterTrashedSchedulerCleaned, ]); @@ -1678,6 +1773,7 @@ mod tests { None, ignored_prioritization_fee_cache, SHORTENED_POOL_CLEANER_INTERVAL, + DEFAULT_MAX_POOLING_DURATION, ); let pool = pool_raw.clone(); let context = SchedulingContext::new(bank.clone()); @@ -1729,7 +1825,12 @@ mod tests { bank.wait_for_completed_scheduler(), Some((Err(TransactionError::AccountNotFound), _timings)) ); + + // Block solScCleaner until we see trashed scheduler... assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 1); + sleepless_testing::at(TestCheckPoint::BeforeTrashedSchedulerCleaned); + + // See the trashed scheduler gone only after solScCleaner did its job... sleepless_testing::at(TestCheckPoint::AfterTrashedSchedulerCleaned); assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 0); }