diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java index 64aca814f..e4f9a01df 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java @@ -15,16 +15,12 @@ */ package com.netflix.hystrix.strategy.concurrency; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.*; -import rx.Scheduler; -import rx.Subscription; +import rx.*; import rx.functions.Action0; -import rx.subscriptions.BooleanSubscription; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; +import rx.internal.schedulers.ScheduledAction; +import rx.subscriptions.*; import com.netflix.hystrix.HystrixThreadPool; import com.netflix.hystrix.strategy.HystrixPlugins; @@ -64,7 +60,6 @@ public Worker createWorker() { private class HystrixContextSchedulerWorker extends Worker { - private BooleanSubscription s = new BooleanSubscription(); private final Worker worker; private HystrixContextSchedulerWorker(Worker actualWorker) { @@ -73,12 +68,12 @@ private HystrixContextSchedulerWorker(Worker actualWorker) { @Override public void unsubscribe() { - s.unsubscribe(); + worker.unsubscribe(); } @Override public boolean isUnsubscribed() { - return s.isUnsubscribed(); + return worker.isUnsubscribed(); } @Override @@ -150,32 +145,20 @@ public boolean isUnsubscribed() { public Subscription schedule(final Action0 action) { if (subscription.isUnsubscribed()) { // don't schedule, we are unsubscribed - return Subscriptions.empty(); + return Subscriptions.unsubscribed(); } - - final AtomicReference sf = new AtomicReference<>(); - Subscription s = Subscriptions.from(threadPool.getExecutor().submit(new Runnable() { - - @Override - public void run() { - try { - if (subscription.isUnsubscribed()) { - return; - } - action.call(); - } finally { - // remove the subscription now that we're completed - Subscription s = sf.get(); - if (s != null) { - subscription.remove(s); - } - } - } - })); - - sf.set(s); - subscription.add(s); - return s; + + // This is internal RxJava API but it is too useful. + ScheduledAction sa = new ScheduledAction(action); + + subscription.add(sa); + sa.addParent(subscription); + + Future f = threadPool.getExecutor().submit(sa); + + sa.add(f); + + return sa; } @Override diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java index 1eb1affdd..47977a2e0 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java @@ -8,14 +8,19 @@ import com.netflix.hystrix.HystrixThreadPool.Factory; import com.netflix.hystrix.strategy.HystrixPlugins; +import com.netflix.hystrix.strategy.concurrency.*; import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher; import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory; import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherThreadPool; + import org.junit.Before; import org.junit.Test; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import rx.Scheduler; +import rx.functions.Action0; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; public class HystrixThreadPoolTest { @Before @@ -102,4 +107,49 @@ public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(Hystri //Now the HystrixThreadPool ALWAYS has the same reference to the ThreadPoolExecutor so that it no longer matters which //wins to be inserted into the HystrixThreadPool.Factory.threadPools cache. } + @Test(timeout = 2500) + public void testUnsubscribeHystrixThreadPool() throws InterruptedException { + // methods are package-private so can't test it somewhere else + HystrixThreadPool pool = Factory.getInstance(HystrixThreadPoolKey.Factory.asKey("threadPoolFactoryTest"), + HystrixThreadPoolProperties.Setter.getUnitTestPropertiesBuilder()); + + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch start = new CountDownLatch(1); + final CountDownLatch end = new CountDownLatch(1); + + HystrixContextScheduler hcs = new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), pool); + + Scheduler.Worker w = hcs.createWorker(); + + try { + w.schedule(new Action0() { + @Override + public void call() { + start.countDown(); + try { + try { + Thread.sleep(5000); + } catch (InterruptedException ex) { + interrupted.set(true); + } + } finally { + end.countDown(); + } + } + }); + + start.await(); + + w.unsubscribe(); + + end.await(); + + Factory.shutdown(); + + assertTrue(interrupted.get()); + } finally { + w.unsubscribe(); + } + } + } diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/strategy/concurrency/HystrixContextSchedulerTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/strategy/concurrency/HystrixContextSchedulerTest.java new file mode 100644 index 000000000..5ba46f163 --- /dev/null +++ b/hystrix-core/src/test/java/com/netflix/hystrix/strategy/concurrency/HystrixContextSchedulerTest.java @@ -0,0 +1,54 @@ +package com.netflix.hystrix.strategy.concurrency; + +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; + +import rx.Scheduler; +import rx.functions.Action0; +import rx.schedulers.Schedulers; + +public class HystrixContextSchedulerTest { + + @Test(timeout = 2500) + public void testUnsubscribeWrappedScheduler() throws InterruptedException { + Scheduler s = Schedulers.newThread(); + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch start = new CountDownLatch(1); + final CountDownLatch end = new CountDownLatch(1); + + HystrixContextScheduler hcs = new HystrixContextScheduler(s); + + Scheduler.Worker w = hcs.createWorker(); + try { + w.schedule(new Action0() { + @Override + public void call() { + start.countDown(); + try { + try { + Thread.sleep(5000); + } catch (InterruptedException ex) { + interrupted.set(true); + } + } finally { + end.countDown(); + } + } + }); + + start.await(); + + w.unsubscribe(); + + end.await(); + + assertTrue(interrupted.get()); + } finally { + w.unsubscribe(); + } + } +}