Clean idle pooled schedulers periodically #1575

Merged · 5 commits · Jun 10, 2024
Changes from 4 commits
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -438,6 +438,7 @@ trees = "0.4.2"
tungstenite = "0.20.1"
uriparse = "0.6.4"
url = "2.5.0"
vec_extract_if_polyfill = "0.1.0"

Collaborator Author (@ryoqun):

According to this page, we're the first consumer of the polyfill crate: https://crates.io/crates/vec_extract_if_polyfill/reverse_dependencies

However, I've confirmed that this crate is an almost verbatim copy of what's gated behind the unstable feature flag in our present rust-nightly std source:

$ git diff -w --no-index ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/vec_extract_if_polyfill-0.1.0/src/lib.rs ~/.rustup/toolchains/nightly-2024-05-02-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/vec/extract_if.rs
...
$ git diff -w --no-index ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/vec_extract_if_polyfill-0.1.0/src/lib.rs ~/.rustup/toolchains/nightly-2024-05-02-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/vec/mod.rs
...

Reviewer:

I don't understand why we're adding the dependency instead of just using the nightly feature, if we really need it. The docs even say this polyfill crate uses it:

> This struct is created by Vec::extract_if. See its documentation for more.

Collaborator Author (@ryoqun):

If we only supported building with nightly, that would work.

But Vec::extract_if() isn't available on stable Rust yet...
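
For illustration, here's a minimal standalone sketch of the two options (the MakeExtractIf import and the #[allow(unstable_name_collisions)] attribute mirror this PR's actual usage; the surrounding example code and values are hypothetical):

```rust
// On nightly, Vec::extract_if is available behind `#![feature(extract_if)]`;
// on stable, the polyfill exposes the same method via an extension trait.
use vec_extract_if_polyfill::MakeExtractIf;

fn main() {
    let mut v = vec![1, 4, 2, 5];
    // The lint is silenced because Vec also has a (currently unstable)
    // inherent method of the same name, which would win once stabilized.
    #[allow(unstable_name_collisions)]
    let drained: Vec<i32> = v.extract_if(|x| *x > 3).collect();
    assert_eq!(drained, [4, 5]); // extracted elements, in order
    assert_eq!(v, [1, 2]);       // the rest stay in place
}
```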

wasm-bindgen = "0.2"
winapi = "0.3.8"
winreg = "0.50"
7 changes: 7 additions & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions unified-scheduler-pool/Cargo.toml
@@ -21,6 +21,7 @@ solana-program-runtime = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-unified-scheduler-logic = { workspace = true }
vec_extract_if_polyfill = { workspace = true }

[dev-dependencies]
assert_matches = { workspace = true }
105 changes: 99 additions & 6 deletions unified-scheduler-pool/src/lib.rs
@@ -49,8 +49,9 @@ use {
Arc, Mutex, OnceLock, Weak,
},
thread::{self, sleep, JoinHandle},
- time::Duration,
+ time::{Duration, Instant},
},
vec_extract_if_polyfill::MakeExtractIf,
};

mod sleepless_testing;
@@ -63,6 +64,7 @@ enum CheckPoint {
NewTask(usize),
TaskHandled(usize),
SchedulerThreadAborted,
IdleSchedulerCleaned(usize),
TrashedSchedulerCleaned(usize),
}

@@ -73,7 +75,7 @@ type AtomicSchedulerId = AtomicU64;
// TransactionStatusSender; also, PohRecorder in the future)...
#[derive(Debug)]
pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
- scheduler_inners: Mutex<Vec<S::Inner>>,
+ scheduler_inners: Mutex<Vec<(S::Inner, Instant)>>,
trashed_scheduler_inners: Mutex<Vec<S::Inner>>,
handler_count: usize,
handler_context: HandlerContext,
@@ -104,6 +106,7 @@ pub type DefaultSchedulerPool =
SchedulerPool<PooledScheduler<DefaultTaskHandler>, DefaultTaskHandler>;

const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(10);
const DEFAULT_MAX_POOLING_DURATION: Duration = Duration::from_secs(180);

impl<S, TH> SchedulerPool<S, TH>
where
@@ -127,6 +130,7 @@ where
replay_vote_sender,
prioritization_fee_cache,
DEFAULT_POOL_CLEANER_INTERVAL,
DEFAULT_MAX_POOLING_DURATION,
)
}

@@ -137,6 +141,7 @@ where
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
pool_cleaner_interval: Duration,
max_pooling_duration: Duration,
) -> Arc<Self> {
let handler_count = handler_count.unwrap_or(Self::default_handler_count());
assert!(handler_count >= 1);
@@ -166,6 +171,25 @@ where
break;
};

let idle_inner_count = {
let now = Instant::now();
let Ok(mut scheduler_inners) = scheduler_pool.scheduler_inners.lock() else {
break;
};
// I want to use the fancy ::extract_if() no matter what....

Reviewer:

Why though? It seems like we can get something similar with retain and collecting into a vec inside?

Collaborator Author (@ryoqun), Jun 10, 2024:

> Why though? It seems like we can get something similar with retain and collecting into a vec inside?

retain only gives &mut to the predicate closure, so we can't collect (i.e., take ownership) that way. After all, that's why the still-unstable ::extract_if() exists.

And ::extract_if() is a perfect match, and the simplest & fastest way to handle this use case.
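
To make that concrete, here's a hedged standalone sketch (the Inner type and values are hypothetical stand-ins, not the PR's code) of why retain can't hand back ownership while extract_if can:

```rust
use vec_extract_if_polyfill::MakeExtractIf;

#[derive(Debug, PartialEq)]
struct Inner(u64); // stand-in for a pooled scheduler inner

fn main() {
    let mut pool = vec![Inner(1), Inner(2), Inner(3)];

    // retain's predicate only receives `&mut Inner`, and every rejected
    // element is dropped inside the call, so nothing can be collected:
    //     pool.retain(|inner| inner.0 != 2);

    // extract_if yields each rejected element by value, so the caller
    // takes ownership (e.g. to drop them later, outside a lock):
    #[allow(unstable_name_collisions)]
    let removed: Vec<Inner> = pool.extract_if(|inner| inner.0 == 2).collect();
    assert_eq!(removed, [Inner(2)]);
    assert_eq!(pool, [Inner(1), Inner(3)]);
}
```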

Collaborator Author (@ryoqun), Jun 10, 2024:

Well, while waiting for an lgtm, I managed to write an in-source commentary: 2b90fd1

Reviewer:

Yeah, agreed it's a good match; I'd just love it if the feature were stable. Just concerned with adding a dependency on an otherwise unused crate. The repo has no README either.

Kind of curious how you found it.
Looking at the repo, it seems fine to me as is - but who knows with upgrades.

Collaborator Author (@ryoqun), Jun 10, 2024:

> Kind of curious how you found it.

Well, crates.io's own search functionality didn't work... I just googled it: https://www.google.com/search?q=rust+extract_if lol

> Looking at the repo, it seems fine to me as is - but who knows with upgrades.

As for upgrades, that's certainly a possibility. However, our dep is pinned to an exact version and checksum via Cargo.lock as usual, so I think it's okay unless we're practicing blind dependabot merges. ;)

Also, as far as I've screened the author's profile pages (https://github.com/chyyran and https://crates.io/users/chyyran), I think there's only legitimate activity.

Lastly, I'm planning to advertise this crate on Rust's tracking issue, to increase its exposure across the ecosystem and to help move the stabilization forward.

#[allow(unstable_name_collisions)]
let idle_inners = scheduler_inners
.extract_if(|(_inner, pooled_at)| {
now.duration_since(*pooled_at) > max_pooling_duration
})
.collect::<Vec<_>>();
drop(scheduler_inners);

let idle_inner_count = idle_inners.len();
drop(idle_inners);
idle_inner_count
};

let trashed_inner_count = {
let Ok(mut trashed_scheduler_inners) =
scheduler_pool.trashed_scheduler_inners.lock()
@@ -181,9 +205,10 @@
};

info!(
- "Scheduler pool cleaner: dropped {} trashed inners",
- trashed_inner_count,
+ "Scheduler pool cleaner: dropped {} idle inners, {} trashed inners",
+ idle_inner_count, trashed_inner_count,
);
sleepless_testing::at(CheckPoint::IdleSchedulerCleaned(idle_inner_count));
sleepless_testing::at(CheckPoint::TrashedSchedulerCleaned(trashed_inner_count));
}
};
@@ -242,14 +267,15 @@ where
self.scheduler_inners
.lock()
.expect("not poisoned")
- .push(scheduler);
+ .push((scheduler, Instant::now()));
}
}

fn do_take_scheduler(&self, context: SchedulingContext) -> S {
// pop is intentional for filo, expecting relatively warmed-up scheduler due to having been
// returned recently
- if let Some(inner) = self.scheduler_inners.lock().expect("not poisoned").pop() {
+ if let Some((inner, _pooled_at)) = self.scheduler_inners.lock().expect("not poisoned").pop()
+ {
S::from_inner(inner, context)
} else {
S::spawn(self.self_arc(), context)
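
(Editorial aside: a tiny hypothetical sketch, not the PR's code, of the FILO behavior the comment above relies on; push/pop on a Vec reuses the most recently returned, and therefore warmest, item first:)

```rust
fn main() {
    let mut pool: Vec<&str> = Vec::new();
    pool.push("returned earlier (colder)");
    pool.push("returned just now (warmer)");
    // pop() takes from the back of the Vec, i.e. FILO order:
    assert_eq!(pool.pop(), Some("returned just now (warmer)"));
    assert_eq!(pool.pop(), Some("returned earlier (colder)"));
}
```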
@@ -1289,6 +1315,9 @@ mod tests {
BeforeNewTask,
AfterTaskHandled,
AfterSchedulerThreadAborted,
BeforeIdleSchedulerCleaned,
AfterIdleSchedulerCleaned,
BeforeTrashedSchedulerCleaned,
AfterTrashedSchedulerCleaned,
BeforeThreadManagerDrop,
}
@@ -1326,6 +1355,62 @@

const SHORTENED_POOL_CLEANER_INTERVAL: Duration = Duration::from_millis(1);

#[test]
fn test_scheduler_drop_idle() {
solana_logger::setup();

let _progress = sleepless_testing::setup(&[
&TestCheckPoint::BeforeIdleSchedulerCleaned,
&CheckPoint::IdleSchedulerCleaned(0),
&CheckPoint::IdleSchedulerCleaned(1),
&TestCheckPoint::AfterIdleSchedulerCleaned,
]);

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let shortened_max_pooling_duration = Duration::from_millis(10);
let pool_raw = DefaultSchedulerPool::do_new(
None,
None,
None,
None,
ignored_prioritization_fee_cache,
SHORTENED_POOL_CLEANER_INTERVAL,
shortened_max_pooling_duration,
);
let pool = pool_raw.clone();
let bank = Arc::new(Bank::default_for_tests());
let context1 = SchedulingContext::new(bank);
let context2 = context1.clone();

let old_scheduler = pool.do_take_scheduler(context1);
let new_scheduler = pool.do_take_scheduler(context2);
let new_scheduler_id = new_scheduler.id();
Box::new(old_scheduler.into_inner().1).return_to_pool();

// sleepless_testing can't be used; wait a bit here to see real progress of wall time...
sleep(shortened_max_pooling_duration * 10);
Box::new(new_scheduler.into_inner().1).return_to_pool();

// Block solScCleaner until we see returned schedulers...
assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 2);
sleepless_testing::at(TestCheckPoint::BeforeIdleSchedulerCleaned);

// See the old (= idle) scheduler gone only after solScCleaner did its job...
sleepless_testing::at(&TestCheckPoint::AfterIdleSchedulerCleaned);
assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1);
assert_eq!(
pool_raw
.scheduler_inners
.lock()
.unwrap()
.first()
.as_ref()
.map(|(inner, _pooled_at)| inner.id())
.unwrap(),
new_scheduler_id
);
}

enum AbortCase {
Unhandled,
UnhandledWhilePanicking,
@@ -1658,6 +1743,8 @@ mod tests {
&TestCheckPoint::AfterTaskHandled,
&CheckPoint::SchedulerThreadAborted,
&TestCheckPoint::AfterSchedulerThreadAborted,
&TestCheckPoint::BeforeTrashedSchedulerCleaned,
&CheckPoint::TrashedSchedulerCleaned(0),
&CheckPoint::TrashedSchedulerCleaned(1),
&TestCheckPoint::AfterTrashedSchedulerCleaned,
]);
@@ -1678,6 +1765,7 @@
None,
ignored_prioritization_fee_cache,
SHORTENED_POOL_CLEANER_INTERVAL,
DEFAULT_MAX_POOLING_DURATION,
);
let pool = pool_raw.clone();
let context = SchedulingContext::new(bank.clone());
@@ -1729,7 +1817,12 @@
bank.wait_for_completed_scheduler(),
Some((Err(TransactionError::AccountNotFound), _timings))
);

// Block solScCleaner until we see the trashed scheduler...
assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 1);
sleepless_testing::at(TestCheckPoint::BeforeTrashedSchedulerCleaned);

// See the trashed scheduler gone only after solScCleaner did its job...
sleepless_testing::at(TestCheckPoint::AfterTrashedSchedulerCleaned);
assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 0);
Comment on lines +1829 to 1835
Collaborator Author (@ryoqun), Jun 2, 2024:

It turns out this test was racy to begin with... it seems the race doesn't manifest unless I add the new logic to solScCleaner, so I'm piggybacking the fix into this PR.

}