Skip to content

Commit

Permalink
[scheduler] don't preempt task if worker queue is empty (#976)
Browse files Browse the repository at this point in the history
There's no need to preempt the current task if the worker queue is empty
and there isn't another task waiting for execution.
  • Loading branch information
fwbrasil authored Jan 6, 2025
1 parent b89935b commit 4dfe2ad
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ abstract private class Worker(
val task = currentTask
val start = taskStartMs
val stalled = (task ne null) && start > 0 && start < nowMs - timeSliceMs
if (stalled) {
if (stalled && !queue.isEmpty()) {
task.doPreempt()
}
stalled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,47 +80,6 @@ class SchedulerTest extends AnyFreeSpec with NonImplicitAssertions {
}
}

"cycle" - {
"cycles workers and preempts tasks" in withScheduler { scheduler =>
val cdl = new CountDownLatch(1)
val preemptLatch = new CountDownLatch(1)
val task = TestTask(
_preempt = () => preemptLatch.countDown(),
_run = () => {
cdl.await()
Task.Done
}
)
scheduler.schedule(task)
preemptLatch.await()
cdl.countDown()
eventually(assert(task.executions == 1))
}
"repeatedly cycles and preempts tasks" in withScheduler { scheduler =>
val cdl = new CountDownLatch(1)
val preemptions = new AtomicInteger(3)
val preempt = new AtomicBoolean
val task = TestTask(
_preempt = () =>
preempt.set(true),
_run = () => {
if (preemptions.getAndDecrement() > 0) {
while (preempt.compareAndSet(true, false)) {}
Task.Preempted
} else {
cdl.await()
Task.Done
}
}
)
scheduler.schedule(task)

eventually(assert(task.preemptions == 3))
cdl.countDown()
eventually(assert(task.executions == 4))
}
}

"asExecutor" - {
"returns an executor that schedules tasks" in withScheduler { scheduler =>
val executor = scheduler.asExecutor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ class WorkerTest extends AnyFreeSpec with NonImplicitAssertions {
cdl.countDown()
}

"preempts long-running task" in withWorker { worker =>
"preempts long-running task if queue isn't empty" in withWorker { worker =>
var preempted = false
val longRunningTask = TestTask(
_run = () => {
Expand All @@ -467,11 +467,27 @@ class WorkerTest extends AnyFreeSpec with NonImplicitAssertions {
_preempt = () => preempted = true
)
worker.enqueue(longRunningTask)
worker.enqueue(longRunningTask)
eventually {
assert(!worker.checkAvailability(System.currentTimeMillis()))
assert(preempted)
}
}
"doesn't preempt long-running task if queue is empty" in withWorker { worker =>
var preempted = false
val longRunningTask = TestTask(
_run = () => {
while (!preempted) {}
Task.Done
},
_preempt = () => preempted = true
)
worker.enqueue(longRunningTask)
eventually {
assert(!worker.checkAvailability(System.currentTimeMillis()))
assert(!preempted)
}
}
"drains queue only once when transitioning to stalled state" in withWorker { worker =>
scheduled.set(0)
val cdl = new CountDownLatch(1)
Expand Down

0 comments on commit 4dfe2ad

Please sign in to comment.