Skip to content

Commit

Permalink
Merge pull request #1886 from akarnokd/MergeFix
Browse files Browse the repository at this point in the history
Buffer with time and merge fix
  • Loading branch information
benjchristensen committed Nov 17, 2014
2 parents d6cbf59 + b0aeb62 commit ed21c9e
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 6 deletions.
12 changes: 7 additions & 5 deletions src/main/java/rx/internal/operators/OperatorBufferWithTime.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,19 @@ public OperatorBufferWithTime(long timespan, long timeshift, TimeUnit unit, int
@Override
public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
final Worker inner = scheduler.createWorker();
child.add(inner);
SerializedSubscriber<List<T>> serialized = new SerializedSubscriber<List<T>>(child);

if (timespan == timeshift) {
ExactSubscriber bsub = new ExactSubscriber(new SerializedSubscriber<List<T>>(child), inner);
ExactSubscriber bsub = new ExactSubscriber(serialized, inner);
bsub.add(inner);
child.add(bsub);
bsub.scheduleExact();
return bsub;
}

InexactSubscriber bsub = new InexactSubscriber(new SerializedSubscriber<List<T>>(child), inner);
InexactSubscriber bsub = new InexactSubscriber(serialized, inner);
bsub.add(inner);
child.add(bsub);
bsub.startNewChunk();
bsub.scheduleChunk();
return bsub;
Expand All @@ -94,7 +98,6 @@ final class InexactSubscriber extends Subscriber<T> {
/** Guarded by this. */
boolean done;
public InexactSubscriber(Subscriber<? super List<T>> child, Worker inner) {
super(child);
this.child = child;
this.inner = inner;
this.chunks = new LinkedList<List<T>>();
Expand Down Expand Up @@ -219,7 +222,6 @@ final class ExactSubscriber extends Subscriber<T> {
/** Guarded by this. */
boolean done;
public ExactSubscriber(Subscriber<? super List<T>> child, Worker inner) {
super(child);
this.child = child;
this.inner = inner;
this.chunk = new ArrayList<T>();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ public void request(long n) {
REQUESTED.getAndAdd(this, n);
if (ms.drainQueuesIfNeeded()) {
boolean sendComplete = false;
synchronized (this) {
synchronized (ms) {
if (ms.wip == 0 && ms.scalarValueQueue != null && ms.scalarValueQueue.isEmpty()) {
sendComplete = true;
}
Expand Down
34 changes: 34 additions & 0 deletions src/test/java/rx/internal/operators/OperatorBufferTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -981,4 +981,38 @@ public void onNext(List<Integer> t) {
});
assertEquals(Long.MAX_VALUE, requested.get());
}
@Test(timeout = 3000)
public void testBufferWithTimeDoesntUnsubscribeDownstream() throws InterruptedException {
@SuppressWarnings("unchecked")
final Observer<Object> o = mock(Observer.class);


final CountDownLatch cdl = new CountDownLatch(1);
Subscriber<Object> s = new Subscriber<Object>() {
@Override
public void onNext(Object t) {
o.onNext(t);
}
@Override
public void onError(Throwable e) {
o.onError(e);
cdl.countDown();
}
@Override
public void onCompleted() {
o.onCompleted();
cdl.countDown();
}
};

Observable.range(1, 1).delay(1, TimeUnit.SECONDS).buffer(2, TimeUnit.SECONDS).unsafeSubscribe(s);

cdl.await();

verify(o).onNext(Arrays.asList(1));
verify(o).onCompleted();
verify(o, never()).onError(any(Throwable.class));

assertFalse(s.isUnsubscribed());
}
}

0 comments on commit ed21c9e

Please sign in to comment.