Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Helper methods to schedule tasks (non-)interruptibly on an #2772

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/rx/internal/schedulers/EventLoopsScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public Worker createWorker() {
*/
public Subscription scheduleDirect(Action0 action) {
PoolWorker pw = pool.getEventLoop();
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS);
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS, false);
}

private static class EventLoopWorker extends Scheduler.Worker {
Expand Down Expand Up @@ -124,7 +124,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
return Subscriptions.unsubscribed();
}

ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit);
ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit, false);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
Expand Down
17 changes: 9 additions & 8 deletions src/main/java/rx/internal/schedulers/NewThreadWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,20 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
if (isUnsubscribed) {
return Subscriptions.unsubscribed();
}
return scheduleActual(action, delayTime, unit);
return scheduleActual(action, delayTime, unit, true);
}

/**
* @warn javadoc missing
* @param action
* @param delayTime
* @param unit
* @return
* Performs the actual scheduling of a potentially delayed task and assigns the
* future to the ScheduledAction it returs.
* @param action the action to schedule
* @param delayTime the scheduling delay if positive
* @param unit the scheduling delay's time unit
* @return the ScheduledAction representing the task
*/
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, boolean interruptOnUnsubscribe) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
ScheduledAction run = new ScheduledAction(decoratedAction, interruptOnUnsubscribe);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
Expand Down
43 changes: 32 additions & 11 deletions src/main/java/rx/internal/schedulers/ScheduledAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
package rx.internal.schedulers;

import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.*;

import rx.Subscription;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.internal.util.SubscriptionList;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.CompositeSubscription;

Expand All @@ -32,12 +32,20 @@
public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
/** */
private static final long serialVersionUID = -3962399486978279857L;
final CompositeSubscription cancel;
final SubscriptionList cancel;
final Action0 action;
volatile int interruptOnUnsubscribe;
static final AtomicIntegerFieldUpdater<ScheduledAction> INTERRUPT_ON_UNSUBSCRIBE
= AtomicIntegerFieldUpdater.newUpdater(ScheduledAction.class, "interruptOnUnsubscribe");

public ScheduledAction(Action0 action) {
this(action, true);
}

public ScheduledAction(Action0 action, boolean interruptOnUnsubscribe) {
this.action = action;
this.cancel = new CompositeSubscription();
this.cancel = new SubscriptionList();
this.interruptOnUnsubscribe = interruptOnUnsubscribe ? 1 : 0;
}

@Override
Expand All @@ -61,16 +69,29 @@ public void run() {
}
}

/**
* Sets the flag to indicate the underlying Future task should be interrupted on unsubscription or not.
* @param interrupt the new interruptible status
*/
public void setInterruptOnUnsubscribe(boolean interrupt) {
INTERRUPT_ON_UNSUBSCRIBE.lazySet(this, interrupt ? 1 : 0);
}
/**
* Returns {@code true} if the underlying Future task will be interrupted on unsubscription.
* @return the current interruptible status
*/
public boolean isInterruptOnUnsubscribe() {
return interruptOnUnsubscribe != 0;
}

@Override
public boolean isUnsubscribed() {
return cancel.isUnsubscribed();
}

@Override
public void unsubscribe() {
if (!cancel.isUnsubscribed()) {
cancel.unsubscribe();
}
cancel.unsubscribe();
}

/**
Expand All @@ -89,7 +110,7 @@ public void add(Subscription s) {
* @param f the future to add
*/
public void add(final Future<?> f) {
cancel.add(new FutureCompleter(f));
add(new FutureCompleter(f));
}

/**
Expand All @@ -100,7 +121,7 @@ public void add(final Future<?> f) {
* the parent {@code CompositeSubscription} to add
*/
public void addParent(CompositeSubscription parent) {
cancel.add(new Remover(this, parent));
add(new Remover(this, parent));
}

/**
Expand All @@ -119,7 +140,7 @@ private FutureCompleter(Future<?> f) {
@Override
public void unsubscribe() {
if (ScheduledAction.this.get() != Thread.currentThread()) {
f.cancel(true);
f.cancel(interruptOnUnsubscribe != 0);
} else {
f.cancel(false);
}
Expand Down Expand Up @@ -155,4 +176,4 @@ public void unsubscribe() {
}

}
}
}
2 changes: 1 addition & 1 deletion src/main/java/rx/schedulers/CachedThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
return Subscriptions.unsubscribed();
}

ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit, true);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
Expand Down
62 changes: 57 additions & 5 deletions src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
*/
package rx.schedulers;

import rx.Scheduler;
import rx.internal.schedulers.EventLoopsScheduler;
import rx.plugins.RxJavaPlugins;
import java.util.concurrent.*;

import java.util.concurrent.Executor;
import rx.*;
import rx.annotations.Experimental;
import rx.functions.Action0;
import rx.internal.schedulers.*;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.CompositeSubscription;

/**
* Static factory methods for creating Schedulers.
Expand Down Expand Up @@ -137,4 +140,53 @@ public static TestScheduler test() {
public static Scheduler from(Executor executor) {
return new ExecutorScheduler(executor);
}
}
/**
* Submit an Action0 to the specified executor service with the option to interrupt the task
* on unsubscription and add it to a parent composite subscription.
* @param executor the target executor service
* @param action the action to execute
* @param parent if not {@code null} the subscription representing the action is added to this composite with logic to remove it
* once the action completes or is unsubscribed.
* @param interruptOnUnsubscribe if {@code false}, unsubscribing the task will not interrupt the task if it is running
* @return the Subscription representing the scheduled action which is also added to the {@code parent} composite
*/
@Experimental
public static Subscription submitTo(ExecutorService executor, Action0 action, CompositeSubscription parent, boolean interruptOnUnsubscribe) {
ScheduledAction sa = new ScheduledAction(action, interruptOnUnsubscribe);

if (parent != null) {
parent.add(sa);
sa.addParent(parent);
}

Future<?> f = executor.submit(sa);
sa.add(f);

return sa;
}
/**
* Submit an Action0 to the specified executor service with the given delay and the option to interrupt the task
* on unsubscription and add it to a parent composite subscription.
* @param executor the target executor service
* @param action the action to execute
* @param delay the delay value
* @param unit the time unit of the delay value
* @param parent if not {@code null} the subscription representing the action is added to this composite with logic to remove it
* once the action completes or is unsubscribed.
* @param interruptOnUnsubscribe if {@code false}, unsubscribing the task will not interrupt the task if it is running
* @return the Subscription representing the scheduled action which is also added to the {@code parent} composite
*/
@Experimental
public static Subscription submitTo(ScheduledExecutorService executor, Action0 action, long delay, TimeUnit unit, CompositeSubscription parent, boolean interruptOnUnsubscribe) {
ScheduledAction sa = new ScheduledAction(action, interruptOnUnsubscribe);

if (parent != null) {
parent.add(sa);
sa.addParent(parent);
}

Future<?> f = executor.schedule(sa, delay, unit);
sa.add(f);

return sa;
}}
Loading