From 42eaadbf0281ff77a813bac48067b4d536b3a89a Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Sun, 2 Jun 2024 14:21:45 +0900
Subject: [PATCH 1/5] Clean idle pooled schedulers periodically

---
 unified-scheduler-pool/src/lib.rs | 84 ++++++++++++++++++++++++++++---
 1 file changed, 78 insertions(+), 6 deletions(-)

diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs
index 029b7800b821ec..5a35ee4c751c29 100644
--- a/unified-scheduler-pool/src/lib.rs
+++ b/unified-scheduler-pool/src/lib.rs
@@ -49,7 +49,7 @@ use {
             Arc, Mutex, OnceLock, Weak,
         },
         thread::{self, sleep, JoinHandle},
-        time::Duration,
+        time::{Duration, Instant},
     },
 };
 
@@ -63,6 +63,7 @@ enum CheckPoint {
     NewTask(usize),
     TaskHandled(usize),
     SchedulerThreadAborted,
+    IdleSchedulerCleaned(usize),
     TrashedSchedulerCleaned(usize),
 }
 
@@ -73,7 +74,7 @@ type AtomicSchedulerId = AtomicU64;
 // TransactionStatusSender; also, PohRecorder in the future)...
 #[derive(Debug)]
 pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
-    scheduler_inners: Mutex<Vec<S::Inner>>,
+    scheduler_inners: Mutex<Vec<(S::Inner, Instant)>>,
     trashed_scheduler_inners: Mutex<Vec<S::Inner>>,
     handler_count: usize,
     handler_context: HandlerContext,
@@ -104,6 +105,7 @@ pub type DefaultSchedulerPool =
     SchedulerPool<PooledScheduler<DefaultTaskHandler>, DefaultTaskHandler>;
 
 const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(10);
+const DEFAULT_MAX_POOLING_DURATION: Duration = Duration::from_secs(180);
 
 impl<S, TH> SchedulerPool<S, TH>
 where
@@ -127,6 +129,7 @@
             replay_vote_sender,
             prioritization_fee_cache,
             DEFAULT_POOL_CLEANER_INTERVAL,
+            DEFAULT_MAX_POOLING_DURATION,
         )
     }
 
@@ -137,6 +140,7 @@
         replay_vote_sender: Option<ReplayVoteSender>,
         prioritization_fee_cache: Arc<PrioritizationFeeCache>,
         pool_cleaner_interval: Duration,
+        max_pooling_duration: Duration,
     ) -> Arc<Self> {
         let handler_count = handler_count.unwrap_or(Self::default_handler_count());
         assert!(handler_count >= 1);
@@ -166,6 +170,32 @@
                     break;
                 };
 
+                let idle_inner_count = {
+                    let Ok(mut scheduler_inners) = scheduler_pool.scheduler_inners.lock() else {
+                        break;
+                    };
+                    let mut inners = mem::take(&mut *scheduler_inners);
+                    drop(scheduler_inners);
+
+                    let now = Instant::now();
+                    let old_inner_count = inners.len();
+                    // retain could take a long time because it's dropping schedulers!
+                    inners.retain(|(_inner, pooled_at)| {
+                        now.duration_since(*pooled_at) <= max_pooling_duration
+                    });
+                    let new_inner_count = inners.len();
+
+                    let Ok(mut scheduler_inners) = scheduler_pool.scheduler_inners.lock() else {
+                        break;
+                    };
+                    scheduler_inners.extend(inners);
+                    drop(scheduler_inners);
+
+                    old_inner_count
+                        .checked_sub(new_inner_count)
+                        .expect("new_inner_count isn't larger")
+                };
+
                 let trashed_inner_count = {
                     let Ok(mut trashed_scheduler_inners) =
                         scheduler_pool.trashed_scheduler_inners.lock()
@@ -181,9 +211,10 @@
                 };
 
                 info!(
-                    "Scheduler pool cleaner: dropped {} trashed inners",
-                    trashed_inner_count,
+                    "Scheduler pool cleaner: dropped {} idle inners, {} trashed inners",
+                    idle_inner_count, trashed_inner_count,
                 );
+                sleepless_testing::at(CheckPoint::IdleSchedulerCleaned(idle_inner_count));
                 sleepless_testing::at(CheckPoint::TrashedSchedulerCleaned(trashed_inner_count));
             }
         };
@@ -242,14 +273,15 @@
             self.scheduler_inners
                 .lock()
                 .expect("not poisoned")
-                .push(scheduler);
+                .push((scheduler, Instant::now()));
         }
     }
 
     fn do_take_scheduler(&self, context: SchedulingContext) -> S {
         // pop is intentional for filo, expecting relatively warmed-up scheduler due to having been
         // returned recently
-        if let Some(inner) = self.scheduler_inners.lock().expect("not poisoned").pop() {
+        if let Some((inner, _pooled_at)) = self.scheduler_inners.lock().expect("not poisoned").pop()
+        {
             S::from_inner(inner, context)
         } else {
             S::spawn(self.self_arc(), context)
@@ -1289,6 +1321,8 @@ mod tests {
         BeforeNewTask,
         AfterTaskHandled,
         AfterSchedulerThreadAborted,
+        BeforeIdleSchedulerCleaned,
+        AfterIdleSchedulerCleaned,
         AfterTrashedSchedulerCleaned,
         BeforeThreadManagerDrop,
     }
@@ -1326,6 +1360,43 @@ mod tests {
 
     const SHORTENED_POOL_CLEANER_INTERVAL: Duration = Duration::from_millis(1);
 
+    #[test]
+    fn test_scheduler_drop_idle() {
+        solana_logger::setup();
+
+        let _progress = sleepless_testing::setup(&[
+            &TestCheckPoint::BeforeIdleSchedulerCleaned,
+            &CheckPoint::IdleSchedulerCleaned(0),
+            &CheckPoint::IdleSchedulerCleaned(1),
+            &TestCheckPoint::AfterIdleSchedulerCleaned,
+        ]);
+
+        let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
+        let pool_raw = DefaultSchedulerPool::do_new(
+            None,
+            None,
+            None,
+            None,
+            ignored_prioritization_fee_cache,
+            SHORTENED_POOL_CLEANER_INTERVAL,
+            Duration::from_millis(1),
+        );
+        let pool = pool_raw.clone();
+        let bank = Arc::new(Bank::default_for_tests());
+        let context = SchedulingContext::new(bank);
+
+        let scheduler = pool.do_take_scheduler(context);
+        Box::new(scheduler.into_inner().1).return_to_pool();
+
+        // Block solScCleaner until we see the returned scheduler...
+        assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1);
+        sleepless_testing::at(TestCheckPoint::BeforeIdleSchedulerCleaned);
+
+        // See the idle scheduler gone only after solScCleaner did its job...
+        sleepless_testing::at(&TestCheckPoint::AfterIdleSchedulerCleaned);
+        assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 0);
+    }
+
     enum AbortCase {
         Unhandled,
         UnhandledWhilePanicking,
@@ -1678,6 +1749,7 @@
             None,
             ignored_prioritization_fee_cache,
             SHORTENED_POOL_CLEANER_INTERVAL,
+            DEFAULT_MAX_POOLING_DURATION,
         );
         let pool = pool_raw.clone();
         let context = SchedulingContext::new(bank.clone());
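
The eviction strategy patch 1 lands is simple enough to sketch in isolation: entries are timestamped as they enter the pool, and the cleaner swaps the whole Vec out of the mutex so that the potentially slow scheduler drops inside retain() happen without holding the lock. A minimal sketch of that pattern, with a hypothetical Scheduler stand-in instead of the pool's real S::Inner:

    use std::{
        mem,
        sync::Mutex,
        time::{Duration, Instant},
    };

    struct Scheduler; // hypothetical stand-in for S::Inner

    fn clean_idle(pool: &Mutex<Vec<(Scheduler, Instant)>>, max_pooling_duration: Duration) -> usize {
        // Swap the entries out, so the slow part runs without holding the lock.
        let mut inners = mem::take(&mut *pool.lock().unwrap());

        let now = Instant::now();
        let old_count = inners.len();
        // Evicted schedulers are dropped right here, outside the lock.
        inners.retain(|(_inner, pooled_at)| now.duration_since(*pooled_at) <= max_pooling_duration);
        let new_count = inners.len();

        // Re-lock and merge the survivors back; schedulers returned to the
        // pool in the meantime are kept as well.
        pool.lock().unwrap().extend(inners);

        old_count - new_count
    }

The FILO pop in do_take_scheduler() is what makes the timestamps meaningful: the most recently returned (warmest) scheduler is always reused first, so the entries that age past max_pooling_duration are exactly the ones the pool never needed.
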
From f4f7192eca07ffcce49755ffb1fcf9f0742e18b8 Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Sun, 2 Jun 2024 21:46:43 +0900
Subject: [PATCH 2/5] Fix race in failure_{with,without}_extra_tx

---
 unified-scheduler-pool/src/lib.rs | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs
index 5a35ee4c751c29..341b0685276e9e 100644
--- a/unified-scheduler-pool/src/lib.rs
+++ b/unified-scheduler-pool/src/lib.rs
@@ -1323,6 +1323,7 @@ mod tests {
         AfterSchedulerThreadAborted,
         BeforeIdleSchedulerCleaned,
         AfterIdleSchedulerCleaned,
+        BeforeTrashedSchedulerCleaned,
         AfterTrashedSchedulerCleaned,
         BeforeThreadManagerDrop,
     }
@@ -1729,6 +1730,8 @@
             &TestCheckPoint::AfterTaskHandled,
             &CheckPoint::SchedulerThreadAborted,
             &TestCheckPoint::AfterSchedulerThreadAborted,
+            &TestCheckPoint::BeforeTrashedSchedulerCleaned,
+            &CheckPoint::TrashedSchedulerCleaned(0),
            &CheckPoint::TrashedSchedulerCleaned(1),
             &TestCheckPoint::AfterTrashedSchedulerCleaned,
         ]);
@@ -1801,7 +1804,12 @@
             bank.wait_for_completed_scheduler(),
             Some((Err(TransactionError::AccountNotFound), _timings))
         );
+
+        // Block solScCleaner until we see the trashed scheduler...
         assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 1);
+        sleepless_testing::at(TestCheckPoint::BeforeTrashedSchedulerCleaned);
+
+        // See the trashed scheduler gone only after solScCleaner did its job...
         sleepless_testing::at(TestCheckPoint::AfterTrashedSchedulerCleaned);
         assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 0);
     }
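
The race fixed here is generic to any test that asserts state mutated by a timer-driven background thread: without a handshake, solScCleaner can drop the trashed scheduler between the two assertions. The added TrashedSchedulerCleaned(0) entry also tolerates cleaner ticks that fire before the scheduler is trashed at all. A rough illustration of the before/after rendezvous idea using plain channels (this is not how sleepless_testing is implemented; it sequences globally declared checkpoints instead):

    use std::{sync::mpsc, thread};

    fn main() {
        let (before_tx, before_rx) = mpsc::channel::<()>();
        let (after_tx, after_rx) = mpsc::channel::<()>();

        // Stand-in for solScCleaner: held back until the test has asserted
        // the pre-cleaning state, signalling once the cleaning is done.
        let cleaner = thread::spawn(move || {
            before_rx.recv().unwrap();
            // ... drop the trashed scheduler here ...
            after_tx.send(()).unwrap();
        });

        // Test side: assert the not-yet-cleaned state while the cleaner is
        // still blocked, then release it...
        before_tx.send(()).unwrap();
        // ...and assert the cleaned state only after it has signalled.
        after_rx.recv().unwrap();
        cleaner.join().unwrap();
    }
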
From c3f99595966868eb56d6c02292ff6de26123c39f Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Sat, 8 Jun 2024 23:18:30 +0900
Subject: [PATCH 3/5] Improve test

---
 unified-scheduler-pool/src/lib.rs | 35 ++++++++++++++++++++++++-------
 1 file changed, 27 insertions(+), 8 deletions(-)

diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs
index 341b0685276e9e..4dfacba66e3ca4 100644
--- a/unified-scheduler-pool/src/lib.rs
+++ b/unified-scheduler-pool/src/lib.rs
@@ -1373,6 +1373,7 @@ mod tests {
         ]);
 
         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
+        let shortened_max_pooling_duration = Duration::from_millis(10);
         let pool_raw = DefaultSchedulerPool::do_new(
             None,
             None,
@@ -1380,22 +1381,40 @@ mod tests {
             None,
             ignored_prioritization_fee_cache,
             SHORTENED_POOL_CLEANER_INTERVAL,
-            Duration::from_millis(1),
+            shortened_max_pooling_duration,
         );
         let pool = pool_raw.clone();
         let bank = Arc::new(Bank::default_for_tests());
-        let context = SchedulingContext::new(bank);
+        let context1 = SchedulingContext::new(bank);
+        let context2 = context1.clone();
 
-        let scheduler = pool.do_take_scheduler(context);
-        Box::new(scheduler.into_inner().1).return_to_pool();
+        let old_scheduler = pool.do_take_scheduler(context1);
+        let new_scheduler = pool.do_take_scheduler(context2);
+        let new_scheduler_id = new_scheduler.id();
+        Box::new(old_scheduler.into_inner().1).return_to_pool();
 
-        // Block solScCleaner until we see the returned scheduler...
-        assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1);
+        // sleepless_testing can't be used here; wait a bit for wall time to actually pass...
+        sleep(shortened_max_pooling_duration * 10);
+        Box::new(new_scheduler.into_inner().1).return_to_pool();
+
+        // Block solScCleaner until we see the returned schedulers...
+        assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 2);
         sleepless_testing::at(TestCheckPoint::BeforeIdleSchedulerCleaned);
 
-        // See the idle scheduler gone only after solScCleaner did its job...
+        // See the old (= idle) scheduler gone only after solScCleaner did its job...
         sleepless_testing::at(&TestCheckPoint::AfterIdleSchedulerCleaned);
-        assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 0);
+        assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1);
+        assert_eq!(
+            pool_raw
+                .scheduler_inners
+                .lock()
+                .unwrap()
+                .first()
+                .as_ref()
+                .map(|(inner, _pooled_at)| inner.id())
+                .unwrap(),
+            new_scheduler_id
+        );
     }
 
     enum AbortCase {
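
Under the same assumptions as the clean_idle() sketch after patch 1, the behavior this strengthened test pins down — the long-pooled entry is evicted while the recently returned one survives — reduces to:

    let pool = Mutex::new(Vec::new());
    let max_pooling_duration = Duration::from_millis(10);

    pool.lock().unwrap().push((Scheduler, Instant::now())); // old
    std::thread::sleep(max_pooling_duration * 10); // let wall time really pass
    pool.lock().unwrap().push((Scheduler, Instant::now())); // new

    // Only the entry pooled for longer than max_pooling_duration is dropped.
    assert_eq!(clean_idle(&pool, max_pooling_duration), 1);
    assert_eq!(pool.lock().unwrap().len(), 1);
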
From 645f8bcca99e7cc12850fbf1e89585ffd7780cfd Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Sat, 8 Jun 2024 22:52:20 +0900
Subject: [PATCH 4/5] Use extract_if polyfill

---
 Cargo.lock                        |  7 +++++++
 Cargo.toml                        |  1 +
 programs/sbf/Cargo.lock           |  7 +++++++
 unified-scheduler-pool/Cargo.toml |  1 +
 unified-scheduler-pool/src/lib.rs | 28 +++++++++++-----------------
 5 files changed, 27 insertions(+), 17 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 67bc056297e1bf..c26ef2d1f71bcb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7614,6 +7614,7 @@ dependencies = [
  "solana-runtime",
  "solana-sdk",
  "solana-unified-scheduler-logic",
+ "vec_extract_if_polyfill",
 ]
 
 [[package]]
@@ -8984,6 +8985,12 @@ version = "0.2.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
 
+[[package]]
+name = "vec_extract_if_polyfill"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "40c9cb5fb67c2692310b6eb3fce7dd4b6e4c9a75be4f2f46b27f0b2b7799759c"
+
 [[package]]
 name = "vec_map"
 version = "0.8.2"

diff --git a/Cargo.toml b/Cargo.toml
index 9f749422730117..c131bbc972335a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -438,6 +438,7 @@ trees = "0.4.2"
 tungstenite = "0.20.1"
 uriparse = "0.6.4"
 url = "2.5.0"
+vec_extract_if_polyfill = "0.1.0"
 wasm-bindgen = "0.2"
 winapi = "0.3.8"
 winreg = "0.50"

diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock
index 7f7f33a445ac2b..9e2178df2d13ca 100644
--- a/programs/sbf/Cargo.lock
+++ b/programs/sbf/Cargo.lock
@@ -6500,6 +6500,7 @@ dependencies = [
  "solana-runtime",
  "solana-sdk",
  "solana-unified-scheduler-logic",
+ "vec_extract_if_polyfill",
 ]
 
 [[package]]
@@ -7698,6 +7699,12 @@ version = "0.2.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
 
+[[package]]
+name = "vec_extract_if_polyfill"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "40c9cb5fb67c2692310b6eb3fce7dd4b6e4c9a75be4f2f46b27f0b2b7799759c"
+
 [[package]]
 name = "vec_map"
 version = "0.8.2"

diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml
index 69bf63e2be3806..6a46a32b271eac 100644
--- a/unified-scheduler-pool/Cargo.toml
+++ b/unified-scheduler-pool/Cargo.toml
@@ -21,6 +21,7 @@ solana-program-runtime = { workspace = true }
 solana-runtime = { workspace = true }
 solana-sdk = { workspace = true }
 solana-unified-scheduler-logic = { workspace = true }
+vec_extract_if_polyfill = { workspace = true }
 
 [dev-dependencies]
 assert_matches = { workspace = true }

diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs
index 4dfacba66e3ca4..de4699c005a60a 100644
--- a/unified-scheduler-pool/src/lib.rs
+++ b/unified-scheduler-pool/src/lib.rs
@@ -51,6 +51,7 @@ use {
         thread::{self, sleep, JoinHandle},
         time::{Duration, Instant},
     },
+    vec_extract_if_polyfill::MakeExtractIf,
 };
 
 mod sleepless_testing;
@@ -171,29 +172,22 @@
                 };
 
                 let idle_inner_count = {
-                    let Ok(mut scheduler_inners) = scheduler_pool.scheduler_inners.lock() else {
-                        break;
-                    };
-                    let mut inners = mem::take(&mut *scheduler_inners);
-                    drop(scheduler_inners);
-
                     let now = Instant::now();
-                    let old_inner_count = inners.len();
-                    // retain could take a long time because it's dropping schedulers!
-                    inners.retain(|(_inner, pooled_at)| {
-                        now.duration_since(*pooled_at) <= max_pooling_duration
-                    });
-                    let new_inner_count = inners.len();
-
                     let Ok(mut scheduler_inners) = scheduler_pool.scheduler_inners.lock() else {
                         break;
                     };
-                    scheduler_inners.extend(inners);
+                    // I want to use the fancy ::extract_if() no matter what....
+                    #[allow(unstable_name_collisions)]
+                    let idle_inners = scheduler_inners
+                        .extract_if(|(_inner, pooled_at)| {
+                            now.duration_since(*pooled_at) > max_pooling_duration
+                        })
+                        .collect::<Vec<_>>();
                     drop(scheduler_inners);
 
-                    old_inner_count
-                        .checked_sub(new_inner_count)
-                        .expect("new_inner_count isn't larger")
+                    let idle_inner_count = idle_inners.len();
+                    drop(idle_inners);
+                    idle_inner_count
                 };
 
                 let trashed_inner_count = {
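
vec_extract_if_polyfill backports the still-unstable Vec::extract_if() to stable toolchains as a trait method: bringing MakeExtractIf into scope gives Vec an extract_if() that removes and yields the matching elements in a single linear pass, preserving the order of what remains. A small standalone example (the #[allow] silences the expected name collision with std's unstable method):

    use vec_extract_if_polyfill::MakeExtractIf;

    fn main() {
        let mut v = vec![1, 2, 3, 4, 5, 6];
        #[allow(unstable_name_collisions)]
        let evens: Vec<i32> = v.extract_if(|n| *n % 2 == 0).collect();
        assert_eq!(evens, [2, 4, 6]);
        assert_eq!(v, [1, 3, 5]);
    }

In the cleaner this runs against Vec<(S::Inner, Instant)>, so the closure destructures each tuple exactly as the retain() closure did before, just with the predicate inverted (extract_if selects what to remove).
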
From 2b90fd1587ef87ba21154f8b4e5a14844962babd Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Mon, 10 Jun 2024 14:41:37 +0900
Subject: [PATCH 5/5] Write justification of extract_if with minor opt.

---
 unified-scheduler-pool/src/lib.rs | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs
index de4699c005a60a..645a69e84f70bc 100644
--- a/unified-scheduler-pool/src/lib.rs
+++ b/unified-scheduler-pool/src/lib.rs
@@ -173,16 +173,24 @@
 
                 let idle_inner_count = {
                     let now = Instant::now();
+
+                    // Pre-allocate rather large capacity to avoid reallocation inside the lock.
+                    let mut idle_inners = Vec::with_capacity(128);
+
                     let Ok(mut scheduler_inners) = scheduler_pool.scheduler_inners.lock() else {
                         break;
                     };
-                    // I want to use the fancy ::extract_if() no matter what....
+                    // Use the still-unstable Vec::extract_if() even on a stable rust toolchain by
+                    // using a polyfill and allowing unstable_name_collisions, because it's
+                    // simplest to code and fastest to run (= O(n); single linear pass and no
+                    // reallocation).
+                    //
+                    // Note that this critical section could block the latency-sensitive replay
+                    // code-path via ::take_scheduler().
                     #[allow(unstable_name_collisions)]
-                    let idle_inners = scheduler_inners
-                        .extract_if(|(_inner, pooled_at)| {
-                            now.duration_since(*pooled_at) > max_pooling_duration
-                        })
-                        .collect::<Vec<_>>();
+                    idle_inners.extend(scheduler_inners.extract_if(|(_inner, pooled_at)| {
+                        now.duration_since(*pooled_at) > max_pooling_duration
+                    }));
                     drop(scheduler_inners);
 
                     let idle_inner_count = idle_inners.len();
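
With patch 5 applied, only the O(n) extract_if() pass runs inside the critical section: the buffer is pre-allocated before the lock is taken, and the evicted schedulers are dropped only after the guard is released. The clean_idle() sketch from patch 1, updated to this final shape (same hypothetical Scheduler stand-in):

    use std::{
        sync::Mutex,
        time::{Duration, Instant},
    };
    use vec_extract_if_polyfill::MakeExtractIf;

    struct Scheduler; // hypothetical stand-in for S::Inner

    fn clean_idle(pool: &Mutex<Vec<(Scheduler, Instant)>>, max_pooling_duration: Duration) -> usize {
        let now = Instant::now();
        // Pre-allocate outside the lock; 128 mirrors the patch's guess at a
        // capacity that keeps extend() allocation-free in practice.
        let mut idle_inners = Vec::with_capacity(128);

        let mut scheduler_inners = pool.lock().unwrap();
        // Single linear pass under the lock; the pool Vec is never reallocated.
        #[allow(unstable_name_collisions)]
        idle_inners.extend(scheduler_inners.extract_if(|(_inner, pooled_at)| {
            now.duration_since(*pooled_at) > max_pooling_duration
        }));
        drop(scheduler_inners);

        let idle_inner_count = idle_inners.len();
        drop(idle_inners); // schedulers are dropped here, outside the lock
        idle_inner_count
    }
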