Skip to content

Commit

Permalink
Merge pull request #1150 from benjchristensen/replay-fix
Browse files Browse the repository at this point in the history
Fix ReplaySubject Terminal State Race Condition
  • Loading branch information
benjchristensen committed May 5, 2014
2 parents cdc0c2f + 4f55f95 commit f0404e9
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 18 deletions.
43 changes: 28 additions & 15 deletions rxjava-core/src/main/java/rx/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import rx.Observer;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.operators.NotificationLite;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;

/**
Expand Down Expand Up @@ -127,7 +128,7 @@ public void onCompleted() {

@Override
public void call() {
state.history.complete(Notification.<T> createOnCompleted());
state.history.complete();
}
});
if (observers != null) {
Expand All @@ -145,7 +146,7 @@ public void onError(final Throwable e) {

@Override
public void call() {
state.history.complete(Notification.<T> createOnError(e));
state.history.complete(e);
}
});
if (observers != null) {
Expand All @@ -159,7 +160,7 @@ public void call() {

@Override
public void onNext(T v) {
if (state.history.terminalValue.get() != null) {
if (state.history.terminated) {
return;
}
state.history.next(v);
Expand Down Expand Up @@ -200,12 +201,9 @@ private void replayObserver(SubjectObserver<? super T> observer) {

private static <T> int replayObserverFromIndex(History<T> history, Integer l, SubjectObserver<? super T> observer) {
while (l < history.index.get()) {
observer.onNext(history.list.get(l));
history.accept(observer, l);
l++;
}
if (history.terminalValue.get() != null) {
history.terminalValue.get().accept(observer);
}

return l;
}
Expand All @@ -217,28 +215,43 @@ private static <T> int replayObserverFromIndex(History<T> history, Integer l, Su
* @param <T>
*/
private static class History<T> {
private final NotificationLite<T> nl = NotificationLite.instance();
private final AtomicInteger index;
private final ArrayList<T> list;
private final AtomicReference<Notification<T>> terminalValue;
private final ArrayList<Object> list;
private boolean terminated;

public History(int initialCapacity) {
index = new AtomicInteger(0);
list = new ArrayList<T>(initialCapacity);
terminalValue = new AtomicReference<Notification<T>>();
list = new ArrayList<Object>(initialCapacity);
}

public boolean next(T n) {
if (terminalValue.get() == null) {
list.add(n);
if (!terminated) {
list.add(nl.next(n));
index.getAndIncrement();
return true;
} else {
return false;
}
}

public void complete(Notification<T> n) {
terminalValue.set(n);
public void accept(Observer<? super T> o, int idx) {
nl.accept(o, list.get(idx));
}

public void complete() {
if (!terminated) {
terminated = true;
list.add(nl.completed());
index.getAndIncrement();
}
}
public void complete(Throwable e) {
if (!terminated) {
terminated = true;
list.add(nl.error(e));
index.getAndIncrement();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -32,6 +30,8 @@
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;

public class ReplaySubjectConcurrencyTest {
Expand Down Expand Up @@ -303,6 +303,21 @@ public void run() {
}

}

/**
* https://github.com/Netflix/RxJava/issues/1147
*/
@Test
public void testRaceForTerminalState() {
final List<Integer> expected = Arrays.asList(1);
for (int i = 0; i < 100000; i++) {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.just(1).subscribeOn(Schedulers.computation()).cache().subscribe(ts);
ts.awaitTerminalEvent();
ts.assertReceivedOnNext(expected);
ts.assertTerminalEvent();
}
}

private static class SubjectObserverThread extends Thread {

Expand Down

0 comments on commit f0404e9

Please sign in to comment.