Skip to content

Commit

Permalink
Merge pull request #1293 from benjchristensen/perf-tests
Browse files Browse the repository at this point in the history
Fix and Update JMH Perf Tests
  • Loading branch information
benjchristensen committed May 30, 2014
2 parents 37bdda4 + be17891 commit 6a9ff72
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 44 deletions.
22 changes: 7 additions & 15 deletions rxjava-core/src/perf/java/rx/jmh/InputWithIncrementingInteger.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package rx.jmh;

import java.util.concurrent.CountDownLatch;

import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
Expand All @@ -27,6 +25,7 @@
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.observers.TestSubscriber;

/**
* Exposes an Observable and Observer that increments n Integers and consumes them in a Blackhole.
Expand All @@ -37,12 +36,11 @@ public class InputWithIncrementingInteger {
public int size;

public Observable<Integer> observable;
public Observer<Integer> observer;

private CountDownLatch latch;
private BlackHole bh;

@Setup
public void setup(final BlackHole bh) {
this.bh = bh;
observable = Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> o) {
Expand All @@ -54,13 +52,12 @@ public void call(Subscriber<? super Integer> o) {
o.onCompleted();
}
});
}

latch = new CountDownLatch(1);

observer = new Observer<Integer>() {
public TestSubscriber<Integer> newSubscriber() {
return new TestSubscriber<Integer>(new Observer<Integer>() {
@Override
public void onCompleted() {
latch.countDown();
}

@Override
Expand All @@ -72,11 +69,6 @@ public void onError(Throwable e) {
public void onNext(Integer value) {
bh.consume(value);
}
};

}

public void awaitCompletion() throws InterruptedException {
latch.await();
});
}
}
6 changes: 4 additions & 2 deletions rxjava-core/src/perf/java/rx/operators/OperatorMapPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import rx.Observable.Operator;
import rx.functions.Func1;
import rx.jmh.InputWithIncrementingInteger;
import rx.observers.TestSubscriber;

public class OperatorMapPerf {

@GenerateMicroBenchmark
public void mapIdentityFunction(InputWithIncrementingInteger input) throws InterruptedException {
input.observable.lift(MAP_OPERATOR).subscribe(input.observer);
input.awaitCompletion();
TestSubscriber<Integer> ts = input.newSubscriber();
input.observable.lift(MAP_OPERATOR).subscribe(ts);
ts.awaitTerminalEvent();
}

private static final Func1<Integer, Integer> IDENTITY_FUNCTION = new Func1<Integer, Integer>() {
Expand Down
96 changes: 77 additions & 19 deletions rxjava-core/src/perf/java/rx/operators/OperatorSerializePerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package rx.operators;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.GenerateMicroBenchmark;
import org.openjdk.jmh.annotations.Param;
Expand All @@ -29,36 +29,97 @@
import rx.Observer;
import rx.Subscriber;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class OperatorSerializePerf {

public static void main(String[] args) {

}

@GenerateMicroBenchmark
public void noSerializationSingleThreaded(Input input) {
input.observable.subscribe(input.subscriber);
TestSubscriber<Long> ts = input.newSubscriber();
input.firehose.subscribe(ts);
ts.awaitTerminalEvent();
}

@GenerateMicroBenchmark
public void serializedSingleStream(Input input) {
input.observable.serialize().subscribe(input.subscriber);
TestSubscriber<Long> ts = input.newSubscriber();
input.firehose.serialize().subscribe(ts);
ts.awaitTerminalEvent();
}

@GenerateMicroBenchmark
public void serializedTwoStreamsSlightlyContended(final Input input) {
TestSubscriber<Long> ts = input.newSubscriber();
Observable.create(new OnSubscribe<Long>() {

@Override
public void call(Subscriber<? super Long> s) {
// break the contract here and concurrently onNext
input.interval.subscribeOn(Schedulers.computation()).unsafeSubscribe(s);
input.interval.subscribeOn(Schedulers.computation()).unsafeSubscribe(s);
// they will be serialized after
}

}).serialize().subscribe(ts);
ts.awaitTerminalEvent();
}

@State(Scope.Thread)
@GenerateMicroBenchmark
public void serializedTwoStreamsHighlyContended(final Input input) {
TestSubscriber<Long> ts = input.newSubscriber();
Observable.create(new OnSubscribe<Long>() {

@Override
public void call(Subscriber<? super Long> s) {
// break the contract here and concurrently onNext
input.firehose.subscribeOn(Schedulers.computation()).unsafeSubscribe(s);
input.firehose.subscribeOn(Schedulers.computation()).unsafeSubscribe(s);
// they will be serialized after
}

}).serialize().subscribe(ts);
ts.awaitTerminalEvent();
}

@GenerateMicroBenchmark
public void serializedTwoStreamsOneFastOneSlow(final Input input) {
TestSubscriber<Long> ts = input.newSubscriber();
Observable.create(new OnSubscribe<Long>() {

@Override
public void call(final Subscriber<? super Long> s) {
// break the contract here and concurrently onNext
input.interval.subscribeOn(Schedulers.computation()).unsafeSubscribe(s);
input.firehose.subscribeOn(Schedulers.computation()).unsafeSubscribe(s);
// they will be serialized after
}

}).serialize().subscribe(ts);
ts.awaitTerminalEvent();
}

@State(Scope.Benchmark)
public static class Input {

@Param({ "1024", "1048576" })
@Param({ "1", "1000" })
public int size;

public Observable<Integer> observable;
public TestSubscriber<Integer> subscriber;
public Observable<Long> firehose;
public Observable<Long> interval;

private CountDownLatch latch;
private BlackHole bh;

@Setup
public void setup(final BlackHole bh) {
observable = Observable.create(new OnSubscribe<Integer>() {
this.bh = bh;
firehose = Observable.create(new OnSubscribe<Long>() {
@Override
public void call(Subscriber<? super Integer> o) {
for (int value = 0; value < size; value++) {
public void call(Subscriber<? super Long> o) {
for (long value = 0; value < size; value++) {
if (o.isUnsubscribed())
return;
o.onNext(value);
Expand All @@ -67,12 +128,13 @@ public void call(Subscriber<? super Integer> o) {
}
});

latch = new CountDownLatch(1);
interval = Observable.timer(0, 1, TimeUnit.MILLISECONDS).take(size);
}

subscriber = new TestSubscriber<Integer>(new Observer<Integer>() {
public TestSubscriber<Long> newSubscriber() {
return new TestSubscriber<Long>(new Observer<Long>() {
@Override
public void onCompleted() {
latch.countDown();
}

@Override
Expand All @@ -81,15 +143,11 @@ public void onError(Throwable e) {
}

@Override
public void onNext(Integer value) {
public void onNext(Long value) {
bh.consume(value);
}
});

}

public void awaitCompletion() throws InterruptedException {
latch.await();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@
import org.openjdk.jmh.annotations.GenerateMicroBenchmark;

import rx.jmh.InputWithIncrementingInteger;
import rx.observers.TestSubscriber;

public class ComputationSchedulerPerf {

@GenerateMicroBenchmark
public void subscribeOn(InputWithIncrementingInteger input) throws InterruptedException {
input.observable.subscribeOn(Schedulers.computation()).subscribe(input.observer);
input.awaitCompletion();
TestSubscriber<Integer> ts = input.newSubscriber();
input.observable.subscribeOn(Schedulers.computation()).subscribe(ts);
ts.awaitTerminalEvent();
}

@GenerateMicroBenchmark
public void observeOn(InputWithIncrementingInteger input) throws InterruptedException {
input.observable.observeOn(Schedulers.computation()).subscribe(input.observer);
input.awaitCompletion();
TestSubscriber<Integer> ts = input.newSubscriber();
input.observable.observeOn(Schedulers.computation()).subscribe(ts);
ts.awaitTerminalEvent();
}
}
43 changes: 39 additions & 4 deletions rxjava-core/src/perf/java/rx/usecases/PerfTransforms.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import rx.Observable;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public class PerfTransforms {

Expand All @@ -43,25 +44,59 @@ public Integer call(String i) {
}

@GenerateMicroBenchmark
public void flatMapTransformsUsingFrom(UseCaseInput input) throws InterruptedException {
public void flatMapTransforms(UseCaseInput input) throws InterruptedException {
input.observable.flatMap(new Func1<Integer, Observable<Integer>>() {

@Override
public Observable<Integer> call(Integer i) {
return Observable.from(i);
return Observable.just(i);
}

}).subscribe(input.observer);
input.awaitCompletion();
}

@GenerateMicroBenchmark
public void flatMapTransformsUsingJust(UseCaseInput input) throws InterruptedException {
public void flatMapNestedMapFilterTake(final UseCaseInput input) throws InterruptedException {
input.observable.flatMap(new Func1<Integer, Observable<Integer>>() {

@Override
public Observable<Integer> call(Integer i) {
return Observable.just(i);
return input.observable.map(new Func1<Integer, String>() {

@Override
public String call(Integer i) {
return String.valueOf(i);
}

}).map(new Func1<String, Integer>() {

@Override
public Integer call(String i) {
return Integer.parseInt(i);
}

}).filter(new Func1<Integer, Boolean>() {

@Override
public Boolean call(Integer t1) {
return true;
}

}).take(100);
}

}).subscribe(input.observer);
input.awaitCompletion();
}

@GenerateMicroBenchmark
public void flatMapAsyncNested(final UseCaseInput input) throws InterruptedException {
input.observable.flatMap(new Func1<Integer, Observable<Integer>>() {

@Override
public Observable<Integer> call(Integer i) {
return input.observable.subscribeOn(Schedulers.computation());
}

}).subscribe(input.observer);
Expand Down

0 comments on commit 6a9ff72

Please sign in to comment.