Clean idle pooled schedulers periodically #1575

Merged · 5 commits · Jun 10, 2024

7 changes: 7 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -438,6 +438,7 @@ trees = "0.4.2"
tungstenite = "0.20.1"
uriparse = "0.6.4"
url = "2.5.0"
vec_extract_if_polyfill = "0.1.0"
@ryoqun (Collaborator, Author) commented:

According to this page, we're the first consumer of the polyfill crate: https://crates.io/crates/vec_extract_if_polyfill/reverse_dependencies

However, I've confirmed that this crate is an almost verbatim copy of what's gated behind the unstable feature flag in our current rust-nightly std source:

$ git diff -w --no-index ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/vec_extract_if_polyfill-0.1.0/src/lib.rs ~/.rustup/toolchains/nightly-2024-05-02-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/vec/extract_if.rs
...
$ git diff -w --no-index ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/vec_extract_if_polyfill-0.1.0/src/lib.rs ~/.rustup/toolchains/nightly-2024-05-02-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/vec/mod.rs
...
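As a minimal sketch (not code from the PR itself), this is how the polyfill is consumed: the crate exposes an extension trait, imported as MakeExtractIf later in this diff, that adds extract_if() to Vec on stable Rust. The #[allow(unstable_name_collisions)] mirrors the one in the pool-cleaner code and silences the lint about the method name colliding with std's unstable inherent method:

```rust
use vec_extract_if_polyfill::MakeExtractIf;

fn main() {
    let mut v = vec![1, 2, 3, 4, 5, 6];
    // extract_if() removes and yields the matching elements in a single
    // linear pass, keeping the rest in place (O(n), no reallocation),
    // which is exactly what the pool cleaner below relies on.
    #[allow(unstable_name_collisions)]
    let evens: Vec<i32> = v.extract_if(|x| *x % 2 == 0).collect();
    assert_eq!(evens, [2, 4, 6]);
    assert_eq!(v, [1, 3, 5]);
}
```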

A reviewer commented:

I don't understand why we're adding the dependency instead of just using a nightly feature, if we really need it. The docs even say this polyfill crate uses it:

> This struct is created by Vec::extract_if. See its documentation for more.

@ryoqun (Collaborator, Author) replied:

If we only supported building with nightly, that would work.

But Vec::extract_if() isn't available on stable Rust yet...
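For contrast, here is a minimal standalone sketch of the nightly-only alternative being suggested (an illustration, not code from the PR); the feature gate compiles on a 2024-era nightly such as the nightly-2024-05-02 mentioned above, but stable rustc rejects it outright:

```rust
// Requires a nightly toolchain; on stable this fails with
// "error[E0554]: `#![feature]` may not be used on the stable release channel".
#![feature(extract_if)]

fn main() {
    let mut v = vec![1, 2, 3, 4, 5, 6];
    // Same semantics as the polyfill: a single linear pass that yields
    // the removed elements while compacting the rest in place.
    let odds: Vec<i32> = v.extract_if(|x| *x % 2 == 1).collect();
    assert_eq!(odds, [1, 3, 5]);
    assert_eq!(v, [2, 4, 6]);
}
```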

wasm-bindgen = "0.2"
winapi = "0.3.8"
winreg = "0.50"
7 changes: 7 additions & 0 deletions programs/sbf/Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions unified-scheduler-pool/Cargo.toml
@@ -21,6 +21,7 @@ solana-program-runtime = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-unified-scheduler-logic = { workspace = true }
vec_extract_if_polyfill = { workspace = true }

[dev-dependencies]
assert_matches = { workspace = true }
113 changes: 107 additions & 6 deletions unified-scheduler-pool/src/lib.rs
@@ -49,8 +49,9 @@ use {
Arc, Mutex, OnceLock, Weak,
},
thread::{self, sleep, JoinHandle},
time::Duration,
time::{Duration, Instant},
},
vec_extract_if_polyfill::MakeExtractIf,
};

mod sleepless_testing;
@@ -63,6 +64,7 @@ enum CheckPoint {
NewTask(usize),
TaskHandled(usize),
SchedulerThreadAborted,
IdleSchedulerCleaned(usize),
TrashedSchedulerCleaned(usize),
}

@@ -73,7 +75,7 @@ type AtomicSchedulerId = AtomicU64;
// TransactionStatusSender; also, PohRecorder in the future)...
#[derive(Debug)]
pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
scheduler_inners: Mutex<Vec<S::Inner>>,
scheduler_inners: Mutex<Vec<(S::Inner, Instant)>>,
trashed_scheduler_inners: Mutex<Vec<S::Inner>>,
handler_count: usize,
handler_context: HandlerContext,
@@ -104,6 +106,7 @@ pub type DefaultSchedulerPool =
SchedulerPool<PooledScheduler<DefaultTaskHandler>, DefaultTaskHandler>;

const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(10);
const DEFAULT_MAX_POOLING_DURATION: Duration = Duration::from_secs(180);

impl<S, TH> SchedulerPool<S, TH>
where
@@ -127,6 +130,7 @@
replay_vote_sender,
prioritization_fee_cache,
DEFAULT_POOL_CLEANER_INTERVAL,
DEFAULT_MAX_POOLING_DURATION,
)
}

@@ -137,6 +141,7 @@
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
pool_cleaner_interval: Duration,
max_pooling_duration: Duration,
) -> Arc<Self> {
let handler_count = handler_count.unwrap_or(Self::default_handler_count());
assert!(handler_count >= 1);
@@ -166,6 +171,33 @@
break;
};

let idle_inner_count = {
let now = Instant::now();

// Pre-allocate rather large capacity to avoid reallocation inside the lock.
let mut idle_inners = Vec::with_capacity(128);

let Ok(mut scheduler_inners) = scheduler_pool.scheduler_inners.lock() else {
break;
};
// Use the still-unstable Vec::extract_if() even on stable rust toolchain by
// using a polyfill and allowing unstable_name_collisions, because it's
// simplest to code and fastest to run (= O(n); single linear pass and no
// reallocation).
//
// Note that this critical section could block the latency-sensitive replay
// code-path via ::take_scheduler().
#[allow(unstable_name_collisions)]
idle_inners.extend(scheduler_inners.extract_if(|(_inner, pooled_at)| {
now.duration_since(*pooled_at) > max_pooling_duration
}));
drop(scheduler_inners);

let idle_inner_count = idle_inners.len();
drop(idle_inners);
idle_inner_count
};

let trashed_inner_count = {
let Ok(mut trashed_scheduler_inners) =
scheduler_pool.trashed_scheduler_inners.lock()
@@ -181,9 +213,10 @@
};

info!(
"Scheduler pool cleaner: dropped {} trashed inners",
trashed_inner_count,
"Scheduler pool cleaner: dropped {} idle inners, {} trashed inners",
idle_inner_count, trashed_inner_count,
);
sleepless_testing::at(CheckPoint::IdleSchedulerCleaned(idle_inner_count));
sleepless_testing::at(CheckPoint::TrashedSchedulerCleaned(trashed_inner_count));
}
};
@@ -242,14 +275,15 @@
self.scheduler_inners
.lock()
.expect("not poisoned")
.push(scheduler);
.push((scheduler, Instant::now()));
}
}

fn do_take_scheduler(&self, context: SchedulingContext) -> S {
// pop is intentional for filo, expecting relatively warmed-up scheduler due to having been
// returned recently
if let Some(inner) = self.scheduler_inners.lock().expect("not poisoned").pop() {
if let Some((inner, _pooled_at)) = self.scheduler_inners.lock().expect("not poisoned").pop()
{
S::from_inner(inner, context)
} else {
S::spawn(self.self_arc(), context)
@@ -1289,6 +1323,9 @@ mod tests {
BeforeNewTask,
AfterTaskHandled,
AfterSchedulerThreadAborted,
BeforeIdleSchedulerCleaned,
AfterIdleSchedulerCleaned,
BeforeTrashedSchedulerCleaned,
AfterTrashedSchedulerCleaned,
BeforeThreadManagerDrop,
}
@@ -1326,6 +1363,62 @@

const SHORTENED_POOL_CLEANER_INTERVAL: Duration = Duration::from_millis(1);

#[test]
fn test_scheduler_drop_idle() {
solana_logger::setup();

let _progress = sleepless_testing::setup(&[
&TestCheckPoint::BeforeIdleSchedulerCleaned,
&CheckPoint::IdleSchedulerCleaned(0),
&CheckPoint::IdleSchedulerCleaned(1),
&TestCheckPoint::AfterIdleSchedulerCleaned,
]);

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let shortened_max_pooling_duration = Duration::from_millis(10);
let pool_raw = DefaultSchedulerPool::do_new(
None,
None,
None,
None,
ignored_prioritization_fee_cache,
SHORTENED_POOL_CLEANER_INTERVAL,
shortened_max_pooling_duration,
);
let pool = pool_raw.clone();
let bank = Arc::new(Bank::default_for_tests());
let context1 = SchedulingContext::new(bank);
let context2 = context1.clone();

let old_scheduler = pool.do_take_scheduler(context1);
let new_scheduler = pool.do_take_scheduler(context2);
let new_scheduler_id = new_scheduler.id();
Box::new(old_scheduler.into_inner().1).return_to_pool();

// sleepless_testing can't be used; wait a bit here to see real progress of wall time...
sleep(shortened_max_pooling_duration * 10);
Box::new(new_scheduler.into_inner().1).return_to_pool();

// Block solScCleaner until we see the returned schedulers...
assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 2);
sleepless_testing::at(TestCheckPoint::BeforeIdleSchedulerCleaned);

// See the old (= idle) scheduler gone only after solScCleaner did its job...
sleepless_testing::at(TestCheckPoint::AfterIdleSchedulerCleaned);
assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1);
assert_eq!(
pool_raw
.scheduler_inners
.lock()
.unwrap()
.first()
.as_ref()
.map(|(inner, _pooled_at)| inner.id())
.unwrap(),
new_scheduler_id
);
}

enum AbortCase {
Unhandled,
UnhandledWhilePanicking,
@@ -1658,6 +1751,8 @@
&TestCheckPoint::AfterTaskHandled,
&CheckPoint::SchedulerThreadAborted,
&TestCheckPoint::AfterSchedulerThreadAborted,
&TestCheckPoint::BeforeTrashedSchedulerCleaned,
&CheckPoint::TrashedSchedulerCleaned(0),
&CheckPoint::TrashedSchedulerCleaned(1),
&TestCheckPoint::AfterTrashedSchedulerCleaned,
]);
@@ -1678,6 +1773,7 @@
None,
ignored_prioritization_fee_cache,
SHORTENED_POOL_CLEANER_INTERVAL,
DEFAULT_MAX_POOLING_DURATION,
);
let pool = pool_raw.clone();
let context = SchedulingContext::new(bank.clone());
@@ -1729,7 +1825,12 @@
bank.wait_for_completed_scheduler(),
Some((Err(TransactionError::AccountNotFound), _timings))
);

// Block solScCleaner until we see the trashed scheduler...
assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 1);
sleepless_testing::at(TestCheckPoint::BeforeTrashedSchedulerCleaned);

// See the trashed scheduler gone only after solScCleaner did its job...
sleepless_testing::at(TestCheckPoint::AfterTrashedSchedulerCleaned);
assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 0);
Comment on lines +1829 to 1835, @ryoqun (Collaborator, Author) commented on Jun 2, 2024:

It turns out this test is racy to begin with... or rather, it seems the test isn't racy unless I add the new logic to solScCleaner, so I'm piggybacking the fix into this PR.

}