diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java index f965891cc..28ffdcda3 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java @@ -880,7 +880,7 @@ public void testRequestVariableLifecycle2() throws Exception { // kick off work (simulating a single request with multiple threads) for (int t = 0; t < 5; t++) { - Thread th = new Thread(new HystrixContextRunnable(new Runnable() { + Thread th = new Thread(new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { @Override public void run() { diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java index 64d7fa3ca..81814dedc 100755 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java @@ -58,6 +58,7 @@ import rx.subjects.ReplaySubject; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; +import rx.util.functions.Action1; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -1006,6 +1007,24 @@ public Subscription onSubscribe(final Observer observer) { // TODO better yet, get TimeoutObservable part of Rx final SafeObservableSubscription s = new SafeObservableSubscription(); + /* + * Define the action to perform on timeout outside of the TimerListener to it can capture the HystrixRequestContext + * of the calling thread which doesn't exist on the Timer thread. + */ + final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() { + + @Override + public void run() { + try { + R v = originalCommand.getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException()); + observer.onNext(v); + observer.onCompleted(); + } catch (HystrixRuntimeException re) { + observer.onError(re); + } + } + }); + TimerListener listener = new TimerListener() { @Override @@ -1021,13 +1040,7 @@ public void tick() { // we record execution time because we are returning before originalCommand.recordTotalExecutionTime(originalCommand.invocationStartTime); - try { - R v = originalCommand.getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException()); - observer.onNext(v); - observer.onCompleted(); - } catch (HystrixRuntimeException re) { - observer.onError(re); - } + timeoutRunnable.run(); } s.unsubscribe(); @@ -1146,7 +1159,7 @@ private Subscription subscribeWithThreadIsolation(final Observer obse } // wrap the synchronous execute() method in a Callable and execute in the threadpool - final Future f = threadPool.getExecutor().submit(concurrencyStrategy.wrapCallable(new HystrixContextCallable(new Callable() { + final Future f = threadPool.getExecutor().submit(new HystrixContextCallable(concurrencyStrategy, new Callable() { @Override public R call() throws Exception { @@ -1215,7 +1228,7 @@ private void preTerminationWork(boolean recordDuration) { } } - }))); + })); return new Subscription() { @@ -3818,7 +3831,7 @@ public void testExecutionSemaphoreWithQueue() { final TryableSemaphore semaphore = new TryableSemaphore(HystrixProperty.Factory.asProperty(1)); - Runnable r = new HystrixContextRunnable(new Runnable() { + Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { @Override public void run() { @@ -3890,7 +3903,7 @@ public void testExecutionSemaphoreWithExecution() { final TryableSemaphore semaphore = new TryableSemaphore(HystrixProperty.Factory.asProperty(1)); - Runnable r = new HystrixContextRunnable(new Runnable() { + Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { @Override public void run() { @@ -3953,7 +3966,7 @@ public void testRejectedExecutionSemaphoreWithFallback() { final AtomicBoolean exceptionReceived = new AtomicBoolean(); - Runnable r = new HystrixContextRunnable(new Runnable() { + Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { @Override public void run() { @@ -4026,7 +4039,7 @@ public void testSemaphorePermitsInUse() { // used to signal that all command can finish final CountDownLatch sharedLatch = new CountDownLatch(1); - final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(new Runnable() { + final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { public void run() { try { new LatchedSemaphoreCommand(circuitBreaker, sharedSemaphore, startLatch, sharedLatch).execute(); @@ -4054,7 +4067,7 @@ public void run() { // tracks failures to obtain semaphores final AtomicInteger failureCount = new AtomicInteger(); - final Thread isolatedThread = new Thread(new HystrixContextRunnable(new Runnable() { + final Thread isolatedThread = new Thread(new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { public void run() { try { new LatchedSemaphoreCommand(circuitBreaker, isolatedSemaphore, startLatch, isolatedLatch).execute(); @@ -6133,6 +6146,64 @@ protected String getFallback() { } + /** + * See https://github.com/Netflix/Hystrix/issues/212 + */ + @Test + public void testObservableTimeoutNoFallbackThreadContext() { + final AtomicReference onErrorThread = new AtomicReference(); + final AtomicBoolean isRequestContextInitialized = new AtomicBoolean(); + TestHystrixCommand command = new TestCommandWithTimeout(50, TestCommandWithTimeout.FALLBACK_NOT_IMPLEMENTED); + try { + command.toObservable().doOnError(new Action1() { + + @Override + public void call(Throwable t1) { + System.out.println("onError: " + t1); + System.out.println("onError Thread: " + Thread.currentThread()); + System.out.println("ThreadContext in onError: " + HystrixRequestContext.isCurrentThreadInitialized()); + onErrorThread.set(Thread.currentThread()); + isRequestContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized()); + } + + }).toBlockingObservable().single(); + throw new RuntimeException("expected error to be thrown"); + } catch (Throwable e) { + assertTrue(isRequestContextInitialized.get()); + assertTrue(onErrorThread.get().getName().startsWith("RxComputationThreadPool")); + + if (e instanceof HystrixRuntimeException) { + HystrixRuntimeException de = (HystrixRuntimeException) e; + assertNotNull(de.getFallbackException()); + assertTrue(de.getFallbackException() instanceof UnsupportedOperationException); + assertNotNull(de.getImplementingClass()); + assertNotNull(de.getCause()); + assertTrue(de.getCause() instanceof TimeoutException); + } else { + fail("the exception should be ExecutionException with cause as HystrixRuntimeException"); + } + } + + assertTrue(command.getExecutionTimeInMilliseconds() > -1); + assertTrue(command.isResponseTimedOut()); + + assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS)); + assertEquals(1, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.EXCEPTION_THROWN)); + assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.FAILURE)); + assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_REJECTION)); + assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_FAILURE)); + assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_SUCCESS)); + assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.SEMAPHORE_REJECTED)); + assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.SHORT_CIRCUITED)); + assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.THREAD_POOL_REJECTED)); + assertEquals(1, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT)); + assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE)); + + assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); + + assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + } + /* ******************************************************************************** */ /* ******************************************************************************** */ /* private HystrixCommand class implementations for unit testing */ diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapser.java index f5ecf70ed..b7cf5e502 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapser.java @@ -121,7 +121,7 @@ private class CollapsedTask implements TimerListener { CollapsedTask() { // this gets executed from the context of a HystrixCommand parent thread (such as a Tomcat thread) // so we create the callable now where we can capture the thread context - callableWithContextOfParent = concurrencyStrategy.wrapCallable(new HystrixContextCallable(new Callable() { + callableWithContextOfParent = new HystrixContextCallable(concurrencyStrategy, new Callable() { // the wrapCallable call allows a strategy to capture thread-context if desired @Override @@ -144,7 +144,7 @@ public Void call() throws Exception { return null; } - })); + }); } @Override diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/HystrixPlugins.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/HystrixPlugins.java index 385f765c3..4e9241f4c 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/HystrixPlugins.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/HystrixPlugins.java @@ -17,13 +17,20 @@ import static org.junit.Assert.*; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Test; +import rx.util.functions.Action1; + +import com.netflix.hystrix.Hystrix; +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier; import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifierDefault; import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook; @@ -418,6 +425,76 @@ public static class HystrixPropertiesStrategyTestImpl extends HystrixPropertiesS private static String getFullClassNameForTestClass(Class cls) { return HystrixPlugins.class.getPackage().getName() + "." + HystrixPlugins.class.getSimpleName() + "$UnitTest$" + cls.getSimpleName(); } + + + private static final ThreadLocal testRequestIdThreadLocal = new ThreadLocal(); + + public static class DummyCommand extends HystrixCommand { + + public DummyCommand() { + super(HystrixCommandGroupKey.Factory.asKey("Dummy")); + } + + @Override + protected Void run() throws Exception { + System.out.println("requestId (run) = " + testRequestIdThreadLocal.get()); + Thread.sleep(2000); + return null; + } + } + + @Test + public void testRequestContextViaPluginInTimeout() { + HystrixPlugins.getInstance().registerConcurrencyStrategy(new HystrixConcurrencyStrategy() { + @Override + public Callable wrapCallable(final Callable callable) { + return new RequestIdCallable(callable); + } + }); + + HystrixRequestContext context = HystrixRequestContext.initializeContext(); + + testRequestIdThreadLocal.set("foobar"); + final AtomicReference valueInTimeout = new AtomicReference(); + + new DummyCommand().toObservable() + .doOnError(new Action1() { + @Override + public void call(Throwable throwable) { + System.out.println("initialized = " + HystrixRequestContext.isCurrentThreadInitialized()); + System.out.println("requestId (timeout) = " + testRequestIdThreadLocal.get()); + valueInTimeout.set(testRequestIdThreadLocal.get()); + } + }) + .materialize() + .toBlockingObservable().single(); + + context.shutdown(); + Hystrix.reset(); + + assertEquals("foobar", valueInTimeout.get()); + } + + private static class RequestIdCallable implements Callable { + private final Callable callable; + private final String requestId; + + public RequestIdCallable(Callable callable) { + this.callable = callable; + this.requestId = testRequestIdThreadLocal.get(); + } + + @Override + public T call() throws Exception { + String original = testRequestIdThreadLocal.get(); + testRequestIdThreadLocal.set(requestId); + try { + return callable.call(); + } finally { + testRequestIdThreadLocal.set(original); + } + } + } } } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java index 7ea72792d..ee8b3d175 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java @@ -32,6 +32,7 @@ import rx.util.functions.Func1; import com.netflix.config.ConfigurationManager; +import com.netflix.hystrix.Hystrix; import com.netflix.hystrix.HystrixCommand; import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixRequestLog; @@ -218,6 +219,7 @@ protected String run() throws Exception { } } + } } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextCallable.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextCallable.java index 62a596f83..eefb31df0 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextCallable.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextCallable.java @@ -30,8 +30,8 @@ public class HystrixContextCallable implements Callable { private final Callable actual; private final HystrixRequestContext parentThreadState; - public HystrixContextCallable(Callable actual) { - this.actual = actual; + public HystrixContextCallable(HystrixConcurrencyStrategy concurrencyStrategy, Callable actual) { + this.actual = concurrencyStrategy.wrapCallable(actual); this.parentThreadState = HystrixRequestContext.getContextForCurrentThread(); } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextRunnable.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextRunnable.java index d73c8cafe..5f9cde7d3 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextRunnable.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextRunnable.java @@ -15,6 +15,8 @@ */ package com.netflix.hystrix.strategy.concurrency; +import java.util.concurrent.Callable; + /** * Wrapper around {@link Runnable} that manages the {@link HystrixRequestContext} initialization and cleanup for the execution of the {@link Runnable} * @@ -22,11 +24,19 @@ */ public class HystrixContextRunnable implements Runnable { - private final Runnable actual; + private final Callable actual; private final HystrixRequestContext parentThreadState; - public HystrixContextRunnable(Runnable actual) { - this.actual = actual; + public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) { + this.actual = concurrencyStrategy.wrapCallable(new Callable() { + + @Override + public Void call() throws Exception { + actual.run(); + return null; + } + + }); this.parentThreadState = HystrixRequestContext.getContextForCurrentThread(); } @@ -37,7 +47,11 @@ public void run() { // set the state of this thread to that of its parent HystrixRequestContext.setContextOnCurrentThread(parentThreadState); // execute actual Callable with the state of the parent - actual.run(); + try { + actual.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } } finally { // restore this thread back to its original state HystrixRequestContext.setContextOnCurrentThread(existingState); diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java index aa03fc1bb..4fadee133 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java @@ -15,12 +15,10 @@ */ package com.netflix.hystrix.strategy.concurrency; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import rx.Scheduler; import rx.Subscription; -import rx.subscriptions.MultipleAssignmentSubscription; import rx.util.functions.Func2; /**