From 42fa5aab312a4bf38558d6ec07b0f4a7596e13f3 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 5 May 2014 13:13:20 -0700 Subject: [PATCH 1/2] Fix ReplaySubject termination race --- .../main/java/rx/subjects/ReplaySubject.java | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index d37f7820b1..bd5f16b571 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -25,6 +25,7 @@ import rx.Observer; import rx.functions.Action0; import rx.functions.Action1; +import rx.operators.NotificationLite; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; /** @@ -127,7 +128,7 @@ public void onCompleted() { @Override public void call() { - state.history.complete(Notification. createOnCompleted()); + state.history.complete(); } }); if (observers != null) { @@ -145,7 +146,7 @@ public void onError(final Throwable e) { @Override public void call() { - state.history.complete(Notification. createOnError(e)); + state.history.complete(e); } }); if (observers != null) { @@ -159,7 +160,7 @@ public void call() { @Override public void onNext(T v) { - if (state.history.terminalValue.get() != null) { + if (state.history.terminated) { return; } state.history.next(v); @@ -200,12 +201,9 @@ private void replayObserver(SubjectObserver observer) { private static int replayObserverFromIndex(History history, Integer l, SubjectObserver observer) { while (l < history.index.get()) { - observer.onNext(history.list.get(l)); + history.accept(observer, l); l++; } - if (history.terminalValue.get() != null) { - history.terminalValue.get().accept(observer); - } return l; } @@ -217,19 +215,19 @@ private static int replayObserverFromIndex(History history, Integer l, Su * @param */ private static class History { + private final NotificationLite nl = NotificationLite.instance(); private final AtomicInteger index; - private final ArrayList list; - private final AtomicReference> terminalValue; + private final ArrayList list; + private boolean terminated; public History(int initialCapacity) { index = new AtomicInteger(0); - list = new ArrayList(initialCapacity); - terminalValue = new AtomicReference>(); + list = new ArrayList(initialCapacity); } public boolean next(T n) { - if (terminalValue.get() == null) { - list.add(n); + if (!terminated) { + list.add(nl.next(n)); index.getAndIncrement(); return true; } else { @@ -237,8 +235,23 @@ public boolean next(T n) { } } - public void complete(Notification n) { - terminalValue.set(n); + public void accept(Observer o, int idx) { + nl.accept(o, list.get(idx)); + } + + public void complete() { + if (!terminated) { + terminated = true; + list.add(nl.completed()); + index.getAndIncrement(); + } + } + public void complete(Throwable e) { + if (!terminated) { + terminated = true; + list.add(nl.error(e)); + index.getAndIncrement(); + } } } From 4f55f958202102cb70ffa63bc7c9441b46fa039d Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 5 May 2014 13:40:12 -0700 Subject: [PATCH 2/2] ReplaySubject testRaceForTerminalState() --- .../ReplaySubjectConcurrencyTest.java | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectConcurrencyTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectConcurrencyTest.java index ced51e9608..354701b041 100644 --- a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectConcurrencyTest.java +++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectConcurrencyTest.java @@ -17,9 +17,7 @@ import static org.junit.Assert.assertEquals; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -32,6 +30,8 @@ import rx.Subscriber; import rx.Subscription; import rx.functions.Action1; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; public class ReplaySubjectConcurrencyTest { @@ -303,6 +303,21 @@ public void run() { } } + + /** + * https://github.com/Netflix/RxJava/issues/1147 + */ + @Test + public void testRaceForTerminalState() { + final List expected = Arrays.asList(1); + for (int i = 0; i < 100000; i++) { + TestSubscriber ts = new TestSubscriber(); + Observable.just(1).subscribeOn(Schedulers.computation()).cache().subscribe(ts); + ts.awaitTerminalEvent(); + ts.assertReceivedOnNext(expected); + ts.assertTerminalEvent(); + } + } private static class SubjectObserverThread extends Thread {