Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8319447: Improve performance of delayed task handling #23702

Open
wants to merge 50 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
cb1aedf
In-progress snapshot
DougLea Jan 6, 2025
5c4ca21
Better conform to STPE
DougLea Jan 7, 2025
bb0e6a6
Refactorings
DougLea Jan 9, 2025
f683d7f
Use pendingRemval queue
DougLea Jan 9, 2025
3801ba0
Reduce unparks
DougLea Jan 10, 2025
d630b40
Better pending queues
DougLea Jan 16, 2025
0dd4ab7
Merge branch 'openjdk:master' into JDK-8319447
DougLea Jan 16, 2025
7e04af5
Conform to default STPE policies
DougLea Jan 19, 2025
7288e32
Comment out racy test
DougLea Jan 19, 2025
b14e31c
Merge branch 'openjdk:master' into JDK-8319447
DougLea Jan 19, 2025
cb202b6
Reduce memory contention
DougLea Jan 21, 2025
1f0a5cf
Use nanoTimeOrigin
DougLea Jan 21, 2025
97a2920
Reduce nanoTime usage; extend tck tests
DougLea Jan 23, 2025
f9aa135
Simplify scheduler state tracking
DougLea Jan 23, 2025
798fe64
Refactor delay scheduler pool submissions
DougLea Jan 24, 2025
d083e91
improve removal cost balance
DougLea Jan 25, 2025
1a7f77c
Ensure negative nanotime offset
DougLea Jan 26, 2025
f832335
Merge branch 'openjdk:master' into JDK-8319447
DougLea Feb 2, 2025
3da4fd7
Solidify design; add documentation
DougLea Feb 2, 2025
49b1699
Separate out DelayScheduler.java
DougLea Feb 3, 2025
229da14
Rework FJP-DS connections
DougLea Feb 4, 2025
9fad8d4
Deal with commonPool parallelism zero; use in other juc classes; remo…
DougLea Feb 7, 2025
2e60dc9
Refactor schedule methods
DougLea Feb 8, 2025
a0db427
Isolate screening
DougLea Feb 8, 2025
d0f4af1
Support STPE policy methods
DougLea Feb 9, 2025
a6290ab
Reduce memory accesses
DougLea Feb 9, 2025
c839299
Simplify policy methods; improve layout
DougLea Feb 10, 2025
f1394c4
Rename DelayedTask to ScheduledForkJoinTask; misc other improvements
DougLea Feb 12, 2025
0e13955
Better accommodate CompletableFuture; use 4-ary heap; add javadocs; o…
DougLea Feb 15, 2025
14a7a6f
Reduce garbage retention; use trailing padding; add tests
DougLea Feb 16, 2025
bd58f41
Merge branch 'openjdk:master' into JDK-8319447
DougLea Feb 16, 2025
93aac79
Merge remote-tracking branch 'refs/remotes/origin/JDK-8319447' into J…
DougLea Feb 16, 2025
753d0e0
Add optional SubmitWithTimeout action
DougLea Feb 17, 2025
53516e9
Misc minor improvements and renamings for clarity
DougLea Feb 19, 2025
16815cc
Address feedback
DougLea Feb 21, 2025
84eaab0
Address review comments; ensure new internal methods can't clash with…
DougLea Feb 22, 2025
c9bc41a
Standardize parameter checking
DougLea Feb 23, 2025
b40513f
Address review comments; reactivation tweak
DougLea Feb 28, 2025
0c5d22a
Reduce volatile reads
DougLea Mar 1, 2025
5c0355b
Associate probes with carriers if Virtual (no doc updates yet)
DougLea Mar 8, 2025
f670910
Merge branch 'openjdk:master' into JDK-8319447
DougLea Mar 8, 2025
6fe1a3b
Disambiguate caller-runs vs Interruptible
DougLea Mar 9, 2025
9cc670b
Use SharedSecrets for ThreadLocalRandomProbe; other tweaks
DougLea Mar 11, 2025
172a235
Reword javadoc
DougLea Mar 13, 2025
9b51b7a
Use TC_MASK in accord with https://bugs.openjdk.org/browse/JDK-833001…
DougLea Mar 14, 2025
24422e4
Match indent of naster changes
DougLea Mar 22, 2025
b552c22
Merge branch 'openjdk:master' into JDK-8319447
DougLea Mar 22, 2025
9cf0a75
Address review comments
DougLea Mar 25, 2025
4aabe6b
Merge branch 'openjdk:master' into JDK-8319447
DougLea Mar 25, 2025
3237cc7
Address review comments
DougLea Mar 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 103 additions & 154 deletions src/java.base/share/classes/java/util/concurrent/CompletableFuture.java

