From 8748f51e51500298f06d8223629b1116f93b89bb Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Tue, 6 Jan 2015 16:09:15 -0800 Subject: [PATCH 1/2] Only call the Hystrix.startCurrentThreadExecutingCommand/endCurrentThreadExecutingCommand sequence once per command invocation --- .../com/netflix/hystrix/AbstractCommand.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java index a64afdd8d..46002f538 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java @@ -367,7 +367,6 @@ protected ObservableCommand toObservable(final boolean performAsyncTimeout) { } final HystrixInvokable _this = this; - final AtomicReference endCurrentThreadExecutingCommand = new AtomicReference(); // don't like how this is being done // create an Observable that will lazily execute when subscribed to Observable o = Observable.create(new OnSubscribe() { @@ -389,8 +388,7 @@ public void call(Subscriber observer) { /* used to track userThreadExecutionTime */ invocationStartTime = System.currentTimeMillis(); - // store the command that is being run - endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey())); + getRunObservableDecoratedForMetricsAndErrorHandling(performAsyncTimeout) .doOnTerminate(new Action0() { @@ -454,11 +452,6 @@ public void call() { /* execution time (must occur before terminal state otherwise a race condition can occur if requested by client) */ recordTotalExecutionTime(invocationStartTime); } - - // pop the command that is being run - if (endCurrentThreadExecutingCommand.get() != null) { - endCurrentThreadExecutingCommand.get().call(); - } } finally { metrics.decrementConcurrentExecutionCount(); // record that we're completed @@ -496,6 +489,7 @@ private Observable getRunObservableDecoratedForMetricsAndErrorHandling(final metrics.incrementConcurrentExecutionCount(); final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); + final AtomicReference endCurrentThreadExecutingCommand = new AtomicReference(); // don't like how this is being done Observable run = null; if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) { @@ -516,7 +510,8 @@ public void call(Subscriber s) { // not timed out so execute try { threadPool.markThreadExecution(); - final Action0 endCurrentThread = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); + // store the command that is being run + endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey())); getExecutionObservable().doOnTerminate(new Action0() { @Override @@ -524,7 +519,6 @@ public void call() { // TODO is this actually the end of the thread? threadPool.markThreadCompletion(); executionHook.onThreadComplete(_self); - endCurrentThread.call(); } }).unsafeSubscribe(s); } catch (Throwable t) { @@ -539,6 +533,8 @@ public void call() { } else { // semaphore isolated executionHook.onRunStart(_self); + // store the command that is being run + endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey())); try { run = getExecutionObservable(); } catch (Throwable t) { @@ -657,6 +653,14 @@ public void call(Notification n) { setRequestContextIfNeeded(currentRequestContext); } + }).doOnTerminate(new Action0() { + @Override + public void call() { + // pop the command that is being run + if (endCurrentThreadExecutingCommand.get() != null) { + endCurrentThreadExecutingCommand.get().call(); + } + } }).map(new Func1() { @Override From b0808cb5723fbe3ba44276b7264e30dac9d04cc1 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Tue, 6 Jan 2015 16:45:42 -0800 Subject: [PATCH 2/2] Move the marking of the thread pool completion to the end of the Observable chain (#377) * Both thread pool metrics and the onThreadComplete execution hook now run later * Modified behavior of HystrixCommand.isExecutedInThread() to match the Javadoc (#448) ** Now this returns true iff the Hystrix thread executed the run() method --- .../com/netflix/hystrix/AbstractCommand.java | 20 ++++++------------- .../netflix/hystrix/HystrixCommandTest.java | 12 +++++------ .../hystrix/HystrixObservableCommandTest.java | 12 +++++------ 3 files changed, 18 insertions(+), 26 deletions(-) diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java index 46002f538..798eecf2d 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java @@ -33,15 +33,12 @@ import rx.Notification; import rx.Observable; -import rx.Observer; import rx.Observable.OnSubscribe; import rx.Observable.Operator; -import rx.Scheduler; import rx.Subscriber; import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func1; -import rx.schedulers.Schedulers; import rx.subjects.ReplaySubject; import rx.subscriptions.CompositeSubscription; @@ -54,7 +51,6 @@ import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault; import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable; -import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler; import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier; import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook; @@ -494,7 +490,6 @@ private Observable getRunObservableDecoratedForMetricsAndErrorHandling(final Observable run = null; if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) { // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE) - isExecutedInThread.set(true); run = Observable.create(new OnSubscribe() { @@ -512,15 +507,8 @@ public void call(Subscriber s) { threadPool.markThreadExecution(); // store the command that is being run endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey())); - getExecutionObservable().doOnTerminate(new Action0() { - - @Override - public void call() { - // TODO is this actually the end of the thread? - threadPool.markThreadCompletion(); - executionHook.onThreadComplete(_self); - } - }).unsafeSubscribe(s); + isExecutedInThread.set(true); + getExecutionObservable().unsafeSubscribe(s); } catch (Throwable t) { // the run() method is a user provided implementation so can throw instead of using Observable.onError // so we catch it here and turn it into Observable.error @@ -660,6 +648,10 @@ public void call() { if (endCurrentThreadExecutingCommand.get() != null) { endCurrentThreadExecutingCommand.get().call(); } + if (isExecutedInThread.get()) { + threadPool.markThreadCompletion(); + executionHook.onThreadComplete(_self); + } } }).map(new Func1() { diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java index 5f2e1736b..d7efe0e45 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java @@ -3583,7 +3583,7 @@ public void testExecutionHookRunFailureWithoutFallback() { assertEquals(1, command.builder.executionHook.threadComplete.get()); // expected hook execution sequence - assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); /* test with queue() */ @@ -3623,7 +3623,7 @@ public void testExecutionHookRunFailureWithoutFallback() { assertEquals(1, command.builder.executionHook.threadComplete.get()); // expected hook execution sequence - assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); } /** @@ -3661,7 +3661,7 @@ public void testExecutionHookRunFailureWithFallback() { assertEquals(1, command.builder.executionHook.threadComplete.get()); // expected hook execution sequence - assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackSuccess - onComplete - ", command.builder.executionHook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackSuccess - onComplete - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); /* test with queue() */ @@ -3698,7 +3698,7 @@ public void testExecutionHookRunFailureWithFallback() { assertEquals(1, command.builder.executionHook.threadComplete.get()); // expected hook execution sequence - assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackSuccess - onComplete - ", command.builder.executionHook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackSuccess - onComplete - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); } /** @@ -3743,7 +3743,7 @@ public void testExecutionHookRunFailureWithFallbackFailure() { assertEquals(1, command.builder.executionHook.threadComplete.get()); // expected hook execution sequence - assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); /* test with queue() */ command = new KnownFailureTestCommandWithFallbackFailure(); @@ -3782,7 +3782,7 @@ public void testExecutionHookRunFailureWithFallbackFailure() { assertEquals(1, command.builder.executionHook.threadComplete.get()); // expected hook execution sequence - assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString()); + assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); } /** diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java index e4a4be2ae..684c01a3f 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java @@ -5745,8 +5745,8 @@ public void testRejectionWithFallbackRequestContextWithThreadIsolatedSynchronous System.out.println("results.observeOnThread.get(): " + results.observeOnThread.get() + " " + Thread.currentThread()); assertTrue(results.observeOnThread.get().equals(Thread.currentThread())); // rejected so we stay on calling thread - // thread isolated so even though we're rejected we mark that it attempted execution in a thread - assertTrue(results.command.isExecutedInThread()); + // thread isolated, but rejected, so this is false + assertFalse(results.command.isExecutedInThread()); } /** @@ -5766,8 +5766,8 @@ public void testRejectionWithFallbackRequestContextWithThreadIsolatedAsynchronou assertTrue(results.isContextInitializedObserveOn.get()); // we capture and set the context once the user provided Observable emits assertTrue(results.observeOnThread.get().getName().startsWith("RxNewThread")); - // thread isolated so even though we're rejected we mark that it attempted execution in a thread - assertTrue(results.command.isExecutedInThread()); + // thread isolated, but rejected, so this is false + assertFalse(results.command.isExecutedInThread()); } /** @@ -5785,8 +5785,8 @@ public void testRejectionWithFallbackRequestContextWithThreadIsolatedAsynchronou assertTrue(results.isContextInitializedObserveOn.get()); // the user scheduler captures context assertTrue(results.observeOnThread.get().getName().startsWith("RxNewThread")); // the user provided thread/scheduler for getFallback - // thread isolated so even though we're rejected we mark that it attempted execution in a thread - assertTrue(results.command.isExecutedInThread()); + // thread isolated, but rejected, so this is false + assertFalse(results.command.isExecutedInThread()); } /* *************************************** testShortCircuitedWithFallbackRequestContext *********************************** */