diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java index 5384064f9..b4873a1e2 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java @@ -95,6 +95,7 @@ protected static enum TimedOutStatus { protected final AtomicBoolean commandStarted = new AtomicBoolean(); protected volatile boolean executionStarted = false; + protected volatile boolean threadExecutionStarted = false; protected volatile boolean isExecutionComplete = false; /* @@ -377,7 +378,7 @@ public Observable toObservable() { HystrixCommandResponseFromCache fromCache = (HystrixCommandResponseFromCache) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; - return handleRequestCacheHitAndEmitValues(fromCache); + return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } @@ -392,7 +393,7 @@ public Observable toObservable() { public void call() { if (commandCleanupExecuted.compareAndSet(false, true)) { isExecutionComplete = true; - handleCommandEnd(); + handleCommandEnd(_cmd); } } }; @@ -405,7 +406,7 @@ public void call() { eventNotifier.markEvent(HystrixEventType.CANCELLED, commandKey); executionResultAtTimeOfCancellation = executionResult .addEvent((int) (System.currentTimeMillis() - commandStartTimestamp), HystrixEventType.CANCELLED); - handleCommandEnd(); + handleCommandEnd(_cmd); } } }; @@ -417,9 +418,48 @@ public Observable call() { } }; + final Func1 wrapWithAllOnNextHooks = new Func1() { + @Override + public R call(R r) { + R afterFirstApplication = r; + + try { + afterFirstApplication = executionHook.onComplete(_cmd, r); + } catch (Throwable hookEx) { + logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx); + } + + try { + return executionHook.onEmit(_cmd, afterFirstApplication); + } catch (Throwable hookEx) { + logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx); + return afterFirstApplication; + } + } + }; + + final Action0 fireOnCompletedHook = new Action0() { + @Override + public void call() { + try { + executionHook.onSuccess(_cmd); + } catch (Throwable hookEx) { + logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx); + } + } + }; + + final Action1 fireOnErrorHook = new Action1() { + @Override + public void call(Throwable throwable) { + + } + }; + Observable hystrixObservable = - Observable.defer(applyHystrixSemantics). - lift(new CommandHookApplication(this)); + Observable.defer(applyHystrixSemantics) + .map(wrapWithAllOnNextHooks); + Observable afterCache; @@ -432,7 +472,7 @@ public Observable call() { // another thread beat us so we'll use the cached value instead toCache.unsubscribe(); isResponseFromCache = true; - return handleRequestCacheHitAndEmitValues(fromCache); + return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { // we just created an ObservableCommand so we cast and return it afterCache = toCache.toObservable(); @@ -442,8 +482,10 @@ public Observable call() { } return afterCache - .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) - .doOnUnsubscribe(unsubscribeCommandCleanup); // perform cleanup once + .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) + .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once + .doOnCompleted(fireOnCompletedHook) + .doOnError(fireOnErrorHook); } private Observable applyHystrixSemantics(final AbstractCommand _cmd) { @@ -551,15 +593,6 @@ public void call(Notification rNotification) { } }; - final Action0 handleThreadEndOnNonTimeout = new Action0() { - @Override - public void call() { - if (!isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) { - handleThreadEnd(); - } - } - }; - Observable execution; if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) @@ -571,9 +604,7 @@ public void call() { return execution.doOnNext(markEmits) .doOnCompleted(markCompleted) .onErrorResumeNext(handleFallback) - .doOnEach(setRequestContext) - .doOnTerminate(handleThreadEndOnNonTimeout) - .lift(new DeprecatedOnCompleteWithValueHookApplication(_cmd)); + .doOnEach(setRequestContext); } private Observable executeCommandWithSpecifiedIsolation(final AbstractCommand _cmd) { @@ -592,6 +623,7 @@ public Observable call() { return Observable.error(new RuntimeException("timed out before executing run()")); } else { // not timed out so execute + threadExecutionStarted = true; HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); // store the command that is being run @@ -788,21 +820,38 @@ private Observable getUserExecutionObservable(final AbstractCommand _cmd) // so we catch it here and turn it into Observable.error userObservable = Observable.error(ex); } - return userObservable.lift(new ExecutionHookApplication(_cmd)) + + final AtomicBoolean threadStateCleanedUp = new AtomicBoolean(false); + + return userObservable + .lift(new ExecutionHookApplication(_cmd)) .lift(new DeprecatedOnRunHookApplication(_cmd)) .doOnTerminate(new Action0() { @Override public void call() { //If the command timed out, then the calling thread has already walked away so we need //to handle these markers. Otherwise, the calling thread will perform these for us. - if (isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) { - handleThreadEnd(); + + if (threadExecutionStarted && isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) { + if (threadStateCleanedUp.compareAndSet(false, true)) { + handleThreadEnd(_cmd); + } + } + } + }) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + if (threadExecutionStarted && isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) { + if (threadStateCleanedUp.compareAndSet(false, true)) { + handleThreadEnd(_cmd); + } } } }); } - private Observable handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache fromCache) { + private Observable handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache fromCache, final AbstractCommand _cmd) { try { executionHook.onCacheHit(this); } catch (Throwable hookEx) { @@ -815,7 +864,7 @@ private Observable handleRequestCacheHitAndEmitValues(final HystrixCommandRes @Override public void call() { if (!cleanupCompleted.get()) { - cleanUpAfterResponseFromCache(); + cleanUpAfterResponseFromCache(_cmd); isExecutionComplete = true; cleanupCompleted.set(true); } @@ -824,14 +873,14 @@ public void call() { @Override public void call() { if (!cleanupCompleted.get()) { - cleanUpAfterResponseFromCache(); + cleanUpAfterResponseFromCache(_cmd); cleanupCompleted.set(true); } } }); } - private void cleanUpAfterResponseFromCache() { + private void cleanUpAfterResponseFromCache(AbstractCommand _cmd) { Reference tl = timeoutTimer.get(); if (tl != null) { tl.clear(); @@ -846,9 +895,14 @@ private void cleanUpAfterResponseFromCache() { .markUserThreadCompletion(latency); metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, executionStarted); eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey); + + //in case of timeout, the work chained onto the Hystrix thread has the responsibility of this cleanup + if (threadExecutionStarted && !isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) { + handleThreadEnd(_cmd); + } } - private void handleCommandEnd() { + private void handleCommandEnd(AbstractCommand _cmd) { Reference tl = timeoutTimer.get(); if (tl != null) { tl.clear(); @@ -856,11 +910,19 @@ private void handleCommandEnd() { long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp; executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency); - ExecutionResult cancelled = executionResultAtTimeOfCancellation; - if (cancelled == null) { + if (executionResultAtTimeOfCancellation == null) { metrics.markCommandDone(executionResult, commandKey, threadPoolKey, executionStarted); } else { - metrics.markCommandDone(cancelled, commandKey, threadPoolKey, executionStarted); + metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, executionStarted); + } + + if (endCurrentThreadExecutingCommand != null) { + endCurrentThreadExecutingCommand.call(); + } + + //in case of timeout, the work chained onto the Hystrix thread has the responsibility of this cleanup + if (threadExecutionStarted && !isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) { + handleThreadEnd(_cmd); } } @@ -871,8 +933,7 @@ private Observable handleSemaphoreRejectionViaFallback() { logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it // retrieve a fallback or throw an exception if no fallback available return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION, - "could not acquire a semaphore for execution", semaphoreRejectionException) - .lift(new DeprecatedOnCompleteWithValueHookApplication(this)); + "could not acquire a semaphore for execution", semaphoreRejectionException); } private Observable handleShortCircuitViaFallback() { @@ -883,8 +944,7 @@ private Observable handleShortCircuitViaFallback() { executionResult = executionResult.setExecutionException(shortCircuitException); try { return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, - "short-circuited", shortCircuitException) - .lift(new DeprecatedOnCompleteWithValueHookApplication(this)); + "short-circuited", shortCircuitException); } catch (Exception e) { return Observable.error(e); } @@ -998,18 +1058,13 @@ private boolean isRecoverableError(Throwable t) { return false; } - protected void handleThreadEnd() { - if (endCurrentThreadExecutingCommand != null) { - endCurrentThreadExecutingCommand.call(); - } - if (executionResult.isExecutedInThread()) { - HystrixCounters.decrementGlobalConcurrentThreads(); - threadPool.markThreadCompletion(); - try { - executionHook.onThreadComplete(this); - } catch (Throwable hookEx) { - logger.warn("Error calling HystrixCommandExecutionHook.onThreadComplete", hookEx); - } + protected void handleThreadEnd(AbstractCommand _cmd) { + HystrixCounters.decrementGlobalConcurrentThreads(); + threadPool.markThreadCompletion(); + try { + executionHook.onThreadComplete(_cmd); + } catch (Throwable hookEx) { + logger.warn("Error calling HystrixCommandExecutionHook.onThreadComplete", hookEx); } } @@ -1260,41 +1315,6 @@ public HystrixCommandProperties getProperties() { /* ******************************************************************************** */ /* ******************************************************************************** */ - private class CommandHookApplication implements Operator { - private final HystrixInvokable cmd; - - CommandHookApplication(HystrixInvokable cmd) { - this.cmd = cmd; - } - - @Override - public Subscriber call(final Subscriber subscriber) { - return new Subscriber(subscriber) { - @Override - public void onCompleted() { - try { - executionHook.onSuccess(cmd); - } catch (Throwable hookEx) { - logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx); - } - subscriber.onCompleted(); - } - - @Override - public void onError(Throwable e) { - //can't add the calls to executionHook.onError here, since this requires a FailureType param as well - subscriber.onError(e); - } - - @Override - public void onNext(R r) { - R wrappedValue = wrapWithOnEmitHook(r); - subscriber.onNext(wrappedValue); - } - }; - } - } - private class ExecutionHookApplication implements Operator { private final HystrixInvokable cmd; @@ -1365,41 +1385,6 @@ public void onNext(R r) { } } - @Deprecated //separated out to make it cleanly removable - private class DeprecatedOnCompleteWithValueHookApplication implements Operator { - private final HystrixInvokable cmd; - - DeprecatedOnCompleteWithValueHookApplication(HystrixInvokable cmd) { - this.cmd = cmd; - } - - @Override - public Subscriber call(final Subscriber subscriber) { - return new Subscriber(subscriber) { - @Override - public void onCompleted() { - subscriber.onCompleted(); - } - - @Override - public void onError(Throwable e) { - subscriber.onError(e); - } - - @Override - public void onNext(R r) { - try { - R wrappedValue = executionHook.onComplete(cmd, r); - subscriber.onNext(wrappedValue); - } catch (Throwable hookEx) { - logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx); - subscriber.onNext(r); - } - } - }; - } - } - @Deprecated //separated out to make it cleanly removable private class DeprecatedOnRunHookApplication implements Operator { diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/executionhook/HystrixCommandExecutionHook.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/executionhook/HystrixCommandExecutionHook.java index eb542b774..ba809ba2d 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/executionhook/HystrixCommandExecutionHook.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/executionhook/HystrixCommandExecutionHook.java @@ -101,8 +101,8 @@ public void onThreadStart(HystrixInvokable commandInstance) { /** * Invoked at completion of thread execution when {@link HystrixCommand} is executed using {@link ExecutionIsolationStrategy#THREAD}. - * This will get invoked if the Hystrix thread successfully executes, regardless of whether the calling thread - * encountered a timeout. + * This will get invoked whenever the Hystrix thread is done executing, regardless of whether the thread finished + * naturally, or was unsubscribed externally * * @param commandInstance The executing HystrixCommand instance. * diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandMetricsTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandMetricsTest.java index 30598c122..b5102f920 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandMetricsTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandMetricsTest.java @@ -143,7 +143,7 @@ public void testCurrentConcurrentExecutionCount() { } try { - Thread.sleep(25); + Thread.sleep(150); } catch (InterruptedException ie) { fail(ie.getMessage()); } @@ -160,7 +160,7 @@ public Command(String commandKey, boolean shouldFail, boolean shouldFailWithBadR super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Command")) .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey)) .andCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter() - .withExecutionTimeoutInMilliseconds(100) + .withExecutionTimeoutInMilliseconds(1000) .withCircuitBreakerRequestVolumeThreshold(20))); this.shouldFail = shouldFail; this.shouldFailWithBadRequest = shouldFailWithBadRequest; diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java index 4ffbdb557..36104add3 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java @@ -817,7 +817,7 @@ public void testObservedExecutionTimeoutWithNoFallback() { * indefinitely by skipping the timeout protection of the execute() command. */ @Test - public void testObservedExecutionTimeoutWithFallback() { + public void testObservedExecutionTimeoutWithFallback() throws Exception { TestHystrixCommand command = getCommand(ExecutionIsolationStrategy.THREAD, AbstractTestHystrixCommand.ExecutionResult.SUCCESS, 200, AbstractTestHystrixCommand.FallbackResult.SUCCESS, 50); assertEquals(FlexibleTestHystrixCommand.FALLBACK_VALUE, command.observe().toBlocking().single()); @@ -3894,20 +3894,7 @@ public void call(TestHystrixCommand command) { assertTrue(hook.fallbackEventsMatch(0, 0, 0)); assertEquals(TimeoutException.class, hook.getCommandException().getClass()); assertNull(hook.getFallbackException()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onError - ", hook.executionSequence.toString()); - - try { - Thread.sleep(300); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - - //should be the same as above, since Hystrix thread is unsubscribed from by timeout - assertTrue(hook.commandEmissionsMatch(0, 1, 0)); - assertTrue(hook.executionEventsMatch(0, 0, 0)); - assertTrue(hook.fallbackEventsMatch(0, 0, 0)); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onError - ", hook.executionSequence.toString()); - + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onThreadComplete - onError - ", hook.executionSequence.toString()); } }); } @@ -3937,20 +3924,7 @@ public void call(TestHystrixCommand command) { assertTrue(hook.commandEmissionsMatch(1, 0, 1)); assertTrue(hook.executionEventsMatch(0, 0, 0)); assertTrue(hook.fallbackEventsMatch(1, 0, 1)); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onFallbackStart - onFallbackEmit - !onFallbackSuccess - !onComplete - onEmit - onFallbackSuccess - onSuccess - ", hook.executionSequence.toString()); - - try { - Thread.sleep(300); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - - //should be the same as above, since Hystrix thread is unsubscribed from by timeout - assertTrue(hook.commandEmissionsMatch(1, 0, 1)); - assertTrue(hook.executionEventsMatch(0, 0, 0)); - assertTrue(hook.fallbackEventsMatch(1, 0, 1)); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onFallbackStart - onFallbackEmit - !onFallbackSuccess - !onComplete - onEmit - onFallbackSuccess - onSuccess - ", hook.executionSequence.toString()); - + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onThreadComplete - onFallbackStart - onFallbackEmit - !onFallbackSuccess - !onComplete - onEmit - onFallbackSuccess - onSuccess - ", hook.executionSequence.toString()); } }); } @@ -3982,20 +3956,7 @@ public void call(TestHystrixCommand command) { assertTrue(hook.fallbackEventsMatch(0, 1, 0)); assertEquals(TimeoutException.class, hook.getCommandException().getClass()); assertEquals(RuntimeException.class, hook.getFallbackException().getClass()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onFallbackStart - onFallbackError - onError - ", hook.executionSequence.toString()); - - try { - Thread.sleep(300); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - - //should be the same as above, since Hystrix thread is unsubscribed from by timeout - assertTrue(hook.commandEmissionsMatch(0, 1, 0)); - assertTrue(hook.executionEventsMatch(0, 0, 0)); - assertTrue(hook.fallbackEventsMatch(0, 1, 0)); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onFallbackStart - onFallbackError - onError - ", hook.executionSequence.toString()); - + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onThreadComplete - onFallbackStart - onFallbackError - onError - ", hook.executionSequence.toString()); } }); } @@ -4027,18 +3988,7 @@ public void call(TestHystrixCommand command) { assertTrue(hook.fallbackEventsMatch(0, 0, 0)); assertEquals(TimeoutException.class, hook.getCommandException().getClass()); assertNull(hook.getFallbackException()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onError - ", hook.executionSequence.toString()); - - try { - Thread.sleep(300); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - assertTrue(hook.commandEmissionsMatch(0, 1, 0)); - assertTrue(hook.executionEventsMatch(0, 1, 0)); - assertTrue(hook.fallbackEventsMatch(0, 0, 0)); - assertEquals(RuntimeException.class, hook.getExecutionException().getClass()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onError - onExecutionError - !onRunError - onThreadComplete - ", hook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onThreadComplete - onError - ", hook.executionSequence.toString()); } }); @@ -4069,19 +4019,7 @@ public void call(TestHystrixCommand command) { assertTrue(hook.commandEmissionsMatch(1, 0, 1)); assertTrue(hook.executionEventsMatch(0, 0, 0)); assertTrue(hook.fallbackEventsMatch(1, 0, 1)); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onFallbackStart - onFallbackEmit - !onFallbackSuccess - !onComplete - onEmit - onFallbackSuccess - onSuccess - ", hook.executionSequence.toString()); - - try { - Thread.sleep(300); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - assertTrue(hook.commandEmissionsMatch(1, 0, 1)); - assertTrue(hook.executionEventsMatch(0, 1, 0)); - assertTrue(hook.fallbackEventsMatch(1, 0, 1)); - assertEquals(RuntimeException.class, hook.getExecutionException().getClass()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onFallbackStart - onFallbackEmit - !onFallbackSuccess - !onComplete - onEmit - onFallbackSuccess - onSuccess - onExecutionError - !onRunError - onThreadComplete - ", hook.executionSequence.toString()); - + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onThreadComplete - onFallbackStart - onFallbackEmit - !onFallbackSuccess - !onComplete - onEmit - onFallbackSuccess - onSuccess - ", hook.executionSequence.toString()); } }); } @@ -4113,19 +4051,7 @@ public void call(TestHystrixCommand command) { assertTrue(hook.fallbackEventsMatch(0, 1, 0)); assertEquals(TimeoutException.class, hook.getCommandException().getClass()); assertEquals(RuntimeException.class, hook.getFallbackException().getClass()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onFallbackStart - onFallbackError - onError - ", hook.executionSequence.toString()); - - try { - Thread.sleep(300); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - assertTrue(hook.commandEmissionsMatch(0, 1, 0)); - assertTrue(hook.executionEventsMatch(0, 1, 0)); - assertTrue(hook.fallbackEventsMatch(0, 1, 0)); - assertEquals(RuntimeException.class, hook.getExecutionException().getClass()); - assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onFallbackStart - onFallbackError - onError - onExecutionError - !onRunError - onThreadComplete - ", hook.executionSequence.toString()); - + assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onThreadComplete - onFallbackStart - onFallbackError - onError - ", hook.executionSequence.toString()); } }); }