Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BugFix: Timeout does not propagate request context #213

Merged
merged 2 commits into from
Mar 5, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ public void testRequestVariableLifecycle2() throws Exception {

// kick off work (simulating a single request with multiple threads)
for (int t = 0; t < 5; t++) {
Thread th = new Thread(new HystrixContextRunnable(new Runnable() {
Thread th = new Thread(new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

@Override
public void run() {
Expand Down
99 changes: 85 additions & 14 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import rx.subjects.ReplaySubject;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

Expand Down Expand Up @@ -1006,6 +1007,24 @@ public Subscription onSubscribe(final Observer<? super R> observer) {
// TODO better yet, get TimeoutObservable part of Rx
final SafeObservableSubscription s = new SafeObservableSubscription();

/*
* Define the action to perform on timeout outside of the TimerListener to it can capture the HystrixRequestContext
* of the calling thread which doesn't exist on the Timer thread.
*/
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {

@Override
public void run() {
try {
R v = originalCommand.getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
observer.onNext(v);
observer.onCompleted();
} catch (HystrixRuntimeException re) {
observer.onError(re);
}
}
});

TimerListener listener = new TimerListener() {

@Override
Expand All @@ -1021,13 +1040,7 @@ public void tick() {
// we record execution time because we are returning before
originalCommand.recordTotalExecutionTime(originalCommand.invocationStartTime);

try {
R v = originalCommand.getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
observer.onNext(v);
observer.onCompleted();
} catch (HystrixRuntimeException re) {
observer.onError(re);
}
timeoutRunnable.run();
}

s.unsubscribe();
Expand Down Expand Up @@ -1146,7 +1159,7 @@ private Subscription subscribeWithThreadIsolation(final Observer<? super R> obse
}

// wrap the synchronous execute() method in a Callable and execute in the threadpool
final Future<R> f = threadPool.getExecutor().submit(concurrencyStrategy.wrapCallable(new HystrixContextCallable<R>(new Callable<R>() {
final Future<R> f = threadPool.getExecutor().submit(new HystrixContextCallable<R>(concurrencyStrategy, new Callable<R>() {

@Override
public R call() throws Exception {
Expand Down Expand Up @@ -1215,7 +1228,7 @@ private void preTerminationWork(boolean recordDuration) {
}
}

})));
}));

return new Subscription() {

Expand Down Expand Up @@ -3818,7 +3831,7 @@ public void testExecutionSemaphoreWithQueue() {
final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));

Runnable r = new HystrixContextRunnable(new Runnable() {
Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

@Override
public void run() {
Expand Down Expand Up @@ -3890,7 +3903,7 @@ public void testExecutionSemaphoreWithExecution() {
final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));

Runnable r = new HystrixContextRunnable(new Runnable() {
Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

@Override
public void run() {
Expand Down Expand Up @@ -3953,7 +3966,7 @@ public void testRejectedExecutionSemaphoreWithFallback() {

final AtomicBoolean exceptionReceived = new AtomicBoolean();

Runnable r = new HystrixContextRunnable(new Runnable() {
Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

@Override
public void run() {
Expand Down Expand Up @@ -4026,7 +4039,7 @@ public void testSemaphorePermitsInUse() {
// used to signal that all command can finish
final CountDownLatch sharedLatch = new CountDownLatch(1);

final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(new Runnable() {
final Runnable sharedSemaphoreRunnable = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {
public void run() {
try {
new LatchedSemaphoreCommand(circuitBreaker, sharedSemaphore, startLatch, sharedLatch).execute();
Expand Down Expand Up @@ -4054,7 +4067,7 @@ public void run() {
// tracks failures to obtain semaphores
final AtomicInteger failureCount = new AtomicInteger();

final Thread isolatedThread = new Thread(new HystrixContextRunnable(new Runnable() {
final Thread isolatedThread = new Thread(new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {
public void run() {
try {
new LatchedSemaphoreCommand(circuitBreaker, isolatedSemaphore, startLatch, isolatedLatch).execute();
Expand Down Expand Up @@ -6133,6 +6146,64 @@ protected String getFallback() {

}

/**
* See https://github.com/Netflix/Hystrix/issues/212
*/
@Test
public void testObservableTimeoutNoFallbackThreadContext() {
final AtomicReference<Thread> onErrorThread = new AtomicReference<Thread>();
final AtomicBoolean isRequestContextInitialized = new AtomicBoolean();
TestHystrixCommand<Boolean> command = new TestCommandWithTimeout(50, TestCommandWithTimeout.FALLBACK_NOT_IMPLEMENTED);
try {
command.toObservable().doOnError(new Action1<Throwable>() {

@Override
public void call(Throwable t1) {
System.out.println("onError: " + t1);
System.out.println("onError Thread: " + Thread.currentThread());
System.out.println("ThreadContext in onError: " + HystrixRequestContext.isCurrentThreadInitialized());
onErrorThread.set(Thread.currentThread());
isRequestContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
}

}).toBlockingObservable().single();
throw new RuntimeException("expected error to be thrown");
} catch (Throwable e) {
assertTrue(isRequestContextInitialized.get());
assertTrue(onErrorThread.get().getName().startsWith("RxComputationThreadPool"));

if (e instanceof HystrixRuntimeException) {
HystrixRuntimeException de = (HystrixRuntimeException) e;
assertNotNull(de.getFallbackException());
assertTrue(de.getFallbackException() instanceof UnsupportedOperationException);
assertNotNull(de.getImplementingClass());
assertNotNull(de.getCause());
assertTrue(de.getCause() instanceof TimeoutException);
} else {
fail("the exception should be ExecutionException with cause as HystrixRuntimeException");
}
}

assertTrue(command.getExecutionTimeInMilliseconds() > -1);
assertTrue(command.isResponseTimedOut());

assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS));
assertEquals(1, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.EXCEPTION_THROWN));
assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.FAILURE));
assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_REJECTION));
assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_FAILURE));
assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_SUCCESS));
assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.SEMAPHORE_REJECTED));
assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.SHORT_CIRCUITED));
assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.THREAD_POOL_REJECTED));
assertEquals(1, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT));
assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE));

assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage());

assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
}

/* ******************************************************************************** */
/* ******************************************************************************** */
/* private HystrixCommand class implementations for unit testing */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private class CollapsedTask implements TimerListener {
CollapsedTask() {
// this gets executed from the context of a HystrixCommand parent thread (such as a Tomcat thread)
// so we create the callable now where we can capture the thread context
callableWithContextOfParent = concurrencyStrategy.wrapCallable(new HystrixContextCallable<Void>(new Callable<Void>() {
callableWithContextOfParent = new HystrixContextCallable<Void>(concurrencyStrategy, new Callable<Void>() {
// the wrapCallable call allows a strategy to capture thread-context if desired

@Override
Expand All @@ -144,7 +144,7 @@ public Void call() throws Exception {
return null;
}

}));
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@

import static org.junit.Assert.*;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.After;
import org.junit.Test;

import rx.util.functions.Action1;

import com.netflix.hystrix.Hystrix;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifierDefault;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
Expand Down Expand Up @@ -418,6 +425,76 @@ public static class HystrixPropertiesStrategyTestImpl extends HystrixPropertiesS
private static String getFullClassNameForTestClass(Class<?> cls) {
return HystrixPlugins.class.getPackage().getName() + "." + HystrixPlugins.class.getSimpleName() + "$UnitTest$" + cls.getSimpleName();
}


private static final ThreadLocal<String> testRequestIdThreadLocal = new ThreadLocal<String>();

public static class DummyCommand extends HystrixCommand<Void> {

public DummyCommand() {
super(HystrixCommandGroupKey.Factory.asKey("Dummy"));
}

@Override
protected Void run() throws Exception {
System.out.println("requestId (run) = " + testRequestIdThreadLocal.get());
Thread.sleep(2000);
return null;
}
}

@Test
public void testRequestContextViaPluginInTimeout() {
HystrixPlugins.getInstance().registerConcurrencyStrategy(new HystrixConcurrencyStrategy() {
@Override
public <T> Callable<T> wrapCallable(final Callable<T> callable) {
return new RequestIdCallable<T>(callable);
}
});

HystrixRequestContext context = HystrixRequestContext.initializeContext();

testRequestIdThreadLocal.set("foobar");
final AtomicReference<String> valueInTimeout = new AtomicReference<String>();

new DummyCommand().toObservable()
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("initialized = " + HystrixRequestContext.isCurrentThreadInitialized());
System.out.println("requestId (timeout) = " + testRequestIdThreadLocal.get());
valueInTimeout.set(testRequestIdThreadLocal.get());
}
})
.materialize()
.toBlockingObservable().single();

context.shutdown();
Hystrix.reset();

assertEquals("foobar", valueInTimeout.get());
}

private static class RequestIdCallable<T> implements Callable<T> {
private final Callable<T> callable;
private final String requestId;

public RequestIdCallable(Callable<T> callable) {
this.callable = callable;
this.requestId = testRequestIdThreadLocal.get();
}

@Override
public T call() throws Exception {
String original = testRequestIdThreadLocal.get();
testRequestIdThreadLocal.set(requestId);
try {
return callable.call();
} finally {
testRequestIdThreadLocal.set(original);
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import rx.util.functions.Func1;

import com.netflix.config.ConfigurationManager;
import com.netflix.hystrix.Hystrix;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixRequestLog;
Expand Down Expand Up @@ -218,6 +219,7 @@ protected String run() throws Exception {
}

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public class HystrixContextCallable<K> implements Callable<K> {
private final Callable<K> actual;
private final HystrixRequestContext parentThreadState;

public HystrixContextCallable(Callable<K> actual) {
this.actual = actual;
public HystrixContextCallable(HystrixConcurrencyStrategy concurrencyStrategy, Callable<K> actual) {
this.actual = concurrencyStrategy.wrapCallable(actual);
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,28 @@
*/
package com.netflix.hystrix.strategy.concurrency;

import java.util.concurrent.Callable;

/**
* Wrapper around {@link Runnable} that manages the {@link HystrixRequestContext} initialization and cleanup for the execution of the {@link Runnable}
*
* @ExcludeFromJavadoc
*/
public class HystrixContextRunnable implements Runnable {

private final Runnable actual;
private final Callable<Void> actual;
private final HystrixRequestContext parentThreadState;

public HystrixContextRunnable(Runnable actual) {
this.actual = actual;
public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {

@Override
public Void call() throws Exception {
actual.run();
return null;
}

});
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
}

Expand All @@ -37,7 +47,11 @@ public void run() {
// set the state of this thread to that of its parent
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
// execute actual Callable with the state of the parent
actual.run();
try {
actual.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
} finally {
// restore this thread back to its original state
HystrixRequestContext.setContextOnCurrentThread(existingState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
*/
package com.netflix.hystrix.strategy.concurrency;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.util.functions.Func2;

/**
Expand Down