diff --git a/kyo-scheduler/jvm-native/src/main/scala/kyo/scheduler/Worker.scala b/kyo-scheduler/jvm-native/src/main/scala/kyo/scheduler/Worker.scala index af2eece20..79e8bfbc1 100644 --- a/kyo-scheduler/jvm-native/src/main/scala/kyo/scheduler/Worker.scala +++ b/kyo-scheduler/jvm-native/src/main/scala/kyo/scheduler/Worker.scala @@ -174,7 +174,7 @@ abstract private class Worker( val task = currentTask val start = taskStartMs val stalled = (task ne null) && start > 0 && start < nowMs - timeSliceMs - if (stalled) { + if (stalled && !queue.isEmpty()) { task.doPreempt() } stalled diff --git a/kyo-scheduler/jvm-native/src/test/scala/kyo/scheduler/SchedulerTest.scala b/kyo-scheduler/jvm-native/src/test/scala/kyo/scheduler/SchedulerTest.scala index 22547cb10..e9e2fccfe 100644 --- a/kyo-scheduler/jvm-native/src/test/scala/kyo/scheduler/SchedulerTest.scala +++ b/kyo-scheduler/jvm-native/src/test/scala/kyo/scheduler/SchedulerTest.scala @@ -80,47 +80,6 @@ class SchedulerTest extends AnyFreeSpec with NonImplicitAssertions { } } - "cycle" - { - "cycles workers and preempts tasks" in withScheduler { scheduler => - val cdl = new CountDownLatch(1) - val preemptLatch = new CountDownLatch(1) - val task = TestTask( - _preempt = () => preemptLatch.countDown(), - _run = () => { - cdl.await() - Task.Done - } - ) - scheduler.schedule(task) - preemptLatch.await() - cdl.countDown() - eventually(assert(task.executions == 1)) - } - "repeatedly cycles and preempts tasks" in withScheduler { scheduler => - val cdl = new CountDownLatch(1) - val preemptions = new AtomicInteger(3) - val preempt = new AtomicBoolean - val task = TestTask( - _preempt = () => - preempt.set(true), - _run = () => { - if (preemptions.getAndDecrement() > 0) { - while (preempt.compareAndSet(true, false)) {} - Task.Preempted - } else { - cdl.await() - Task.Done - } - } - ) - scheduler.schedule(task) - - eventually(assert(task.preemptions == 3)) - cdl.countDown() - eventually(assert(task.executions == 4)) - } - } - "asExecutor" - { "returns an executor that schedules tasks" in withScheduler { scheduler => val executor = scheduler.asExecutor diff --git a/kyo-scheduler/jvm-native/src/test/scala/kyo/scheduler/WorkerTest.scala b/kyo-scheduler/jvm-native/src/test/scala/kyo/scheduler/WorkerTest.scala index 87591d9eb..9390ad991 100644 --- a/kyo-scheduler/jvm-native/src/test/scala/kyo/scheduler/WorkerTest.scala +++ b/kyo-scheduler/jvm-native/src/test/scala/kyo/scheduler/WorkerTest.scala @@ -457,7 +457,7 @@ class WorkerTest extends AnyFreeSpec with NonImplicitAssertions { cdl.countDown() } - "preempts long-running task" in withWorker { worker => + "preempts long-running task if queue isn't empty" in withWorker { worker => var preempted = false val longRunningTask = TestTask( _run = () => { @@ -467,11 +467,27 @@ class WorkerTest extends AnyFreeSpec with NonImplicitAssertions { _preempt = () => preempted = true ) worker.enqueue(longRunningTask) + worker.enqueue(longRunningTask) eventually { assert(!worker.checkAvailability(System.currentTimeMillis())) assert(preempted) } } + "doesn't preempt long-running task if queue is empty" in withWorker { worker => + var preempted = false + val longRunningTask = TestTask( + _run = () => { + while (!preempted) {} + Task.Done + }, + _preempt = () => preempted = true + ) + worker.enqueue(longRunningTask) + eventually { + assert(!worker.checkAvailability(System.currentTimeMillis())) + assert(!preempted) + } + } "drains queue only once when transitioning to stalled state" in withWorker { worker => scheduled.set(0) val cdl = new CountDownLatch(1)