Clean idle pooled schedulers periodically (anza-xyz#1575)
* Clean idle pooled schedulers periodically

* Fix race in failure_{with,without}_extra_tx

* Improve test

* Use extract_if polyfill

* Write justification of extract_if with minor opt. (a sketch of this pattern follows below)
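
For context, a minimal self-contained sketch of the cleanup pattern this commit introduces: pooled entries are timestamped, and a periodic pass drains the stale ones with the extract_if polyfill. `Scheduler` and `MAX_POOLING_DURATION` here are illustrative stand-ins for the pool's actual types and constants, and the call mirrors the then-nightly single-argument extract_if signature the polyfill provides; this is not the committed code verbatim.

```rust
use std::time::{Duration, Instant};

// Polyfill trait providing Vec::extract_if() on stable toolchains.
use vec_extract_if_polyfill::MakeExtractIf;

struct Scheduler; // stand-in for the pooled scheduler inner type

const MAX_POOLING_DURATION: Duration = Duration::from_secs(180);

fn clean_idle(pool: &mut Vec<(Scheduler, Instant)>) -> usize {
    let now = Instant::now();
    // Single O(n) pass: matching elements are moved out, the remaining
    // elements keep their relative order, and the backing Vec is not
    // reallocated.
    #[allow(unstable_name_collisions)]
    let idle: Vec<_> = pool
        .extract_if(|(_inner, pooled_at)| now.duration_since(*pooled_at) > MAX_POOLING_DURATION)
        .collect();
    idle.len() // the drained schedulers are dropped here
}
```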
ryoqun authored and samkim-crypto committed Jul 31, 2024
1 parent c017185 commit 51ad1c3
Showing 5 changed files with 123 additions and 6 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -438,6 +438,7 @@ trees = "0.4.2"
tungstenite = "0.20.1"
uriparse = "0.6.4"
url = "2.5.0"
vec_extract_if_polyfill = "0.1.0"
wasm-bindgen = "0.2"
winapi = "0.3.8"
winreg = "0.50"
7 changes: 7 additions & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions unified-scheduler-pool/Cargo.toml
@@ -21,6 +21,7 @@ solana-program-runtime = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-unified-scheduler-logic = { workspace = true }
vec_extract_if_polyfill = { workspace = true }

[dev-dependencies]
assert_matches = { workspace = true }
113 changes: 107 additions & 6 deletions unified-scheduler-pool/src/lib.rs
@@ -49,8 +49,9 @@ use {
Arc, Mutex, OnceLock, Weak,
},
thread::{self, sleep, JoinHandle},
- time::Duration,
+ time::{Duration, Instant},
},
vec_extract_if_polyfill::MakeExtractIf,
};

mod sleepless_testing;
@@ -63,6 +64,7 @@ enum CheckPoint {
NewTask(usize),
TaskHandled(usize),
SchedulerThreadAborted,
IdleSchedulerCleaned(usize),
TrashedSchedulerCleaned(usize),
}

@@ -73,7 +75,7 @@ type AtomicSchedulerId = AtomicU64;
// TransactionStatusSender; also, PohRecorder in the future)...
#[derive(Debug)]
pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
- scheduler_inners: Mutex<Vec<S::Inner>>,
+ scheduler_inners: Mutex<Vec<(S::Inner, Instant)>>,
trashed_scheduler_inners: Mutex<Vec<S::Inner>>,
handler_count: usize,
handler_context: HandlerContext,
@@ -104,6 +106,7 @@ pub type DefaultSchedulerPool =
SchedulerPool<PooledScheduler<DefaultTaskHandler>, DefaultTaskHandler>;

const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(10);
const DEFAULT_MAX_POOLING_DURATION: Duration = Duration::from_secs(180);

impl<S, TH> SchedulerPool<S, TH>
where
@@ -127,6 +130,7 @@ where
replay_vote_sender,
prioritization_fee_cache,
DEFAULT_POOL_CLEANER_INTERVAL,
DEFAULT_MAX_POOLING_DURATION,
)
}

@@ -137,6 +141,7 @@ where
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
pool_cleaner_interval: Duration,
max_pooling_duration: Duration,
) -> Arc<Self> {
let handler_count = handler_count.unwrap_or(Self::default_handler_count());
assert!(handler_count >= 1);
@@ -166,6 +171,33 @@ where
break;
};

let idle_inner_count = {
let now = Instant::now();

// Pre-allocate a rather large capacity to avoid reallocation inside the lock.
let mut idle_inners = Vec::with_capacity(128);

let Ok(mut scheduler_inners) = scheduler_pool.scheduler_inners.lock() else {
break;
};
// Use the still-unstable Vec::extract_if() even on the stable rust toolchain
// via a polyfill and allowing unstable_name_collisions, because it's the
// simplest to code and the fastest to run (= O(n); a single linear pass with
// no reallocation).
//
// Note that this critical section could block the latency-sensitive replay
// code-path via ::take_scheduler().
#[allow(unstable_name_collisions)]
idle_inners.extend(scheduler_inners.extract_if(|(_inner, pooled_at)| {
now.duration_since(*pooled_at) > max_pooling_duration
}));
drop(scheduler_inners);

let idle_inner_count = idle_inners.len();
drop(idle_inners);
idle_inner_count
};

let trashed_inner_count = {
let Ok(mut trashed_scheduler_inners) =
scheduler_pool.trashed_scheduler_inners.lock()
@@ -181,9 +213,10 @@ where
};

info!(
"Scheduler pool cleaner: dropped {} trashed inners",
trashed_inner_count,
"Scheduler pool cleaner: dropped {} idle inners, {} trashed inners",
idle_inner_count, trashed_inner_count,
);
sleepless_testing::at(CheckPoint::IdleSchedulerCleaned(idle_inner_count));
sleepless_testing::at(CheckPoint::TrashedSchedulerCleaned(trashed_inner_count));
}
};
@@ -242,14 +275,15 @@ where
self.scheduler_inners
.lock()
.expect("not poisoned")
- .push(scheduler);
+ .push((scheduler, Instant::now()));
}
}

fn do_take_scheduler(&self, context: SchedulingContext) -> S {
// pop is intentional for FILO, expecting a relatively warmed-up scheduler due to
// having been returned recently
- if let Some(inner) = self.scheduler_inners.lock().expect("not poisoned").pop() {
+ if let Some((inner, _pooled_at)) = self.scheduler_inners.lock().expect("not poisoned").pop()
+ {
S::from_inner(inner, context)
} else {
S::spawn(self.self_arc(), context)
@@ -1289,6 +1323,9 @@ mod tests {
BeforeNewTask,
AfterTaskHandled,
AfterSchedulerThreadAborted,
BeforeIdleSchedulerCleaned,
AfterIdleSchedulerCleaned,
BeforeTrashedSchedulerCleaned,
AfterTrashedSchedulerCleaned,
BeforeThreadManagerDrop,
}
@@ -1326,6 +1363,62 @@ mod tests {

const SHORTENED_POOL_CLEANER_INTERVAL: Duration = Duration::from_millis(1);

#[test]
fn test_scheduler_drop_idle() {
solana_logger::setup();

let _progress = sleepless_testing::setup(&[
&TestCheckPoint::BeforeIdleSchedulerCleaned,
&CheckPoint::IdleSchedulerCleaned(0),
&CheckPoint::IdleSchedulerCleaned(1),
&TestCheckPoint::AfterIdleSchedulerCleaned,
]);

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let shortened_max_pooling_duration = Duration::from_millis(10);
let pool_raw = DefaultSchedulerPool::do_new(
None,
None,
None,
None,
ignored_prioritization_fee_cache,
SHORTENED_POOL_CLEANER_INTERVAL,
shortened_max_pooling_duration,
);
let pool = pool_raw.clone();
let bank = Arc::new(Bank::default_for_tests());
let context1 = SchedulingContext::new(bank);
let context2 = context1.clone();

let old_scheduler = pool.do_take_scheduler(context1);
let new_scheduler = pool.do_take_scheduler(context2);
let new_scheduler_id = new_scheduler.id();
Box::new(old_scheduler.into_inner().1).return_to_pool();

// sleepless_testing can't be used; sleep a bit here so that real wall time passes...
sleep(shortened_max_pooling_duration * 10);
Box::new(new_scheduler.into_inner().1).return_to_pool();

// Block solScCleaner until we see the returned schedulers...
assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 2);
sleepless_testing::at(TestCheckPoint::BeforeIdleSchedulerCleaned);

// See the old (= idle) scheduler gone only after solScCleaner did its job...
sleepless_testing::at(TestCheckPoint::AfterIdleSchedulerCleaned);
assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1);
assert_eq!(
pool_raw
.scheduler_inners
.lock()
.unwrap()
.first()
.as_ref()
.map(|(inner, _pooled_at)| inner.id())
.unwrap(),
new_scheduler_id
);
}

enum AbortCase {
Unhandled,
UnhandledWhilePanicking,
@@ -1658,6 +1751,8 @@ mod tests {
&TestCheckPoint::AfterTaskHandled,
&CheckPoint::SchedulerThreadAborted,
&TestCheckPoint::AfterSchedulerThreadAborted,
&TestCheckPoint::BeforeTrashedSchedulerCleaned,
&CheckPoint::TrashedSchedulerCleaned(0),
&CheckPoint::TrashedSchedulerCleaned(1),
&TestCheckPoint::AfterTrashedSchedulerCleaned,
]);
@@ -1678,6 +1773,7 @@ mod tests {
None,
ignored_prioritization_fee_cache,
SHORTENED_POOL_CLEANER_INTERVAL,
DEFAULT_MAX_POOLING_DURATION,
);
let pool = pool_raw.clone();
let context = SchedulingContext::new(bank.clone());
@@ -1729,7 +1825,12 @@ mod tests {
bank.wait_for_completed_scheduler(),
Some((Err(TransactionError::AccountNotFound), _timings))
);

// Block solScCleaner until we see the trashed scheduler...
assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 1);
sleepless_testing::at(TestCheckPoint::BeforeTrashedSchedulerCleaned);

// See the trashed scheduler gone only after solScCleaner did its job...
sleepless_testing::at(TestCheckPoint::AfterTrashedSchedulerCleaned);
assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 0);
}
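One detail of the cleaner loop above worth highlighting: the idle inners are detached from the shared Vec while the mutex is held, but they are only dropped after the guard is released (note the explicit drop(scheduler_inners) before drop(idle_inners)). Since dropping a scheduler inner can be expensive, this keeps the critical section short for the latency-sensitive take_scheduler() path the in-code comment warns about. A minimal sketch of that "detach under the lock, destroy outside it" pattern, with a hypothetical ExpensiveInner stand-in:

```rust
use std::{mem, sync::Mutex};

struct ExpensiveInner; // stand-in; imagine a Drop impl that joins threads

fn drop_all_outside_lock(shared: &Mutex<Vec<ExpensiveInner>>) {
    // Detach the elements while the lock is held; the temporary MutexGuard
    // is released at the end of this statement.
    let taken = mem::take(&mut *shared.lock().unwrap());
    // Run the (potentially slow) destructors only after the lock is gone,
    // so contending threads are not blocked behind them.
    drop(taken);
}
```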

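The tests coordinate with the solScCleaner thread via the repo's internal sleepless_testing module: setup() declares an expected order of checkpoints, and at() parks the calling thread until its checkpoint's turn comes. As a rough, hypothetical illustration of that idea (not the module's actual implementation), such a sequencer can be built from a Mutex and a Condvar:

```rust
use std::sync::{Condvar, Mutex};

// Hypothetical checkpoint sequencer: at() blocks until every checkpoint
// listed before the caller's has already been passed.
struct Progress {
    expected: Vec<&'static str>,
    next: Mutex<usize>,
    cvar: Condvar,
}

impl Progress {
    fn new(expected: &[&'static str]) -> Self {
        Self {
            expected: expected.to_vec(),
            next: Mutex::new(0),
            cvar: Condvar::new(),
        }
    }

    fn at(&self, check_point: &'static str) {
        if !self.expected.contains(&check_point) {
            return; // checkpoints not under test are ignored in this sketch
        }
        let mut next = self.next.lock().unwrap();
        // Park until this checkpoint is the next expected one.
        while self.expected.get(*next) != Some(&check_point) {
            next = self.cvar.wait(next).unwrap();
        }
        *next += 1;
        self.cvar.notify_all();
    }
}
```

With such a sequencer, a test thread can assert pool state, then call at(...) to let the cleaner proceed, mirroring the Before*/After* checkpoint pairs in the tests above.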