Skip to content

Commit

Permalink
Reduce Subscription Object Allocation
Browse files Browse the repository at this point in the history
- significant reduction in object allocations
- details on research available at ReactiveX#1204
  • Loading branch information
benjchristensen committed May 29, 2014
1 parent 2e0e537 commit 6fe35b8
Show file tree
Hide file tree
Showing 8 changed files with 470 additions and 126 deletions.
13 changes: 10 additions & 3 deletions rxjava-core/src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx;

import rx.subscriptions.SubscriptionList;
import rx.subscriptions.CompositeSubscription;

/**
Expand All @@ -30,17 +31,23 @@
*/
public abstract class Subscriber<T> implements Observer<T>, Subscription {

private final CompositeSubscription cs;
private final SubscriptionList cs;

protected Subscriber(CompositeSubscription cs) {
protected Subscriber(SubscriptionList cs) {
if (cs == null) {
throw new IllegalArgumentException("The CompositeSubscription can not be null");
}
this.cs = cs;
}

@Deprecated
protected Subscriber(CompositeSubscription cs) {
this(new SubscriptionList());
add(cs);
}

protected Subscriber() {
this(new CompositeSubscription());
this(new SubscriptionList());
}

protected Subscriber(Subscriber<?> op) {
Expand Down
5 changes: 2 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import rx.functions.Action0;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

/**
Expand All @@ -46,9 +45,9 @@ public OperatorGroupBy(final Func1<? super T, ? extends K> keySelector) {

@Override
public Subscriber<? super T> call(final Subscriber<? super GroupedObservable<K, T>> childObserver) {
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
// a new SubscriptionList to decouple the subscription as the inner subscriptions need a separate lifecycle
// and will unsubscribe on this parent if they are all unsubscribed
return new Subscriber<T>(new CompositeSubscription()) {
return new Subscriber<T>() {
private final Map<K, BufferUntilSubscriber<T>> groups = new HashMap<K, BufferUntilSubscriber<T>>();
private final AtomicInteger completionCounter = new AtomicInteger(0);
private final AtomicBoolean completionEmitted = new AtomicBoolean(false);
Expand Down
12 changes: 6 additions & 6 deletions rxjava-core/src/main/java/rx/operators/OperatorPivot.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
import rx.Subscriber;
import rx.functions.Action0;
import rx.observables.GroupedObservable;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SubscriptionList;
import rx.subscriptions.Subscriptions;

public class OperatorPivot<K1, K2, T> implements Operator<GroupedObservable<K2, GroupedObservable<K1, T>>, GroupedObservable<K1, GroupedObservable<K2, T>>> {

@Override
public Subscriber<? super GroupedObservable<K1, GroupedObservable<K2, T>>> call(final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
final AtomicReference<State> state = new AtomicReference<State>(State.create());
final OperatorPivot<K1, K2, T>.PivotSubscriber pivotSubscriber = new PivotSubscriber(new CompositeSubscription(), child, state);
final OperatorPivot<K1, K2, T>.PivotSubscriber pivotSubscriber = new PivotSubscriber(new SubscriptionList(), child, state);
child.add(Subscriptions.create(new Action0() {

@Override
Expand Down Expand Up @@ -65,12 +65,12 @@ private final class PivotSubscriber extends Subscriber<GroupedObservable<K1, Gro
* needs to decouple the subscription as the inner subscriptions need a separate lifecycle
* and will unsubscribe on this parent if they are all unsubscribed
*/
private final CompositeSubscription parentSubscription;
private final SubscriptionList parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
private final AtomicReference<State> state;
private final GroupState<K1, K2, T> groups;

private PivotSubscriber(CompositeSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
private PivotSubscriber(SubscriptionList parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
super(parentSubscription);
this.parentSubscription = parentSubscription;
this.child = child;
Expand Down Expand Up @@ -159,10 +159,10 @@ private static class GroupState<K1, K2, T> {
private final ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>> innerSubjects = new ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>>();
private final ConcurrentHashMap<K2, Outer<K1, K2, T>> outerSubjects = new ConcurrentHashMap<K2, Outer<K1, K2, T>>();
private final AtomicBoolean completeEmitted = new AtomicBoolean();
private final CompositeSubscription parentSubscription;
private final SubscriptionList parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;

public GroupState(CompositeSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
public GroupState(SubscriptionList parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
this.parentSubscription = parentSubscription;
this.child = child;
}
Expand Down
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperatorTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import rx.Observable.Operator;
import rx.Subscriber;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SubscriptionList;

/**
* Returns an Observable that emits the first <code>num</code> items emitted by the source
Expand All @@ -40,7 +40,7 @@ public OperatorTake(int limit) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final CompositeSubscription parent = new CompositeSubscription();
final SubscriptionList parent = new SubscriptionList();
if (limit == 0) {
child.onCompleted();
parent.unsubscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SubscriptionList;
import rx.subscriptions.Subscriptions;

/**
Expand All @@ -36,7 +36,7 @@ public OperatorUnsubscribeOn(Scheduler scheduler) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
final CompositeSubscription parentSubscription = new CompositeSubscription();
final SubscriptionList parentSubscription = new SubscriptionList();
subscriber.add(Subscriptions.create(new Action0() {

@Override
Expand Down
163 changes: 53 additions & 110 deletions rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
package rx.subscriptions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Set;

import rx.Subscription;
import rx.exceptions.CompositeException;
Expand All @@ -30,154 +33,94 @@
*/
public final class CompositeSubscription implements Subscription {

private final AtomicReference<State> state = new AtomicReference<State>();

/** Empty initial state. */
private static final State CLEAR_STATE;
/** Unsubscribed empty state. */
private static final State CLEAR_STATE_UNSUBSCRIBED;
static {
Subscription[] s0 = new Subscription[0];
CLEAR_STATE = new State(false, s0);
CLEAR_STATE_UNSUBSCRIBED = new State(true, s0);
}

private static final class State {
final boolean isUnsubscribed;
final Subscription[] subscriptions;

State(boolean u, Subscription[] s) {
this.isUnsubscribed = u;
this.subscriptions = s;
}

State unsubscribe() {
return CLEAR_STATE_UNSUBSCRIBED;
}

State add(Subscription s) {
int idx = subscriptions.length;
Subscription[] newSubscriptions = new Subscription[idx + 1];
System.arraycopy(subscriptions, 0, newSubscriptions, 0, idx);
newSubscriptions[idx] = s;
return new State(isUnsubscribed, newSubscriptions);
}

State remove(Subscription s) {
if ((subscriptions.length == 1 && subscriptions[0].equals(s)) || subscriptions.length == 0) {
return clear();
}
Subscription[] newSubscriptions = new Subscription[subscriptions.length - 1];
int idx = 0;
for (Subscription _s : subscriptions) {
if (!_s.equals(s)) {
// was not in this composite
if (idx == newSubscriptions.length) {
return this;
}
newSubscriptions[idx] = _s;
idx++;
}
}
if (idx == 0) {
return clear();
}
// subscription appeared more than once
if (idx < newSubscriptions.length) {
Subscription[] newSub2 = new Subscription[idx];
System.arraycopy(newSubscriptions, 0, newSub2, 0, idx);
return new State(isUnsubscribed, newSub2);
}
return new State(isUnsubscribed, newSubscriptions);
}

State clear() {
return isUnsubscribed ? CLEAR_STATE_UNSUBSCRIBED : CLEAR_STATE;
}
}
private Set<Subscription> subscriptions;
private boolean unsubscribed = false;

public CompositeSubscription() {
state.set(CLEAR_STATE);
}

public CompositeSubscription(final Subscription... subscriptions) {
state.set(new State(false, subscriptions));
this.subscriptions = new HashSet<Subscription>(Arrays.asList(subscriptions));
}

@Override
public boolean isUnsubscribed() {
return state.get().isUnsubscribed;
public synchronized boolean isUnsubscribed() {
return unsubscribed;
}

public void add(final Subscription s) {
State oldState;
State newState;
do {
oldState = state.get();
if (oldState.isUnsubscribed) {
s.unsubscribe();
return;
Subscription unsubscribe = null;
synchronized (this) {
if (unsubscribed) {
unsubscribe = s;
} else {
newState = oldState.add(s);
if (subscriptions == null) {
subscriptions = new HashSet<Subscription>(4);
}
subscriptions.add(s);
}
} while (!state.compareAndSet(oldState, newState));
}
if (unsubscribe != null) {
// call after leaving the synchronized block so we're not holding a lock while executing this
unsubscribe.unsubscribe();
}
}

public void remove(final Subscription s) {
State oldState;
State newState;
do {
oldState = state.get();
if (oldState.isUnsubscribed) {
boolean unsubscribe = false;
synchronized (this) {
if (unsubscribed || subscriptions == null) {
return;
} else {
newState = oldState.remove(s);
}
} while (!state.compareAndSet(oldState, newState));
// if we removed successfully we then need to call unsubscribe on it
s.unsubscribe();
unsubscribe = subscriptions.remove(s);
}
if (unsubscribe) {
// if we removed successfully we then need to call unsubscribe on it (outside of the lock)
s.unsubscribe();
}
}

public void clear() {
State oldState;
State newState;
do {
oldState = state.get();
if (oldState.isUnsubscribed) {
Collection<Subscription> unsubscribe = null;
synchronized (this) {
if (unsubscribed || subscriptions == null) {
return;
} else {
newState = oldState.clear();
unsubscribe = subscriptions;
subscriptions = null;
}
} while (!state.compareAndSet(oldState, newState));
// if we cleared successfully we then need to call unsubscribe on all previous
unsubscribeFromAll(oldState.subscriptions);
}
unsubscribeFromAll(unsubscribe);
}

@Override
public void unsubscribe() {
State oldState;
State newState;
do {
oldState = state.get();
if (oldState.isUnsubscribed) {
synchronized (this) {
if (unsubscribed) {
return;
} else {
newState = oldState.unsubscribe();
}
} while (!state.compareAndSet(oldState, newState));
unsubscribeFromAll(oldState.subscriptions);
unsubscribed = true;
}
// we will only get here once
unsubscribeFromAll(subscriptions);
}

private static void unsubscribeFromAll(Subscription[] subscriptions) {
final List<Throwable> es = new ArrayList<Throwable>();
private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
if (subscriptions == null) {
return;
}
List<Throwable> es = null;
for (Subscription s : subscriptions) {
try {
s.unsubscribe();
} catch (Throwable e) {
if (es == null) {
es = new ArrayList<Throwable>();
}
es.add(e);
}
}
if (!es.isEmpty()) {
if (es != null) {
if (es.size() == 1) {
Throwable t = es.get(0);
if (t instanceof RuntimeException) {
Expand Down
Loading

0 comments on commit 6fe35b8

Please sign in to comment.