Large diffs are not rendered by default.

554 changes: 554 additions & 0 deletions src/java.base/share/classes/java/util/concurrent/DelayScheduler.java

Large diffs are not rendered by default.

553 changes: 497 additions & 56 deletions src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

Large diffs are not rendered by default.

109 changes: 75 additions & 34 deletions src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -330,14 +330,9 @@ private void setDone() {
*/
final int trySetCancelled() {
int s;
for (;;) {
if ((s = status) < 0)
break;
if (casStatus(s, s | (DONE | ABNORMAL))) {
signalWaiters();
break;
}
}
if ((s = status) >= 0 &&
(s = getAndBitwiseOrStatus(DONE | ABNORMAL)) >= 0)
signalWaiters();
return s;
}

Expand Down Expand Up @@ -642,7 +637,7 @@ public final ForkJoinTask<V> fork() {
p = wt.pool;
}
else
q = (p = ForkJoinPool.common).externalSubmissionQueue();
q = (p = ForkJoinPool.common).externalSubmissionQueue(false);
q.push(this, p, internal);
return this;
}
Expand Down Expand Up @@ -1415,7 +1410,8 @@ public final boolean compareAndSetForkJoinTaskTag(short expect, short update) {
* @return the task
*/
public static ForkJoinTask<?> adapt(Runnable runnable) {
return new AdaptedRunnableAction(runnable);
return new AdaptedRunnableAction(
Objects.requireNonNull(runnable));
}

/**
Expand All @@ -1429,7 +1425,8 @@ public static ForkJoinTask<?> adapt(Runnable runnable) {
* @return the task
*/
public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
return new AdaptedRunnable<T>(runnable, result);
return new AdaptedRunnable<T>(
Objects.requireNonNull(runnable), result);
}

/**
Expand All @@ -1443,7 +1440,8 @@ public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
* @return the task
*/
public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
return new AdaptedCallable<T>(callable);
return new AdaptedCallable<T>(
Objects.requireNonNull(callable));
}

/**
Expand All @@ -1461,7 +1459,8 @@ public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
* @since 19
*/
public static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) {
return new AdaptedInterruptibleCallable<T>(callable);
return new AdaptedInterruptibleCallable<T>(
Objects.requireNonNull(callable));
}

/**
Expand All @@ -1480,7 +1479,8 @@ public static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> calla
* @since 22
*/
public static <T> ForkJoinTask<T> adaptInterruptible(Runnable runnable, T result) {
return new AdaptedInterruptibleRunnable<T>(runnable, result);
return new AdaptedInterruptibleRunnable<T>(
Objects.requireNonNull(runnable), result);
}

/**
Expand All @@ -1498,7 +1498,8 @@ public static <T> ForkJoinTask<T> adaptInterruptible(Runnable runnable, T result
* @since 22
*/
public static ForkJoinTask<?> adaptInterruptible(Runnable runnable) {
return new AdaptedInterruptibleRunnable<Void>(runnable, null);
return new AdaptedInterruptibleRunnable<Void>(
Objects.requireNonNull(runnable), null);
}

