Date: Sun, 2 Feb 2025 10:07:03 -0500
Subject: [PATCH 16/40] Solidify design; add documentation
---
.../java/util/concurrent/ForkJoinPool.java | 590 ++++++++++--------
.../java/util/concurrent/ForkJoinTask.java | 86 ++-
2 files changed, 398 insertions(+), 278 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index df6cf5471135f..9e8898e890229 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -136,6 +136,17 @@
*
*
*
+ * Additionally, this class supports {@link
+ * ScheduledExecutorService} methods to delay or periodically execute
+ * tasks, as well as method {#link #submitAndCancelOnTimeout} to
+ * cancel tasks that take too long. The submitted functions or actions
+ * may create and invoke other {@linkplain ForkJoinTask
+ * ForkJoinTasks}. When time-based methods are used, shutdown
+ * policies are based on the default policies of class {@link
+ * ScheduledThreadPoolExecutor}: upon {@link #shutdown}, existing
+ * periodic tasks will not re-execute, and the pool terminates when
+ * quiescent and existing delayed tasks complete.
+ *
*
The parameters used to construct the common pool may be controlled by
* setting the following {@linkplain System#getProperty system properties}:
*
@@ -2716,14 +2727,14 @@ else if ((isShutdown = (e & SHUTDOWN)) != 0L || enable) {
if ((quiet = quiescent()) > 0)
now = true;
else if (quiet == 0 && (ds = delayScheduler) != null)
- ds.activate();
+ ds.ensureActive();
}
if (now) {
DelayScheduler ds;
releaseWaiters();
if ((ds = delayScheduler) != null)
- ds.activate();
+ ds.ensureActive();
for (;;) {
if (((e = runState) & CLEANED) == 0L) {
boolean clean = cleanQueues();
@@ -2734,7 +2745,7 @@ else if (quiet == 0 && (ds = delayScheduler) != null)
break;
if (ctl != 0L) // else loop if didn't finish cleaning
break;
- if ((ds = delayScheduler) != null && !ds.activate())
+ if ((ds = delayScheduler) != null && ds.ensureActive() >= 0)
break;
if ((e & CLEANED) != 0L) {
e |= TERMINATED;
@@ -3374,28 +3385,78 @@ public T invokeAny(Collection extends Callable> tasks,
.invokeAny(tasks, this, true, unit.toNanos(timeout));
}
+ /*
+ * The DelayScheduler maintains a binary heap based on trigger times
+ * (field DelayedTask.when) along with a pending queue of tasks
+ * submitted by other threads. When ready, tasks are pushed onto
+ * an external WorkQueue.
+ *
+ * To reduce memory contention, the heap is maintained solely via
+ * local variables in method loop() (forcing noticeable code
+ * sprawl), recording only the heap array to allow method
+ * canShutDown to conservatively check emptiness.
+ *
+ * The pending queue uses a design similar to ForkJoinTask.Aux
+ * queues: Incoming requests prepend (Treiber-stack-style) to the
+ * pending list. The scheduler thread takes and nulls out the
+ * entire list per step to process them as a batch. The pending
+ * queue may encounter contention and retries among requesters,
+ * but much less so versus the scheduler.
+ *
+ * Field "active" records whether the scheduler may have any
+ * pending tasks (and/or shutdown actions) to process, otherwise
+ * parking either indefinitely or until the next task
+ * deadline. Incoming pending tasks ensure active status,
+ * unparking if necessary. The scheduler thread sets status to inactive
+ * when apparently no work, and then rechecks before actually
+ * parking. The active field takes on a negative value on
+ * termination, as a sentinel used in pool tryTerminate checks as
+ * well as to suppress reactivation while terminating.
+ *
+ * The implementation is designed to accommodate usages in which
+ * many or even most tasks are cancelled before executing (mainly
+ * IO-based timeouts). Cancellations are added to the pending
+ * queue in method DelayedTask.cancel(), to remove them from the
+ * heap. (This requires some safeguards to deal with tasks
+ * cancelled while they are still pending.) In addition,
+ * cancelled tasks set their "when" fields to Long.MAX_VALUE,
+ * which causes them to be pushed toward the bottom of the heap
+ * where they can be simply swept out in the course of other add
+ * and replace operations, even before processing the removal
+ * request (which is then a no-op).
+ *
+ * To ensure that comparisons do not encounter integer wrap
+ * errors, times are offset with the most negative possible value
+ * (nanoTimeOffset) determined during static initialization, and
+ * negative delays are screened out in public submission methods
+ *
+ * For the sake of compatibility with ScheduledThreadPoolExecutor,
+ * shutdown follows the same rules, which add some further
+ * complexity beyond the cleanup associated with shutdownNow
+ * (runState STOP). Upon noticing pool shutdown, all periodic
+ * tasks are purged; the scheduler then triggers pool.tryTerminate
+ * when the heap is empty. The asynchronicity of these steps with
+ * respect to pool runState weakens guarantees about exactly when
+ * remaining tasks report isCancelled to callers (they do not run,
+ * but there may be a lag setting their status).
+ */
static final class DelayScheduler extends Thread {
private static final int INITIAL_HEAP_CAPACITY = 1 << 6;
- private final ForkJoinPool pool;
- private DelayedTask>[] heap;
- private int heapSize;
- @jdk.internal.vm.annotation.Contended()
- private volatile int active; // 0: inactive, <0: stopped, >0: running
+ private final ForkJoinPool pool; // read only once
+ private DelayedTask>[] heap; // written only when (re)allocated
+ private volatile int active; // 0: inactive, -1: stopped, +1: running
@jdk.internal.vm.annotation.Contended()
- private volatile DelayedTask> additions;
- @jdk.internal.vm.annotation.Contended()
- private volatile DelayedTask> removals;
+ private volatile DelayedTask> pending;
+
private static final Unsafe U;
private static final long ACTIVE;
- private static final long ADDITIONS;
- private static final long REMOVALS;
+ private static final long PENDING;
static final long nanoTimeOffset;
static {
U = Unsafe.getUnsafe();
Class klass = DelayScheduler.class;
ACTIVE = U.objectFieldOffset(klass, "active");
- ADDITIONS = U.objectFieldOffset(klass, "additions");
- REMOVALS = U.objectFieldOffset(klass, "removals");
+ PENDING = U.objectFieldOffset(klass, "pending");
long ns = System.nanoTime(); // ensure negative to avoid overflow
nanoTimeOffset = Long.MIN_VALUE + (ns < 0L ? ns : 0L);
}
@@ -3406,217 +3467,191 @@ static final class DelayScheduler extends Thread {
pool = p;
}
+ /**
+ * Returns System.nanoTime() with nanoTimeOffset
+ */
static final long now() {
return nanoTimeOffset + System.nanoTime();
}
- final boolean activate() {
+ /**
+ * Ensure active, unparking if necessary, unless stopped
+ */
+ final int ensureActive() {
int state;
- if ((state = active) < 0) // stopped
- return true;
- if (state == 0 && U.getAndBitwiseOrInt(this, ACTIVE, 1) == 0)
+ if ((state = active) == 0 && U.getAndBitwiseOrInt(this, ACTIVE, 1) == 0)
U.unpark(this);
- return false;
- }
-
- final boolean canShutDown() {
- int state; DelayedTask>[] h;
- return ((state = active) < 0 ||
- (state == 0 && heapSize <= 0 &&
- ((h = heap) == null || h.length <= 0 || h[0] == null) &&
- active <= 0));
+ return state;
}
- final void add(DelayedTask> task) {
- DelayedTask> f = additions;
+ /**
+ * Inserts the task to pending queue, to add, remove, or ignore
+ * depending on task status when processed.
+ */
+ final void pend(DelayedTask> task) {
+ DelayedTask> f = pending;
if (task != null) {
do {} while (
f != (f = (DelayedTask>)
U.compareAndExchangeReference(
- this, ADDITIONS, task.nextPending = f, task)));
- activate();
+ this, PENDING, task.nextPending = f, task)));
+ ensureActive();
}
}
- final void remove(DelayedTask> task) {
- DelayedTask> f = removals;
- if (task != null) {
- do {} while (
- f != (f = (DelayedTask>)
- U.compareAndExchangeReference(
- this, REMOVALS, task.nextPending = f, task)));
- activate();
- }
+ /**
+ * Returns true if (momentarily) inactive and heap is empty
+ */
+ final boolean canShutDown() {
+ DelayedTask>[] h;
+ return (active <= 0 &&
+ ((h = heap) == null || h.length <= 0 || h[0] == null) &&
+ active <= 0);
}
+ /**
+ * Setup/teardown for scheduling loop
+ */
public final void run() {
ForkJoinPool p;
ThreadLocalRandom.localInit();
if ((p = pool) != null) {
try {
- WorkQueue q; // establish default submission queue
- heap = new DelayedTask>[INITIAL_HEAP_CAPACITY];
- if ((q = p.externalSubmissionQueue(false)) != null) {
- q.unlockPhase();
- runScheduler(p);
- cancelAll();
- }
+ loop(p);
} finally {
- active = 1 << 31; // set negative
+ active = -1;
p.tryTerminate(false, false);
}
}
}
- private void runScheduler(ForkJoinPool p) {
- if (p != null) {
- boolean idle = false;
- int canStop = 0;
- long waitTime = 0L;
- for (;;) {
- long prs;
- if (((prs = p.runState) & STOP) != 0L)
- break;
- else if ((prs & SHUTDOWN) != 0L &&
- (canStop = tryStopOnShutdown(p, canStop)) < 0)
- break;
- else if (idle) {
- idle = false;
- Thread.interrupted(); // clear
- if (active == 0) {
- U.park(false, waitTime);
- active = 1;
+ /**
+ * After initialization, repeatedly:
+ * 1. Process pending tasks in batches, to add or remove from heap,
+ * 2. Check for shutdown, either exiting or preparing for shutdown when empty
+ * 3. Trigger all ready tasks by externally submitting them to pool
+ * 4. If active, set tentatively inactive,
+ * else park until next trigger time, or indefinitely if none
+ */
+ private void loop(ForkJoinPool p) {
+ WorkQueue sq; DelayedTask>[] h;
+ if ((sq = p.externalSubmissionQueue(false)) != null)
+ sq.unlockPhase(); // try creating default submission queue
+ heap = h = new DelayedTask>[INITIAL_HEAP_CAPACITY];
+ active = 1;
+ boolean purgedPeriodic = false;
+ for (int n = 0;;) { // n is heap size
+ DelayedTask> t;
+ while (pending != null && // process pending tasks
+ (t = (DelayedTask>)
+ U.getAndSetReference(this, PENDING, null)) != null) {
+ DelayedTask> next;
+ int cap = h.length;
+ do {
+ int i = t.heapIndex;
+ long d = t.when;
+ if ((next = t.nextPending) != null)
+ t.nextPending = null;
+ if (i >= 0) {
+ t.heapIndex = -1;
+ if (i < n && i < cap && h[i] == t)
+ n = replace(h, i, n);
}
- }
- else {
- int state = active;
- int hs = processPending(p, heap, heapSize);
- waitTime = (hs > 0) ? submitReadyTasks(p, heap, hs) : 0L;
- if (active == state) {
- if (state != 0)
- U.compareAndSetInt(this, ACTIVE, 1, 0);
- else
- idle = true;
+ else if (t.status >= 0) {
+ if (n >= cap || n < 0) // couldn't resize
+ t.trySetCancelled();
+ else {
+ DelayedTask> parent, u; int pk, newCap;
+ while (n > 0 && // clear trailing cancelled tasks
+ (u = h[n - 1]) != null && u.status < 0) {
+ u.heapIndex = -1;
+ h[--n] = null;
+ }
+ int k = n++;
+ while (k > 0 && // sift up
+ (parent = h[pk = (k - 1) >>> 1]) != null &&
+ (parent.when > d)) {
+ parent.heapIndex = k;
+ h[k] = parent;
+ k = pk;
+ }
+ t.heapIndex = k;
+ h[k] = t;
+ if (n >= cap && (newCap = cap << 1) > cap) {
+ DelayedTask>[] a = null;
+ try { // try to resize
+ a = Arrays.copyOf(heap, newCap);
+ } catch (Error | RuntimeException ex) {
+ }
+ if (a != null && (newCap = a.length) > cap) {
+ cap = newCap;
+ heap = h = a;
+ U.storeFence();
+ }
+ }
+ }
}
- }
+ } while ((t = next) != null);
}
- }
- }
- private long submitReadyTasks(ForkJoinPool p, DelayedTask>[] h, int hs) {
- long waitTime = 0L, prs; int s;
- if ((s = hs) > 0 && h != null && h.length > 0 && p != null &&
- ((prs = p.runState) & STOP) == 0L) {
- for (long now = now();;) {
- DelayedTask> first; int stat; long d;
- boolean cancel = false;
- if ((first = h[0]) == null)
- break;
- if ((stat = first.status) >= 0) {
- if (first.nextDelay != 0L && (prs & SHUTDOWN) != 0L)
- cancel = true;
- else if ((d = first.when - now) > 0L) {
- waitTime = d;
- break;
- }
- }
- first.heapIndex = -1;
- if (stat >= 0 && !cancel) {
- try {
- WorkQueue q;
- if ((q = p.externalSubmissionQueue(false)) == null)
- cancel = true; // terminating
- else if (first.status < 0)
- q.unlockPhase();
- else
- q.push(first, p, false);
- } catch(Error | RuntimeException ex) {
- cancel = true;
- }
- }
- if (cancel)
- first.trySetCancelled();
- if ((s = heapReplace(h, 0, s)) == 0)
+ if ((p.runState & SHUTDOWN) != 0L) {
+ if ((n = tryStop(p, h, n, purgedPeriodic)) < 0)
break;
+ purgedPeriodic = true;
}
- if (s != hs)
- heapSize = s;
- }
- return waitTime;
- }
- private int processPending(ForkJoinPool p, DelayedTask>[] h, int hs) {
- int s = hs;
- if (p != null && h != null) {
- for (;;) {
- DelayedTask> t =
- (removals != null) ?
- (DelayedTask>)U.getAndSetReference(this, REMOVALS, null):
- (additions != null) ?
- (DelayedTask>)U.getAndSetReference(this, ADDITIONS, null):
- null;
- if (t == null)
- break;
- for (;;) {
- int cap = h.length, idx; DelayedTask> next;
- if ((next = t.nextPending) != null)
- t.nextPending = null;
- if ((idx = t.heapIndex) >= 0) {
- t.heapIndex = -1;
- if (idx < s && idx < cap && h[idx] == t)
- s = heapReplace(h, idx, s);
- }
- else if (s >= cap || s < 0 || // couldn't resize
- (t.nextDelay != 0L && p.isShutdown()))
- t.trySetCancelled();
- else if (t.status >= 0) {
- DelayedTask> u; int k, newCap;
- while (s > 0 && // clear trailing cancelled tasks
- (u = h[s - 1]) != null && u.status < 0) {
- u.heapIndex = -1;
- h[--s] = null;
- }
- if ((k = s++) > 0) { // sift up
- for (long d = t.when;;) {
- int pk; DelayedTask> par;
- if ((par = h[pk = (k - 1) >>> 1]) == null)
- break;
- if (par.when <= d && par.status >= 0)
- break;
- par.heapIndex = k;
- h[k] = par;
- if ((k = pk) == 0)
- break;
- }
+ long parkTime = 0L; // zero for untimed park
+ if (n > 0 && h.length > 0) {
+ long now = now();
+ do { // submit ready tasks
+ DelayedTask> f; int stat;
+ if ((f = h[0]) != null) {
+ long d = f.when - now;
+ if ((stat = f.status) >= 0 && d > 0L) {
+ parkTime = d;
+ break;
}
- t.heapIndex = k;
- h[k] = t;
- if (s >= cap && (newCap = cap << 1) > cap) {
- DelayedTask>[] a = null;
- try { // try to resize
- a = Arrays.copyOf(heap, newCap);
- } catch (Error | RuntimeException ex) {
+ f.heapIndex = -1;
+ if (stat >= 0) { // else already cancelled
+ boolean cancel = false;
+ try {
+ WorkQueue q = p.externalSubmissionQueue(false);
+ if (q == null) // terminating
+ cancel = true;
+ else
+ q.push(f, p, false);
+ } catch(Error | RuntimeException ex) {
+ cancel = true;
}
- if (a != null)
- cap = (heap = h = a).length;
+ if (cancel)
+ f.trySetCancelled();
}
}
- if ((t = next) == null)
- break;
- }
+ } while ((n = replace(h, 0, n)) > 0);
+ }
+
+ if (pending == null) {
+ Thread.interrupted(); // clear before park
+ if (active == 0)
+ U.park(false, parkTime);
+ else
+ U.compareAndSetInt(this, ACTIVE, 1, 0);
}
}
- if (s != hs)
- heapSize = s;
- return s;
}
- private static int heapReplace(DelayedTask>[] h, int k, int s) {
- if (k >= 0 && s > 0 && h != null && s <= h.length) {
+ /**
+ * Replaces removed heap element at index k
+ * @return current heap size
+ */
+ private static int replace(DelayedTask>[] h, int k, int n) {
+ if (k >= 0 && n > 0 && h != null && n <= h.length) {
DelayedTask> t = null, u;
- while (--s > k) { // find uncancelled replacement
- if ((u = h[s]) != null) {
- h[s] = null;
+ long d = 0L;
+ while (--n > k) { // find uncancelled replacement
+ if ((u = h[n]) != null) {
+ h[n] = null;
+ d = u.when;
if (u.status >= 0) {
t = u;
break;
@@ -3625,15 +3660,12 @@ private static int heapReplace(DelayedTask>[] h, int k, int s) {
}
}
if (t != null) {
- long d = t.when, rd; int ck, rk; DelayedTask> c, r;
- while ((ck = (k << 1) + 1) < s && (c = h[ck]) != null) {
- long cw = c.when;
- long cd = (c.status < 0) ? Long.MAX_VALUE : cw;
- if ((rk = ck + 1) < s && (r = h[rk]) != null &&
- (rd = r.when) < cd && r.status >= 0) {
- cd = rd;
- c = r;
- ck = rk;
+ int ck, rk; DelayedTask> c, r;
+ while ((ck = (k << 1) + 1) < n && (c = h[ck]) != null) {
+ long cd = c.when, rd;
+ if ((rk = ck + 1) < n && (r = h[rk]) != null &&
+ (rd = r.when) < cd) {
+ c = r; ck = rk; cd = rd; // use right child
}
if (d <= cd)
break;
@@ -3645,53 +3677,55 @@ private static int heapReplace(DelayedTask>[] h, int k, int s) {
}
h[k] = t;
}
- return s;
+ return n;
}
- private int tryStopOnShutdown(ForkJoinPool p, int canStop) {
- DelayedTask>[] h; int s;
- if ((s = heapSize) > 0 && canStop == 0 && (h = heap) != null &&
- h.length >= s) { // remove periodic tasks
- DelayedTask> t; int stat;
- for (int i = 0; i < s && (t = h[s]) != null; ) {
- if ((stat = t.status) < 0 || t.nextDelay != 0L) {
- t.heapIndex = -1;
- if (stat >= 0)
- t.trySetCancelled();
- s = heapReplace(h, i, s);
+ /**
+ * Call only when pool is shutdown or stopping. If called when
+ * shutdown but not stopping, removes periodic tasks if not
+ * already done so, and if not empty or pool not terminating,
+ * returns. Otherwise, cancels all tasks in heap and pending
+ * queue.
+ * @return negative if stop, else current heap size.
+ */
+ private int tryStop(ForkJoinPool p, DelayedTask>[] h,
+ int n, boolean purgedPeriodic) {
+ if (p != null && h != null && h.length >= n) {
+ if (((p.runState & STOP) == 0L)) {
+ if (!purgedPeriodic && n > 0) {
+ DelayedTask> t; int stat; // remove periodic tasks
+ for (int i = n - 1; i >= 0; --i) {
+ if ((t = h[i]) != null &&
+ ((stat = t.status) < 0 || t.nextDelay != 0L)) {
+ t.heapIndex = -1;
+ if (stat >= 0)
+ t.trySetCancelled();
+ n = replace(h, i, n);
+ }
+ }
}
+ if (n > 0 || (p.tryTerminate(false, false) & STOP) == 0L)
+ return n;
}
- heapSize = s;
- }
- return (s == 0 && active <= 0 && p != null &&
- (p.tryTerminate(false, false) & STOP) != 0L) ? -1 : 1;
- }
-
- private void cancelAll() {
- DelayedTask>[] h; int s;
- if ((s = heapSize) > 0 && (h = heap) != null && h.length >= s) {
- heapSize = 0;
- do {
- DelayedTask> u;
- if ((u = h[--s]) != null) {
- h[s] = null;
- u.heapIndex = -1;
+ for (int i = 0; i < n; ++i) {
+ DelayedTask> u = h[i];
+ h[i] = null;
+ if (u != null)
u.trySetCancelled();
- }
- } while (s > 0);
- }
- DelayedTask> t = (DelayedTask>)
- U.getAndSetReference(this, ADDITIONS, null);
- if (t != null) {
- do {
- t.trySetCancelled();
- } while ((t = t.nextPending) != null);
+ }
+ for (DelayedTask> a = (DelayedTask>)
+ U.getAndSetReference(this, PENDING, null);
+ a != null; a = a.nextPending)
+ a.trySetCancelled(); // clear pending requests
}
- removals = null;
+ return -1;
}
}
- private DelayScheduler startDelayScheduler() {
+ /**
+ * Common code for ScheduledExecutorService methods
+ */
+ private void sched(DelayedTask> t, long delay) {
DelayScheduler ds;
if ((ds = delayScheduler) == null) {
boolean start = false;
@@ -3713,24 +3747,15 @@ private DelayScheduler startDelayScheduler() {
ds.start();
}
}
- return ds;
- }
-
- private ScheduledFuture sched(Runnable command, Callable callable,
- long delay, long nextDelay) {
- DelayScheduler ds;
- long when = DelayScheduler.now() + delay;
- DelayedTask t =
- new DelayedTask<>(command, callable, this, nextDelay, when);
- if ((ds = delayScheduler) == null)
- ds = startDelayScheduler();
- if (delay == 0L)
- poolSubmit(true, t);
- else if (ds == null || (runState & SHUTDOWN) != 0L)
+ if (ds == null || (runState & SHUTDOWN) != 0L)
throw new RejectedExecutionException();
- else
- ds.add(t);
- return t;
+ if (t != null) {
+ t.when = DelayScheduler.now() + delay;
+ if (delay == 0L)
+ poolSubmit(true, t);
+ else
+ ds.pend(t);
+ }
}
public ScheduledFuture> schedule(Runnable command,
@@ -3738,8 +3763,10 @@ public ScheduledFuture> schedule(Runnable command,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
- return sched(command, (Callable)null,
- (delay <= 0L) ? 0L : unit.toNanos(delay), 0L);
+ long d = (delay <= 0L) ? 0L : unit.toNanos(delay);
+ DelayedTask t = new DelayedTask(command, null, this, 0L);
+ sched(t, d);
+ return t;
}
public ScheduledFuture schedule(Callable callable,
@@ -3747,8 +3774,10 @@ public ScheduledFuture schedule(Callable callable,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
- return sched(null, callable,
- (delay <= 0L) ? 0L : unit.toNanos(delay), 0L);
+ long d = (delay <= 0L) ? 0L : unit.toNanos(delay);
+ DelayedTask t = new DelayedTask(null, callable, this, 0L);
+ sched(t, d);
+ return t;
}
public ScheduledFuture> scheduleAtFixedRate(Runnable command,
@@ -3759,9 +3788,11 @@ public ScheduledFuture> scheduleAtFixedRate(Runnable command,
throw new NullPointerException();
if (period <= 0L)
throw new IllegalArgumentException();
- return sched(command, (Callable)null,
- (initialDelay <= 0L) ? 0L : unit.toNanos(initialDelay),
- -unit.toNanos(period));
+ long d = (initialDelay <= 0L) ? 0L : unit.toNanos(initialDelay);
+ long p = -unit.toNanos(period); // negative for fixed rate
+ DelayedTask t = new DelayedTask(command, null, this, p);
+ sched(t, d);
+ return t;
}
public ScheduledFuture> scheduleWithFixedDelay(Runnable command,
@@ -3772,9 +3803,56 @@ public ScheduledFuture> scheduleWithFixedDelay(Runnable command,
throw new NullPointerException();
if (delay <= 0L)
throw new IllegalArgumentException();
- return sched(command, (Callable)null,
- (initialDelay <= 0L) ? 0L : unit.toNanos(initialDelay),
- unit.toNanos(delay));
+ long d = (initialDelay <= 0L) ? 0L : unit.toNanos(initialDelay);
+ long p = unit.toNanos(delay);
+ DelayedTask t = new DelayedTask(command, null, this, p);
+ sched(t, d);
+ return t;
+ }
+
+ /**
+ * Body of a DelayedTask serving to cancel another task on timeout
+ */
+ static final class CancelAction implements Runnable {
+ ForkJoinTask> task; // set after construction
+ public void run() {
+ ForkJoinTask> t;
+ if ((t = task) != null)
+ t.cancel(true);
+ }
+ }
+
+ /**
+ * Submits a task executing the given function, cancelling it (via
+ * {@code cancel(true)}) if not completed within the given timeout
+ * period.
+ *
+ * @param callable the function to execute
+ * @param the type of the callable's result
+ * @param timeout the time to wait before cancelling if not completed
+ * @param unit the time unit of the timeout parameter
+ * @return a Future that can be used to extract result or cancel
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ * @throws NullPointerException if callable or unit is null
+ * @throws IllegalArgumentException if timeout less than or equal to zero
+ */
+ public ForkJoinTask submitAndCancelOnTimeout(Callable