Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ObserveOn throughput enhancements #2804

Merged
merged 1 commit into from
Mar 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 57 additions & 45 deletions src/main/java/rx/internal/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@
*/
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.Queue;
import java.util.concurrent.atomic.*;

import rx.Observable.Operator;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.*;
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.internal.util.RxRingBuffer;
import rx.schedulers.ImmediateScheduler;
import rx.schedulers.TrampolineScheduler;
import rx.internal.util.*;
import rx.internal.util.unsafe.*;
import rx.schedulers.*;

/**
* Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer.
Expand Down Expand Up @@ -64,16 +61,15 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
/** Observe through individual queue per observer. */
private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
final Subscriber<? super T> child;
private final Scheduler.Worker recursiveScheduler;
private final ScheduledUnsubscribe scheduledUnsubscribe;
final Scheduler.Worker recursiveScheduler;
final ScheduledUnsubscribe scheduledUnsubscribe;
final NotificationLite<T> on = NotificationLite.instance();

private final RxRingBuffer queue = RxRingBuffer.getSpscInstance();
private boolean completed = false;
private boolean failure = false;
final Queue<Object> queue;
volatile boolean completed = false;
volatile boolean failure = false;

@SuppressWarnings("unused")
private volatile long requested = 0;
volatile long requested = 0;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<ObserveOnSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "requested");

Expand All @@ -82,12 +78,19 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<ObserveOnSubscriber> COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter");

volatile Throwable error;

// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler, queue);
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
} else {
queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
}
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
child.add(scheduledUnsubscribe);
child.setProducer(new Producer() {

Expand All @@ -113,10 +116,8 @@ public void onNext(final T t) {
if (isUnsubscribed() || completed) {
return;
}
try {
queue.onNext(t);
} catch (MissingBackpressureException e) {
onError(e);
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
Expand All @@ -127,8 +128,10 @@ public void onCompleted() {
if (isUnsubscribed() || completed) {
return;
}
if (error != null) {
return;
}
completed = true;
queue.onCompleted();
schedule();
}

Expand All @@ -137,53 +140,64 @@ public void onError(final Throwable e) {
if (isUnsubscribed() || completed) {
return;
}
if (error != null) {
return;
}
error = e;
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
unsubscribe();
completed = true;
// mark failure so the polling thread will skip onNext still in the queue
completed = true;
failure = true;
queue.onError(e);
schedule();
}

protected void schedule() {
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
recursiveScheduler.schedule(new Action0() {
final Action0 action = new Action0() {

@Override
public void call() {
pollQueue();
}
@Override
public void call() {
pollQueue();
}

});
};

protected void schedule() {
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
recursiveScheduler.schedule(action);
}
}

// only execute this from schedule()
private void pollQueue() {
void pollQueue() {
int emitted = 0;
do {
/*
* Set to 1 otherwise it could have grown very large while in the last poll loop
* and then we can end up looping all those times again here before exiting even once we've drained
*/
COUNTER_UPDATER.set(this, 1);
counter = 1;

// middle:
while (!scheduledUnsubscribe.isUnsubscribed()) {
if (failure) {
// special handling to short-circuit an error propagation
Object o = queue.poll();
// completed so we will skip onNext if they exist and only emit terminal events
if (on.isError(o)) {
// only emit error
on.accept(child, o);
// we have emitted a terminal event so return (exit the loop we're in)
child.onError(error);
return;
} else {
if (requested == 0 && completed && queue.isEmpty()) {
child.onCompleted();
return;
}
} else {
if (REQUESTED.getAndDecrement(this) != 0) {
Object o = queue.poll();
if (o == null) {
if (completed) {
if (failure) {
child.onError(error);
} else {
child.onCompleted();
}
return;
}
// nothing in queue
REQUESTED.incrementAndGet(this);
break;
Expand Down Expand Up @@ -213,12 +227,10 @@ static final class ScheduledUnsubscribe implements Subscription {
final Scheduler.Worker worker;
volatile int once;
static final AtomicIntegerFieldUpdater<ScheduledUnsubscribe> ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ScheduledUnsubscribe.class, "once");
final RxRingBuffer queue;
volatile boolean unsubscribed = false;

public ScheduledUnsubscribe(Scheduler.Worker worker, RxRingBuffer queue) {
public ScheduledUnsubscribe(Scheduler.Worker worker) {
this.worker = worker;
this.queue = queue;
}

@Override
Expand Down
38 changes: 21 additions & 17 deletions src/main/java/rx/internal/schedulers/EventLoopsScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
*/
package rx.internal.schedulers;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.util.RxThreadFactory;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import java.util.concurrent.*;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import rx.*;
import rx.functions.Action0;
import rx.internal.util.*;
import rx.subscriptions.*;

public class EventLoopsScheduler extends Scheduler {
/** Manages a fixed number of workers. */
Expand Down Expand Up @@ -95,7 +92,9 @@ public Subscription scheduleDirect(Action0 action) {
}

private static class EventLoopWorker extends Scheduler.Worker {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final SubscriptionList serial = new SubscriptionList();
private final CompositeSubscription timed = new CompositeSubscription();
private final SubscriptionList both = new SubscriptionList(serial, timed);
private final PoolWorker poolWorker;

EventLoopWorker(PoolWorker poolWorker) {
Expand All @@ -105,28 +104,33 @@ private static class EventLoopWorker extends Scheduler.Worker {

@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
both.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
return both.isUnsubscribed();
}

@Override
public Subscription schedule(Action0 action) {
return schedule(action, 0, null);
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledAction s = poolWorker.scheduleActual(action, 0, null);

serial.add(s);
s.addParent(serial);

return s;
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit, timed);

ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}
}
Expand Down
35 changes: 33 additions & 2 deletions src/main/java/rx/internal/schedulers/NewThreadWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import rx.*;
import rx.exceptions.Exceptions;
import rx.functions.Action0;
import rx.internal.util.RxThreadFactory;
import rx.internal.util.*;
import rx.plugins.*;
import rx.subscriptions.Subscriptions;
import rx.subscriptions.*;

/**
* @warn class description missing
Expand Down Expand Up @@ -174,6 +174,37 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time

return run;
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);

Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);

return run;
}

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);

Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);

return run;
}

@Override
public void unsubscribe() {
Expand Down
Loading