Skip to content

Commit

Permalink
Merge pull request #1652 from benjchristensen/1.x-scan-backpressure-fix
Browse files Browse the repository at this point in the history
Operator Scan Backpressure Fix
  • Loading branch information
benjchristensen committed Sep 1, 2014
2 parents 57dbf3c + 49c0032 commit d6b20cb
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 21 deletions.
27 changes: 18 additions & 9 deletions src/main/java/rx/internal/operators/OnSubscribeRedo.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,24 @@ public RedoFinite(long count) {

@Override
public Observable<?> call(Observable<? extends Notification<?>> ts) {
final Notification<Long> first = count < 0 ? Notification.<Long> createOnCompleted() : Notification.createOnNext(0l);
return ts.map(new Func1<Notification<?>, Notification<?>>() {

return ts.scan(first, new Func2<Notification<Long>, Notification<?>, Notification<Long>>() {
@SuppressWarnings("unchecked")
int num=0;

@Override
public Notification<Long> call(Notification<Long> n, Notification<?> term) {
final long value = n.getValue();
if (value < count)
return Notification.createOnNext(value + 1);
else
return (Notification<Long>) term;
public Notification<?> call(Notification<?> terminalNotification) {
if(count == 0) {
return terminalNotification;
}

num++;
if(num <= count) {
return Notification.createOnNext(num);
} else {
return terminalNotification;
}
}

}).dematerialize();
}
}
Expand Down Expand Up @@ -146,6 +152,9 @@ public static <T> Observable<T> repeat(Observable<T> source, final long count) {
}

public static <T> Observable<T> repeat(Observable<T> source, final long count, Scheduler scheduler) {
if(count == 0) {
return Observable.empty();
}
if (count < 0)
throw new IllegalArgumentException("count >= 0 expected");
return repeat(source, new RedoFinite(count - 1), scheduler);
Expand Down
63 changes: 51 additions & 12 deletions src/main/java/rx/internal/operators/OperatorScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func2;
Expand Down Expand Up @@ -70,37 +73,73 @@ public OperatorScan(final Func2<R, ? super T, R> accumulator) {
}

@Override
public Subscriber<? super T> call(final Subscriber<? super R> observer) {
if (initialValue != NO_INITIAL_VALUE) {
observer.onNext(initialValue);
}
return new Subscriber<T>(observer) {
public Subscriber<? super T> call(final Subscriber<? super R> child) {
return new Subscriber<T>(child) {
private R value = initialValue;
boolean initialized = false;

@SuppressWarnings("unchecked")
@Override
public void onNext(T value) {
public void onNext(T currentValue) {
emitInitialValueIfNeeded(child);

if (this.value == NO_INITIAL_VALUE) {
// if there is NO_INITIAL_VALUE then we know it is type T for both so cast T to R
this.value = (R) value;
this.value = (R) currentValue;
} else {
try {
this.value = accumulator.call(this.value, value);
this.value = accumulator.call(this.value, currentValue);
} catch (Throwable e) {
observer.onError(OnErrorThrowable.addValueAsLastCause(e, value));
child.onError(OnErrorThrowable.addValueAsLastCause(e, currentValue));
}
}
observer.onNext(this.value);
child.onNext(this.value);
}

@Override
public void onError(Throwable e) {
observer.onError(e);
child.onError(e);
}

@Override
public void onCompleted() {
observer.onCompleted();
emitInitialValueIfNeeded(child);
child.onCompleted();
}

private void emitInitialValueIfNeeded(final Subscriber<? super R> child) {
if (!initialized) {
initialized = true;
// we emit first time through if we have an initial value
if (initialValue != NO_INITIAL_VALUE) {
child.onNext(initialValue);
}
}
}

/**
* We want to adjust the requested value by subtracting 1 if we have an initial value
*/
@Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {

final AtomicBoolean once = new AtomicBoolean();

@Override
public void request(long n) {
if (once.compareAndSet(false, true)) {
if (initialValue == NO_INITIAL_VALUE) {
producer.request(n);
} else {
producer.request(n - 1);
}
} else {
// pass-thru after first time
producer.request(n);
}
}
});
}
};
}
Expand Down
150 changes: 150 additions & 0 deletions src/test/java/rx/internal/operators/OperatorScanTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.internal.operators;

import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
Expand All @@ -23,13 +24,21 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.internal.util.RxRingBuffer;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class OperatorScanTest {

Expand Down Expand Up @@ -116,4 +125,145 @@ public Integer call(Integer t1, Integer t2) {
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}

@Test
public void shouldNotEmitUntilAfterSubscription() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.range(1, 100).scan(0, new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
return t1 + t2;
}

}).filter(new Func1<Integer, Boolean>() {

@Override
public Boolean call(Integer t1) {
// this will cause request(1) when 0 is emitted
return t1 > 0;
}

}).subscribe(ts);

assertEquals(100, ts.getOnNextEvents().size());
}

@Test
public void testBackpressureWithInitialValue() {
final AtomicInteger count = new AtomicInteger();
Observable.range(1, 100)
.scan(0, new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
return t1 + t2;
}

})
.subscribe(new Subscriber<Integer>() {

@Override
public void onStart() {
request(10);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
fail(e.getMessage());
e.printStackTrace();
}

@Override
public void onNext(Integer t) {
count.incrementAndGet();
}

});

// we only expect to receive 10 since we request(10)
assertEquals(10, count.get());
}

@Test
public void testBackpressureWithoutInitialValue() {
final AtomicInteger count = new AtomicInteger();
Observable.range(1, 100)
.scan(new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
return t1 + t2;
}

})
.subscribe(new Subscriber<Integer>() {

@Override
public void onStart() {
request(10);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
fail(e.getMessage());
e.printStackTrace();
}

@Override
public void onNext(Integer t) {
count.incrementAndGet();
}

});

// we only expect to receive 10 since we request(10)
assertEquals(10, count.get());
}

@Test
public void testNoBackpressureWithInitialValue() {
final AtomicInteger count = new AtomicInteger();
Observable.range(1, 100)
.scan(0, new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
return t1 + t2;
}

})
.subscribe(new Subscriber<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
fail(e.getMessage());
e.printStackTrace();
}

@Override
public void onNext(Integer t) {
count.incrementAndGet();
}

});

// we only expect to receive 101 as we'll receive all 100 + the initial value
assertEquals(101, count.get());
}
}

0 comments on commit d6b20cb

Please sign in to comment.