Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More precise solution to #771 #780

Merged
merged 4 commits into from
Apr 30, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 27 additions & 40 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -383,11 +383,8 @@ public void call(Subscriber<? super R> observer) {
metrics.incrementConcurrentExecutionCount();

// mark that we're starting execution on the ExecutionHook
try {
executionHook.onStart(_this);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onStart", hookEx);
}
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
executionHook.onStart(_this);

/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
Expand Down Expand Up @@ -519,26 +516,21 @@ public void call(Subscriber<? super R> s) {
s.onError(new RuntimeException("timed out before executing run()"));
} else {
// not timed out so execute
try {
executionHook.onThreadStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onThreadStart", hookEx);
}
try {
executionHook.onRunStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onRunStart", hookEx);
}
try {
executionHook.onExecutionStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onExecutionStart", hookEx);
}
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
isExecutedInThread.set(true);
/**
* If any of these hooks throw an exception, then it appears as if the actual execution threw an error
*/
try {
executionHook.onThreadStart(_self);
executionHook.onRunStart(_self);
executionHook.onExecutionStart(_self);
} catch (Throwable ex) {
s.onError(ex);
}
getExecutionObservableWithLifecycle().unsafeSubscribe(s); //the getExecutionObservableWithLifecycle method already wraps sync exceptions, so no need to catch here
}
}
Expand All @@ -551,19 +543,16 @@ public Boolean call() {
}));
} else {
// semaphore isolated
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
try {
executionHook.onRunStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onRunStart", hookEx);
}
try {
executionHook.onExecutionStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onExecutionStart", hookEx);
run = getExecutionObservableWithLifecycle(); //the getExecutionObservableWithLifecycle method already wraps sync exceptions, so this shouldn't throw
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
run = Observable.error(ex);
}
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
run = getExecutionObservableWithLifecycle(); //the getExecutionObservableWithLifecycle method already wraps sync exceptions, so no need to catch here
}

run = run.doOnEach(new Action1<Notification<? super R>>() {
Expand Down Expand Up @@ -753,19 +742,17 @@ private Observable<R> getFallbackOrThrowException(final HystrixEventType eventTy

// acquire a permit
if (fallbackSemaphore.tryAcquire()) {
if (isFallbackUserSupplied(this)) {
try {
try {
if (isFallbackUserSupplied(this)) {
executionHook.onFallbackStart(this);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onFallbackStart", hookEx);
fallbackExecutionChain = getFallbackObservable();
} else {
//same logic as above without the hook invocation
fallbackExecutionChain = getFallbackObservable();
}
}

try {
fallbackExecutionChain = getFallbackObservable();
} catch (Throwable t) {
// getFallback() is user provided and can throw so we catch it and turn it into Observable.error
fallbackExecutionChain = Observable.error(t);
} catch(Throwable ex) {
//If hook or user-fallback throws, then use that as the result of the fallback lookup
fallbackExecutionChain = Observable.error(ex);
}

fallbackExecutionChain = fallbackExecutionChain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -4206,6 +4207,68 @@ protected Integer getFallback() {
assertTrue(1 == new PrimaryCommand(new TestCircuitBreaker()).execute());
}

@Test
public void testOnRunStartHookThrows() {
final AtomicBoolean threadExceptionEncountered = new AtomicBoolean(false);
final AtomicBoolean semaphoreExceptionEncountered = new AtomicBoolean(false);
final AtomicBoolean onThreadStartInvoked = new AtomicBoolean(false);
final AtomicBoolean onThreadCompleteInvoked = new AtomicBoolean(false);

class FailureInjectionHook extends HystrixCommandExecutionHook {
@Override
public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
throw new HystrixRuntimeException(HystrixRuntimeException.FailureType.COMMAND_EXCEPTION, commandInstance.getClass(), "Injected Failure", null, null);
}

@Override
public <T> void onThreadStart(HystrixInvokable<T> commandInstance) {
onThreadStartInvoked.set(true);
super.onThreadStart(commandInstance);
}

@Override
public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) {
onThreadCompleteInvoked.set(true);
super.onThreadComplete(commandInstance);
}
}

final FailureInjectionHook failureInjectionHook = new FailureInjectionHook();

class FailureInjectedCommand extends TestHystrixCommand<Integer> {
public FailureInjectedCommand(ExecutionIsolationStrategy isolationStrategy) {
super(testPropsBuilder().setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter().withExecutionIsolationStrategy(isolationStrategy)), failureInjectionHook);
}

@Override
protected Integer run() throws Exception {
return 3;
}
}

TestHystrixCommand<Integer> threadCmd = new FailureInjectedCommand(ExecutionIsolationStrategy.THREAD);
try {
int result = threadCmd.execute();
System.out.println("RESULT : " + result);
} catch (Throwable ex) {
ex.printStackTrace();
threadExceptionEncountered.set(true);
}
assertTrue(threadExceptionEncountered.get());
assertTrue(onThreadStartInvoked.get());
assertTrue(onThreadCompleteInvoked.get());

TestHystrixCommand<Integer> semaphoreCmd = new FailureInjectedCommand(ExecutionIsolationStrategy.SEMAPHORE);
try {
int result = semaphoreCmd.execute();
System.out.println("RESULT : " + result);
} catch (Throwable ex) {
ex.printStackTrace();
semaphoreExceptionEncountered.set(true);
}
assertTrue(semaphoreExceptionEncountered.get());
}

/* ******************************************************************************** */
/* ******************************************************************************** */
/* private HystrixCommand class implementations for unit testing */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.netflix.hystrix;

import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;

abstract public class TestHystrixCommand<T> extends HystrixCommand<T> implements AbstractTestHystrixCommand<T> {

private final TestCommandBuilder builder;
Expand All @@ -26,6 +28,13 @@ public TestHystrixCommand(TestCommandBuilder builder) {
this.builder = builder;
}

public TestHystrixCommand(TestCommandBuilder builder, HystrixCommandExecutionHook executionHook) {
super(builder.owner, builder.dependencyKey, builder.threadPoolKey, builder.circuitBreaker, builder.threadPool,
builder.commandPropertiesDefaults, builder.threadPoolPropertiesDefaults, builder.metrics,
builder.fallbackSemaphore, builder.executionSemaphore, TEST_PROPERTIES_FACTORY, executionHook);
this.builder = builder;
}

public TestCommandBuilder getBuilder() {
return builder;
}
Expand Down