Skip to content

Commit

Permalink
Merge pull request #1287 from akarnokd/SubjectObserverPerf
Browse files Browse the repository at this point in the history
ReplaySubject remove replayState CHM and related SubjectObserver changes
  • Loading branch information
benjchristensen committed May 30, 2014
2 parents 47ea6c8 + b195ff8 commit b0e87db
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 81 deletions.
6 changes: 3 additions & 3 deletions rxjava-core/src/main/java/rx/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public final class AsyncSubject<T> extends Subject<T, T> {

/**
* Creates and returns a new {@code AsyncSubject}.
*
* @param <T> the result value type
* @return the new {@code AsyncSubject}
*/
public static <T> AsyncSubject<T> create() {
Expand All @@ -63,8 +63,8 @@ public static <T> AsyncSubject<T> create() {
@Override
public void call(SubjectObserver<T> o) {
Object v = state.get();
o.accept(v);
NotificationLite<T> nl = NotificationLite.instance();
NotificationLite<T> nl = state.nl;
o.accept(v, nl);
if (v == null || (!nl.isCompleted(v) && !nl.isError(v))) {
o.onCompleted();
}
Expand Down
8 changes: 4 additions & 4 deletions rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault)

@Override
public void call(SubjectObserver<T> o) {
o.emitFirst(state.get());
o.emitFirst(state.get(), state.nl);
}

};
Expand All @@ -121,7 +121,7 @@ public void onCompleted() {
if (last == null || state.active) {
Object n = nl.completed();
for (SubjectObserver<T> bo : state.terminate(n)) {
bo.emitNext(n);
bo.emitNext(n, state.nl);
}
}
}
Expand All @@ -132,7 +132,7 @@ public void onError(Throwable e) {
if (last == null || state.active) {
Object n = nl.error(e);
for (SubjectObserver<T> bo : state.terminate(n)) {
bo.emitNext(n);
bo.emitNext(n, state.nl);
}
}
}
Expand All @@ -143,7 +143,7 @@ public void onNext(T v) {
if (last == null || state.active) {
Object n = nl.next(v);
for (SubjectObserver<T> bo : state.next(n)) {
bo.emitNext(n);
bo.emitNext(n, state.nl);
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions rxjava-core/src/main/java/rx/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,19 @@ public final class PublishSubject<T> extends Subject<T, T> {
/**
* Creates and returns a new {@code PublishSubject}.
*
* @param <T> the value type
* @return the new {@code PublishSubject}
*/
public static <T> PublishSubject<T> create() {
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
state.onAdded = new Action1<SubjectObserver<T>>() {
state.onTerminated = new Action1<SubjectObserver<T>>() {

@Override
public void call(SubjectObserver<T> o) {
o.emitFirst(state.get());
o.emitFirst(state.get(), state.nl);
}

};
state.onTerminated = state.onAdded;
return new PublishSubject<T>(state, state);
}

Expand All @@ -79,7 +79,7 @@ public void onCompleted() {
if (state.active) {
Object n = nl.completed();
for (SubjectObserver<T> bo : state.terminate(n)) {
bo.emitNext(n);
bo.emitNext(n, state.nl);
}
}

Expand All @@ -90,7 +90,7 @@ public void onError(final Throwable e) {
if (state.active) {
Object n = nl.error(e);
for (SubjectObserver<T> bo : state.terminate(n)) {
bo.emitNext(n);
bo.emitNext(n, state.nl);
}
}
}
Expand Down
69 changes: 14 additions & 55 deletions rxjava-core/src/main/java/rx/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
package rx.subjects;

import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import rx.Observer;
import rx.Scheduler;
Expand Down Expand Up @@ -98,26 +96,20 @@ public void call(SubjectObserver<T> o) {
int lastIndex = state.replayObserverFromIndex(0, o);

// now that it is caught up add to observers
state.replayState.put(o, lastIndex);
o.index(lastIndex);
}
};
ssm.onTerminated = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> o) {
Integer idx = state.replayState.remove(o);
Integer idx = o.index();
if (idx == null) {
idx = 0;
}
// we will finish replaying if there is anything left
state.replayObserverFromIndex(idx, o);
}
};
ssm.onUnsubscribed = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> o) {
state.replayState.remove(o);
}
};

return new ReplaySubject<T>(ssm, ssm, state);
}
Expand Down Expand Up @@ -273,20 +265,13 @@ static final <T> ReplaySubject<T> createWithState(final BoundedState<T> state,

@Override
public void call(SubjectObserver<T> t1) {
NodeList.Node<Object> l = state.removeState(t1);
NodeList.Node<Object> l = t1.index();
if (l == null) {
l = state.head();
}
state.replayObserverFromIndex(l, t1);
}

};
ssm.onUnsubscribed = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> t1) {
state.removeState(t1);
}

};

