Skip to content

Commit

Permalink
Clean idle pooled schedulers periodically
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Jun 2, 2024
1 parent 74dcd3b commit b16e9fe
Showing 1 changed file with 65 additions and 6 deletions.
71 changes: 65 additions & 6 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use {
Arc, Mutex, OnceLock, Weak,
},
thread::{self, sleep, JoinHandle},
time::Duration,
time::{Duration, Instant},
},
};

Expand All @@ -63,6 +63,7 @@ enum CheckPoint {
NewTask(usize),
TaskHandled(usize),
SchedulerThreadAborted,
IdleSchedulerCleaned(usize),
TrashedSchedulerCleaned(usize),
}

Expand All @@ -73,7 +74,7 @@ type AtomicSchedulerId = AtomicU64;
// TransactionStatusSender; also, PohRecorder in the future)...
#[derive(Debug)]
pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
scheduler_inners: Mutex<Vec<S::Inner>>,
scheduler_inners: Mutex<Vec<(S::Inner, Instant)>>,
trashed_scheduler_inners: Mutex<Vec<S::Inner>>,
handler_count: usize,
handler_context: HandlerContext,
Expand Down Expand Up @@ -104,6 +105,7 @@ pub type DefaultSchedulerPool =
SchedulerPool<PooledScheduler<DefaultTaskHandler>, DefaultTaskHandler>;

/// How often the background pool-cleaner thread wakes up to drop idle and
/// trashed scheduler inners.
const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(10);
/// How long a returned scheduler inner may sit unused in the pool before the
/// cleaner considers it idle and drops it.
const DEFAULT_MAX_POOLING_DURATION: Duration = Duration::from_secs(180);

impl<S, TH> SchedulerPool<S, TH>
where
Expand All @@ -127,6 +129,7 @@ where
replay_vote_sender,
prioritization_fee_cache,
DEFAULT_POOL_CLEANER_INTERVAL,
DEFAULT_MAX_POOLING_DURATION,
)
}

Expand All @@ -137,6 +140,7 @@ where
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
pool_cleaner_interval: Duration,
max_pooling_duration: Duration,
) -> Arc<Self> {
let handler_count = handler_count.unwrap_or(Self::default_handler_count());
assert!(handler_count >= 1);
Expand Down Expand Up @@ -166,6 +170,26 @@ where
break;
};

let idle_inner_count = {
let Ok(mut scheduler_inners) = scheduler_pool.scheduler_inners.lock() else {
break;
};

let mut inners = mem::take(&mut *scheduler_inners);
let now = Instant::now();
let old_inner_count = inners.len();
// this loop should be fast because the lock is still held
inners.retain(|(_inner, pooled_at)| {
now.duration_since(*pooled_at) <= max_pooling_duration
});
let new_inner_count = inners.len();
scheduler_inners.extend(inners);

old_inner_count
.checked_sub(new_inner_count)
.expect("new_inner_count isn't larger")
};

let trashed_inner_count = {
let Ok(mut trashed_scheduler_inners) =
scheduler_pool.trashed_scheduler_inners.lock()
Expand All @@ -181,9 +205,10 @@ where
};

info!(
"Scheduler pool cleaner: dropped {} trashed inners",
trashed_inner_count,
"Scheduler pool cleaner: dropped {} idle inners, {} trashed inners",
idle_inner_count, trashed_inner_count,
);
sleepless_testing::at(CheckPoint::IdleSchedulerCleaned(idle_inner_count));
sleepless_testing::at(CheckPoint::TrashedSchedulerCleaned(trashed_inner_count));
}
};
Expand Down Expand Up @@ -242,14 +267,15 @@ where
self.scheduler_inners
.lock()
.expect("not poisoned")
.push(scheduler);
.push((scheduler, Instant::now()));
}
}

fn do_take_scheduler(&self, context: SchedulingContext) -> S {
// pop is intentional for filo, expecting relatively warmed-up scheduler due to having been
// returned recently
if let Some(inner) = self.scheduler_inners.lock().expect("not poisoned").pop() {
if let Some((inner, _pooled_at)) = self.scheduler_inners.lock().expect("not poisoned").pop()
{
S::from_inner(inner, context)
} else {
S::spawn(self.self_arc(), context)
Expand Down Expand Up @@ -1289,6 +1315,7 @@ mod tests {
BeforeNewTask,
AfterTaskHandled,
AfterSchedulerThreadAborted,
AfterIdleSchedulerCleaned,
AfterTrashedSchedulerCleaned,
BeforeThreadManagerDrop,
}
Expand Down Expand Up @@ -1326,6 +1353,37 @@ mod tests {

const SHORTENED_POOL_CLEANER_INTERVAL: Duration = Duration::from_millis(1);

#[test]
fn test_scheduler_drop_idle() {
    solana_logger::setup();

    // Expected checkpoint progression: the cleaner reports exactly one idle
    // scheduler cleaned, then the main thread may proceed past
    // AfterIdleSchedulerCleaned.
    let _progress = sleepless_testing::setup(&[
        &CheckPoint::IdleSchedulerCleaned(1),
        &TestCheckPoint::AfterIdleSchedulerCleaned,
    ]);

    let fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
    // A 1ms max pooling duration makes any returned scheduler count as idle
    // almost immediately.
    let pool_raw = DefaultSchedulerPool::do_new(
        None,
        None,
        None,
        None,
        fee_cache,
        SHORTENED_POOL_CLEANER_INTERVAL,
        Duration::from_millis(1),
    );
    let pool = pool_raw.clone();
    let context = SchedulingContext::new(Arc::new(Bank::default_for_tests()));

    // Take a scheduler out and immediately hand it back, leaving one pooled
    // inner with a fresh pooled-at timestamp.
    let taken = pool.do_take_scheduler(context);
    Box::new(taken.into_inner().1).return_to_pool();

    // NOTE(review): assumes the cleaner cannot drop the inner before this
    // check — presumably guaranteed by the sleepless_testing ordering above;
    // verify.
    assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1);
    // Wait until the cleaner has dropped the idle inner...
    sleepless_testing::at(&TestCheckPoint::AfterIdleSchedulerCleaned);
    // ...after which the pool must be empty again.
    assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 0);
}

enum AbortCase {
Unhandled,
UnhandledWhilePanicking,
Expand Down Expand Up @@ -1678,6 +1736,7 @@ mod tests {
None,
ignored_prioritization_fee_cache,
SHORTENED_POOL_CLEANER_INTERVAL,
DEFAULT_MAX_POOLING_DURATION,
);
let pool = pool_raw.clone();
let context = SchedulingContext::new(bank.clone());
Expand Down

0 comments on commit b16e9fe

Please sign in to comment.