// Serialization support
Expand Down Expand Up @@ -1557,7 +1558,6 @@ static final class AdaptedRunnable<T> extends ForkJoinTask<T>
@SuppressWarnings("serial") // Conditionally serializable
T result;
AdaptedRunnable(Runnable runnable, T result) {
Objects.requireNonNull(runnable);
this.runnable = runnable;
this.result = result; // OK to set this even before completion
}
Expand All @@ -1579,7 +1579,6 @@ static final class AdaptedRunnableAction extends ForkJoinTask<Void>
@SuppressWarnings("serial") // Conditionally serializable
final Runnable runnable;
AdaptedRunnableAction(Runnable runnable) {
Objects.requireNonNull(runnable);
this.runnable = runnable;
}
public final Void getRawResult() { return null; }
Expand All @@ -1602,7 +1601,6 @@ static final class AdaptedCallable<T> extends ForkJoinTask<T>
@SuppressWarnings("serial") // Conditionally serializable
T result;
AdaptedCallable(Callable<? extends T> callable) {
Objects.requireNonNull(callable);
this.callable = callable;
}
public final T getRawResult() { return result; }
Expand Down Expand Up @@ -1656,20 +1654,29 @@ public final boolean exec() {
} finally {
runner = null;
}
return postExec();
}
boolean postExec() { // cleanup and return completion status to doExec
return true;
}
final boolean interruptIfRunning(boolean enabled) {
Thread t;
if ((t = runner) == null) // return false if not running
return false;
if (enabled) {
try {
t.interrupt();
} catch (Throwable ignore) {
}
}
return true;
}
public boolean cancel(boolean mayInterruptIfRunning) {
Thread t;
if (trySetCancelled() >= 0) {
if (mayInterruptIfRunning && (t = runner) != null) {
try {
t.interrupt();
} catch (Throwable ignore) {
}
}
return true;
}
return isCancelled();
int s;
if ((s = trySetCancelled()) < 0)
return ((s & (ABNORMAL | THROWN)) == ABNORMAL);
interruptIfRunning(mayInterruptIfRunning);
return true;
}
public final void run() { quietlyInvoke(); }
Object adaptee() { return null; } // for printing and diagnostics
Expand All @@ -1691,7 +1698,6 @@ static final class AdaptedInterruptibleCallable<T> extends InterruptibleTask<T>
@SuppressWarnings("serial") // Conditionally serializable
T result;
AdaptedInterruptibleCallable(Callable<? extends T> callable) {
Objects.requireNonNull(callable);
this.callable = callable;
}
public final T getRawResult() { return result; }
Expand All @@ -1710,7 +1716,6 @@ static final class AdaptedInterruptibleRunnable<T> extends InterruptibleTask<T>
@SuppressWarnings("serial") // Conditionally serializable
final T result;
AdaptedInterruptibleRunnable(Runnable runnable, T result) {
Objects.requireNonNull(runnable);
this.runnable = runnable;
this.result = result;
}
Expand All @@ -1728,7 +1733,6 @@ static final class RunnableExecuteAction extends InterruptibleTask<Void> {
@SuppressWarnings("serial") // Conditionally serializable
final Runnable runnable;
RunnableExecuteAction(Runnable runnable) {
Objects.requireNonNull(runnable);
this.runnable = runnable;
}
public final Void getRawResult() { return null; }
Expand Down Expand Up @@ -1794,9 +1798,12 @@ final T invokeAny(Collection<? extends Callable<T>> tasks,
throw new NullPointerException();
InvokeAnyTask<T> t = null; // list of submitted tasks
try {
for (Callable<T> c : tasks)
for (Callable<T> c : tasks) {
if (c == null)
throw new NullPointerException();
pool.execute((ForkJoinTask<?>)
(t = new InvokeAnyTask<T>(c, this, t)));
}
return timed ? get(nanos, TimeUnit.NANOSECONDS) : get();
} finally {
for (; t != null; t = t.pred)
Expand All @@ -1823,7 +1830,6 @@ static final class InvokeAnyTask<T> extends InterruptibleTask<Void> {
final InvokeAnyTask<T> pred; // to traverse on cancellation
InvokeAnyTask(Callable<T> callable, InvokeAnyRoot<T> root,
InvokeAnyTask<T> pred) {
Objects.requireNonNull(callable);
this.callable = callable;
this.root = root;
this.pred = pred;
Expand Down Expand Up @@ -1858,4 +1864,39 @@ final void onRootCompletion() {
public final void setRawResult(Void v) { }
final Object adaptee() { return callable; }
}

/**
* Adapter for Callable-based interruptible tasks with timeout actions.
*/
@SuppressWarnings("serial") // Conditionally serializable
static final class CallableWithTimeout<T> extends InterruptibleTask<T> {
Callable<? extends T> callable; // nulled out after use
ForkJoinTask<?> timeoutAction;
T result;
CallableWithTimeout(Callable<? extends T> callable,
ForkJoinTask<?> timeoutAction) {
this.callable = callable;
this.timeoutAction = timeoutAction;
}
public final T getRawResult() { return result; }
public final void setRawResult(T v) { result = v; }
final Object adaptee() { return callable; }
final T compute() throws Exception {
Callable<? extends T> c;
return ((c = callable) != null) ? c.call() : null;
}
final boolean postExec() { // cancel timeout action
ForkJoinTask<?> t;
callable = null;
if ((t = timeoutAction) != null) {
timeoutAction = null;
try {
t.cancel(false);
} catch (Error | RuntimeException ex) {
}
}
return true;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -206,22 +206,6 @@ static final int roundCapacity(int cap) {
(n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
}

// default Executor setup; nearly the same as CompletableFuture

/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor ASYNC_POOL =
(ForkJoinPool.getCommonPoolParallelism() > 1) ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
private static final class ThreadPerTaskExecutor implements Executor {
ThreadPerTaskExecutor() {} // prevent access constructor creation
public void execute(Runnable r) { new Thread(r).start(); }
}

/**
* Clients (BufferedSubscriptions) are maintained in a linked list
* (via their "next" fields). This works well for publish loops.
Expand Down Expand Up @@ -316,7 +300,7 @@ public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
* Flow.Subscriber#onNext(Object) onNext}.
*/
public SubmissionPublisher() {
this(ASYNC_POOL, Flow.defaultBufferSize(), null);
this(ForkJoinPool.asyncCommonPool(), Flow.defaultBufferSize(), null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,39 +30,25 @@
*/

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;

class CompletableFutureOrTimeoutExceptionallyTest {
static final BlockingQueue<Runnable> delayerQueue;
static {
try {
var delayerClass = Class.forName("java.util.concurrent.CompletableFuture$Delayer",
true,
CompletableFuture.class.getClassLoader());
var delayerField = delayerClass.getDeclaredField("delayer");
delayerField.setAccessible(true);
delayerQueue = ((ScheduledThreadPoolExecutor)delayerField.get(null)).getQueue();
} catch (Throwable t) {
throw new ExceptionInInitializerError(t);
}
}

// updated February 2025 to adapt to CompletableFuture DelayScheduler changes
/**
* Test that orTimeout task is cancelled if the CompletableFuture is completed Exceptionally
*/
@Test
void testOrTimeoutWithCompleteExceptionallyDoesNotLeak() throws InterruptedException {
assertTrue(delayerQueue.peek() == null);
ForkJoinPool delayer = ForkJoinPool.commonPool();
assertEquals(delayer.getDelayedTaskCount(), 0);
var future = new CompletableFuture<>().orTimeout(12, TimeUnit.HOURS);
assertTrue(delayerQueue.peek() != null);
future.completeExceptionally(new RuntimeException("This is fine"));
while (delayerQueue.peek() != null) {
while (delayer.getDelayedTaskCount() != 0) {
Thread.sleep(100);
};
}
Expand All @@ -72,12 +58,13 @@ void testOrTimeoutWithCompleteExceptionallyDoesNotLeak() throws InterruptedExcep
*/
@Test
void testCompleteOnTimeoutWithCompleteExceptionallyDoesNotLeak() throws InterruptedException {
assertTrue(delayerQueue.peek() == null);
ForkJoinPool delayer = ForkJoinPool.commonPool();
assertEquals(delayer.getDelayedTaskCount(), 0);
var future = new CompletableFuture<>().completeOnTimeout(null, 12, TimeUnit.HOURS);
assertTrue(delayerQueue.peek() != null);
future.completeExceptionally(new RuntimeException("This is fine"));
while (delayerQueue.peek() != null) {
while (delayer.getDelayedTaskCount() != 0) {
Thread.sleep(100);
};
}

}
10 changes: 2 additions & 8 deletions test/jdk/java/util/concurrent/tck/CompletableFutureTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,6 @@ public void execute(Runnable r) {
}
}

static final boolean defaultExecutorIsCommonPool
= ForkJoinPool.getCommonPoolParallelism() > 1;

/**
* Permits the testing of parallel code for the 3 different
Expand Down Expand Up @@ -750,8 +748,7 @@ public <U> CompletableFuture<U> supplyAsync(Supplier<U> a) {
},
ASYNC {
public void checkExecutionMode() {
mustEqual(defaultExecutorIsCommonPool,
(ForkJoinPool.commonPool() == ForkJoinTask.getPool()));
mustEqual(ForkJoinPool.commonPool(), ForkJoinTask.getPool());
}
public CompletableFuture<Void> runAsync(Runnable a) {
return CompletableFuture.runAsync(a);
Expand Down Expand Up @@ -3794,10 +3791,7 @@ public void testDefaultExecutor() {
CompletableFuture<Item> f = new CompletableFuture<>();
Executor e = f.defaultExecutor();
Executor c = ForkJoinPool.commonPool();
if (ForkJoinPool.getCommonPoolParallelism() > 1)
assertSame(e, c);
else
assertNotSame(e, c);
assertSame(e, c);
}

/**
Expand Down
Loading