Skip to content

Commit

Permalink
Merge pull request #449 from mattrjacobs/move-thread-completion-to-en…
Browse files Browse the repository at this point in the history
…d-of-chain

Move thread completion to end of chain
  • Loading branch information
mattrjacobs committed Jan 7, 2015
2 parents 1a23f15 + b0808cb commit 3f6f95a
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 36 deletions.
44 changes: 20 additions & 24 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,12 @@

import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;
import rx.subscriptions.CompositeSubscription;

Expand All @@ -54,7 +51,6 @@
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault;
import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable;
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
Expand Down Expand Up @@ -367,7 +363,6 @@ protected ObservableCommand<R> toObservable(final boolean performAsyncTimeout) {
}

final HystrixInvokable<R> _this = this;
final AtomicReference<Action0> endCurrentThreadExecutingCommand = new AtomicReference<Action0>(); // don't like how this is being done

// create an Observable that will lazily execute when subscribed to
Observable<R> o = Observable.create(new OnSubscribe<R>() {
Expand All @@ -389,8 +384,7 @@ public void call(Subscriber<? super R> observer) {
/* used to track userThreadExecutionTime */
invocationStartTime = System.currentTimeMillis();

// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));


getRunObservableDecoratedForMetricsAndErrorHandling(performAsyncTimeout)
.doOnTerminate(new Action0() {
Expand Down Expand Up @@ -454,11 +448,6 @@ public void call() {
/* execution time (must occur before terminal state otherwise a race condition can occur if requested by client) */
recordTotalExecutionTime(invocationStartTime);
}

// pop the command that is being run
if (endCurrentThreadExecutingCommand.get() != null) {
endCurrentThreadExecutingCommand.get().call();
}
} finally {
metrics.decrementConcurrentExecutionCount();
// record that we're completed
Expand Down Expand Up @@ -496,11 +485,11 @@ private Observable<R> getRunObservableDecoratedForMetricsAndErrorHandling(final
metrics.incrementConcurrentExecutionCount();

final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
final AtomicReference<Action0> endCurrentThreadExecutingCommand = new AtomicReference<Action0>(); // don't like how this is being done

Observable<R> run = null;
if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
isExecutedInThread.set(true);

run = Observable.create(new OnSubscribe<R>() {

Expand All @@ -516,17 +505,10 @@ public void call(Subscriber<? super R> s) {
// not timed out so execute
try {
threadPool.markThreadExecution();
final Action0 endCurrentThread = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
getExecutionObservable().doOnTerminate(new Action0() {

@Override
public void call() {
// TODO is this actually the end of the thread?
threadPool.markThreadCompletion();
executionHook.onThreadComplete(_self);
endCurrentThread.call();
}
}).unsafeSubscribe(s);
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
isExecutedInThread.set(true);
getExecutionObservable().unsafeSubscribe(s);
} catch (Throwable t) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
Expand All @@ -539,6 +521,8 @@ public void call() {
} else {
// semaphore isolated
executionHook.onRunStart(_self);
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
try {
run = getExecutionObservable();
} catch (Throwable t) {
Expand Down Expand Up @@ -657,6 +641,18 @@ public void call(Notification<? super R> n) {
setRequestContextIfNeeded(currentRequestContext);
}

}).doOnTerminate(new Action0() {
@Override
public void call() {
// pop the command that is being run
if (endCurrentThreadExecutingCommand.get() != null) {
endCurrentThreadExecutingCommand.get().call();
}
if (isExecutedInThread.get()) {
threadPool.markThreadCompletion();
executionHook.onThreadComplete(_self);
}
}
}).map(new Func1<R, R>() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3583,7 +3583,7 @@ public void testExecutionHookRunFailureWithoutFallback() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());


/* test with queue() */
Expand Down Expand Up @@ -3623,7 +3623,7 @@ public void testExecutionHookRunFailureWithoutFallback() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());
}

/**
Expand Down Expand Up @@ -3661,7 +3661,7 @@ public void testExecutionHookRunFailureWithFallback() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackSuccess - onComplete - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackSuccess - onComplete - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());


/* test with queue() */
Expand Down Expand Up @@ -3698,7 +3698,7 @@ public void testExecutionHookRunFailureWithFallback() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackSuccess - onComplete - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackSuccess - onComplete - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());
}

/**
Expand Down Expand Up @@ -3743,7 +3743,7 @@ public void testExecutionHookRunFailureWithFallbackFailure() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());

/* test with queue() */
command = new KnownFailureTestCommandWithFallbackFailure();
Expand Down Expand Up @@ -3782,7 +3782,7 @@ public void testExecutionHookRunFailureWithFallbackFailure() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5745,8 +5745,8 @@ public void testRejectionWithFallbackRequestContextWithThreadIsolatedSynchronous
System.out.println("results.observeOnThread.get(): " + results.observeOnThread.get() + " " + Thread.currentThread());
assertTrue(results.observeOnThread.get().equals(Thread.currentThread())); // rejected so we stay on calling thread

// thread isolated so even though we're rejected we mark that it attempted execution in a thread
assertTrue(results.command.isExecutedInThread());
// thread isolated, but rejected, so this is false
assertFalse(results.command.isExecutedInThread());
}

/**
Expand All @@ -5766,8 +5766,8 @@ public void testRejectionWithFallbackRequestContextWithThreadIsolatedAsynchronou
assertTrue(results.isContextInitializedObserveOn.get()); // we capture and set the context once the user provided Observable emits
assertTrue(results.observeOnThread.get().getName().startsWith("RxNewThread"));

// thread isolated so even though we're rejected we mark that it attempted execution in a thread
assertTrue(results.command.isExecutedInThread());
// thread isolated, but rejected, so this is false
assertFalse(results.command.isExecutedInThread());
}

/**
Expand All @@ -5785,8 +5785,8 @@ public void testRejectionWithFallbackRequestContextWithThreadIsolatedAsynchronou
assertTrue(results.isContextInitializedObserveOn.get()); // the user scheduler captures context
assertTrue(results.observeOnThread.get().getName().startsWith("RxNewThread")); // the user provided thread/scheduler for getFallback

// thread isolated so even though we're rejected we mark that it attempted execution in a thread
assertTrue(results.command.isExecutedInThread());
// thread isolated, but rejected, so this is false
assertFalse(results.command.isExecutedInThread());
}

/* *************************************** testShortCircuitedWithFallbackRequestContext *********************************** */
Expand Down

0 comments on commit 3f6f95a

Please sign in to comment.