Skip to content

Commit

Permalink
Carry over buffered tasks across BP sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Mar 6, 2025
1 parent 9ae8e4c commit 1851179
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 30 deletions.
68 changes: 50 additions & 18 deletions unified-scheduler-logic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,15 +626,21 @@ const_assert_eq!(mem::size_of::<UsageQueue>(), 8);
pub struct SchedulingStateMachine {
unblocked_task_queue: VecDeque<Task>,
active_task_count: ShortCounter,
executing_task_count: ShortCounter,
max_executing_task_count: u32,
handled_task_count: ShortCounter,
unblocked_task_count: ShortCounter,
total_task_count: ShortCounter,
count_token: BlockedUsageCountToken,
usage_queue_token: UsageQueueToken,
}
const_assert_eq!(mem::size_of::<SchedulingStateMachine>(), 48);
const_assert_eq!(mem::size_of::<SchedulingStateMachine>(), 56);

impl SchedulingStateMachine {
pub fn has_no_executing_task(&self) -> bool {
self.executing_task_count.is_zero()
}

pub fn has_no_active_task(&self) -> bool {
self.active_task_count.is_zero()
}
Expand All @@ -643,6 +649,14 @@ impl SchedulingStateMachine {
!self.unblocked_task_queue.is_empty()
}

pub fn has_runnable_task(&self) -> bool {
self.has_unblocked_task() && self.is_task_runnable()
}

fn is_task_runnable(&self) -> bool {
self.executing_task_count.current() < self.max_executing_task_count
}

pub fn unblocked_task_queue_count(&self) -> usize {
self.unblocked_task_queue.len()
}
Expand Down Expand Up @@ -702,13 +716,14 @@ impl SchedulingStateMachine {
self.active_task_count.increment_self();
self.try_lock_usage_queues(task).and_then(|task| {
// locking succeeded, and then ...
if force_buffering {
if !self.is_task_runnable() || force_buffering {
// ... push to unblocked_task_queue, if buffering is forced.
self.unblocked_task_count.increment_self();
self.unblocked_task_queue.push_back(task);
None
} else {
// ... return the task back as schedulable to the caller as-is otherwise.
self.executing_task_count.increment_self();
Some(task)
}
})
Expand All @@ -717,6 +732,7 @@ impl SchedulingStateMachine {
#[must_use]
pub fn schedule_next_unblocked_task(&mut self) -> Option<Task> {
self.unblocked_task_queue.pop_front().inspect(|_| {
self.executing_task_count.increment_self();
self.unblocked_task_count.increment_self();
})
}
Expand All @@ -732,6 +748,7 @@ impl SchedulingStateMachine {
/// tasks inside `SchedulingStateMachine` to provide an offloading-based optimization
/// opportunity for callers.
pub fn deschedule_task(&mut self, task: &Task) {
self.executing_task_count.decrement_self();
self.active_task_count.decrement_self();
self.handled_task_count.increment_self();
self.unlock_usage_queues(task);
Expand Down Expand Up @@ -887,11 +904,14 @@ impl SchedulingStateMachine {
/// other slots.
pub fn reinitialize(&mut self) {
assert!(self.has_no_active_task());
assert_eq!(self.executing_task_count.current(), 0);
assert_eq!(self.unblocked_task_queue.len(), 0);
// nice trick to ensure all fields are handled here if new one is added.
let Self {
unblocked_task_queue: _,
active_task_count,
executing_task_count: _,
max_executing_task_count: _,
handled_task_count,
unblocked_task_count,
total_task_count,
Expand All @@ -911,19 +931,31 @@ impl SchedulingStateMachine {
/// # Safety
/// Call this exactly once for each thread. See [`TokenCell`] for details.
#[must_use]
pub unsafe fn exclusively_initialize_current_thread_for_scheduling() -> Self {
pub unsafe fn exclusively_initialize_current_thread_for_scheduling(
max_executing_task_count: Option<usize>,
) -> Self {
Self {
// It's very unlikely this is desired to be configurable, like
// `UsageQueueInner::blocked_usages_from_tasks`'s cap.
unblocked_task_queue: VecDeque::with_capacity(1024),
active_task_count: ShortCounter::zero(),
executing_task_count: ShortCounter::zero(),
max_executing_task_count: max_executing_task_count
.unwrap_or(u32::MAX as usize)
.try_into()
.unwrap(),
handled_task_count: ShortCounter::zero(),
unblocked_task_count: ShortCounter::zero(),
total_task_count: ShortCounter::zero(),
count_token: unsafe { BlockedUsageCountToken::assume_exclusive_mutating_thread() },
usage_queue_token: unsafe { UsageQueueToken::assume_exclusive_mutating_thread() },
}
}

#[cfg(test)]
unsafe fn exclusively_initialize_current_thread_for_scheduling_for_test() -> Self {
Self::exclusively_initialize_current_thread_for_scheduling(None)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -985,7 +1017,7 @@ mod tests {
#[test]
fn test_scheduling_state_machine_creation() {
let state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
assert_eq!(state_machine.active_task_count(), 0);
assert_eq!(state_machine.total_task_count(), 0);
Expand All @@ -995,7 +1027,7 @@ mod tests {
#[test]
fn test_scheduling_state_machine_good_reinitialization() {
let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
state_machine.total_task_count.increment_self();
assert_eq!(state_machine.total_task_count(), 1);
Expand All @@ -1007,7 +1039,7 @@ mod tests {
#[should_panic(expected = "assertion failed: self.has_no_active_task()")]
fn test_scheduling_state_machine_bad_reinitialization() {
let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
let address_loader = &mut create_address_loader(None);
let task = SchedulingStateMachine::create_task(simplest_transaction(), 3, address_loader);
Expand All @@ -1032,7 +1064,7 @@ mod tests {
let task = SchedulingStateMachine::create_task(sanitized, 3, address_loader);

let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
let task = state_machine.schedule_task(task).unwrap();
assert_eq!(state_machine.active_task_count(), 1);
Expand All @@ -1052,7 +1084,7 @@ mod tests {
let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);

let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
assert_matches!(
state_machine
Expand Down Expand Up @@ -1104,7 +1136,7 @@ mod tests {
let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);

let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
assert_matches!(
state_machine
Expand Down Expand Up @@ -1154,7 +1186,7 @@ mod tests {
let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
// both of read-only tasks should be immediately runnable
assert_matches!(
Expand Down Expand Up @@ -1195,7 +1227,7 @@ mod tests {
let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);

let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
assert_matches!(
state_machine
Expand Down Expand Up @@ -1246,7 +1278,7 @@ mod tests {
let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);

let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
assert_matches!(
state_machine
Expand Down Expand Up @@ -1288,7 +1320,7 @@ mod tests {
let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
assert_matches!(
state_machine
Expand Down Expand Up @@ -1324,7 +1356,7 @@ mod tests {
let task4 = SchedulingStateMachine::create_task(sanitized4, 104, address_loader);

let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
assert_matches!(
state_machine
Expand Down Expand Up @@ -1380,7 +1412,7 @@ mod tests {
let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
assert_matches!(
state_machine
Expand Down Expand Up @@ -1420,7 +1452,7 @@ mod tests {
#[should_panic(expected = "internal error: entered unreachable code")]
fn test_unreachable_unlock_conditions1() {
let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
let usage_queue = UsageQueue::default();
usage_queue
Expand All @@ -1434,7 +1466,7 @@ mod tests {
#[should_panic(expected = "internal error: entered unreachable code")]
fn test_unreachable_unlock_conditions2() {
let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
let usage_queue = UsageQueue::default();
usage_queue
Expand All @@ -1449,7 +1481,7 @@ mod tests {
#[should_panic(expected = "internal error: entered unreachable code")]
fn test_unreachable_unlock_conditions3() {
let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
};
let usage_queue = UsageQueue::default();
usage_queue
Expand Down
Loading

0 comments on commit 1851179

Please sign in to comment.