return new ReplaySubject<T>(ssm, ssm, state);
Expand Down Expand Up @@ -341,7 +326,7 @@ public void onCompleted() {
* @return Returns the number of subscribers.
*/
/* Support test. */int subscriberCount() {
return state.replayStateSize();
return ssm.state.observers.length;
}

private boolean caughtUp(SubjectObserver<? super T> o) {
Expand All @@ -364,8 +349,6 @@ private boolean caughtUp(SubjectObserver<? super T> o) {
* @param <T> the input and output type
*/
static final class UnboundedReplayState<T> implements ReplayState<T, Integer> {
/** Each Observer is tracked here for what events they have received. */
final ConcurrentHashMap<Observer<? super T>, Integer> replayState;
private final NotificationLite<T> nl = NotificationLite.instance();
/** The buffer. */
private final ArrayList<Object> list;
Expand All @@ -378,7 +361,6 @@ static final class UnboundedReplayState<T> implements ReplayState<T, Integer> {
= AtomicIntegerFieldUpdater.newUpdater(UnboundedReplayState.class, "index");
public UnboundedReplayState(int initialCapacity) {
list = new ArrayList<Object>(initialCapacity);
replayState = new ConcurrentHashMap<Observer<? super T>, Integer>();
}

@Override
Expand Down Expand Up @@ -417,10 +399,10 @@ public boolean terminated() {

@Override
public void replayObserver(SubjectObserver<? super T> observer) {
Integer lastEmittedLink = replayState.get(observer);
Integer lastEmittedLink = observer.index();
if (lastEmittedLink != null) {
int l = replayObserverFromIndex(lastEmittedLink, observer);
replayState.put(observer, l);
observer.index(l);
} else {
throw new IllegalStateException("failed to find lastEmittedLink for: " + observer);
}
Expand All @@ -441,12 +423,6 @@ public Integer replayObserverFromIndex(Integer idx, SubjectObserver<? super T> o
public Integer replayObserverFromIndexTest(Integer idx, SubjectObserver<? super T> observer, long now) {
return replayObserverFromIndex(idx, observer);
}

@Override
public int replayStateSize() {
return replayState.size();
}

}


Expand All @@ -456,7 +432,6 @@ public int replayStateSize() {
*/
static final class BoundedState<T> implements ReplayState<T, NodeList.Node<Object>> {
final NodeList<Object> list;
final ConcurrentHashMap<Observer<? super T>, NodeList.Node<Object>> replayState;
final EvictionPolicy evictionPolicy;
final Func1<Object, Object> enterTransform;
final Func1<Object, Object> leaveTransform;
Expand All @@ -468,7 +443,6 @@ public BoundedState(EvictionPolicy evictionPolicy, Func1<Object, Object> enterTr
Func1<Object, Object> leaveTransform) {
this.list = new NodeList<Object>();
this.tail = list.tail;
this.replayState = new ConcurrentHashMap<Observer<? super T>, NodeList.Node<Object>>();
this.evictionPolicy = evictionPolicy;
this.enterTransform = enterTransform;
this.leaveTransform = leaveTransform;
Expand Down Expand Up @@ -525,21 +499,11 @@ public Node<Object> head() {
public Node<Object> tail() {
return tail;
}
public Node<Object> removeState(SubjectObserver<? super T> o) {
return replayState.remove(o);
}
public void addState(SubjectObserver<? super T> o, Node<Object> state) {
if (state == null) {
throw new IllegalStateException("Null state!");
} else {
replayState.put(o, state);
}
}
@Override
public void replayObserver(SubjectObserver<? super T> observer) {
NodeList.Node<Object> lastEmittedLink = replayState.get(observer);
NodeList.Node<Object> lastEmittedLink = observer.index();
NodeList.Node<Object> l = replayObserverFromIndex(lastEmittedLink, observer);
addState(observer, l);
observer.index(l);
}

@Override
Expand All @@ -565,11 +529,6 @@ public NodeList.Node<Object> replayObserverFromIndexTest(
public boolean terminated() {
return terminated;
}

@Override
public int replayStateSize() {
return replayState.size();
}
}

// **************
Expand All @@ -584,6 +543,10 @@ public int replayStateSize() {
interface ReplayState<T, I> {
/** @return true if the subject has reached a terminal state. */
boolean terminated();
/**
* Replay contents to the given observer.
* @param observer the receiver of events
*/
void replayObserver(SubjectObserver<? super T> observer);
/**
* Replay the buffered values from an index position and return a new index
Expand All @@ -601,10 +564,6 @@ I replayObserverFromIndex(
*/
I replayObserverFromIndexTest(
I idx, SubjectObserver<? super T> observer, long now);
/**
* @return the size of the replay state map for testing purposes.
*/
int replayStateSize();
/**
* Add an OnNext value to the buffer
* @param value the value to add
Expand Down Expand Up @@ -756,7 +715,7 @@ public DefaultOnAdd(BoundedState<T> state) {
@Override
public void call(SubjectObserver<T> t1) {
NodeList.Node<Object> l = state.replayObserverFromIndex(state.head(), t1);
state.addState(t1, l);
t1.index(l);
}

}
Expand All @@ -783,7 +742,7 @@ public void call(SubjectObserver<T> t1) {
// accept all if terminated
l = state.replayObserverFromIndex(state.head(), t1);
}
state.addState(t1, l);
t1.index(l);
}

}
Expand Down
Loading

0 comments on commit b0e87db

Please sign in to comment.