Skip to content

Commit

Permalink
Merge pull request ReactiveX#514 from akarnokd/OperationJoin2
Browse files Browse the repository at this point in the history
Operation Join again
  • Loading branch information
benjchristensen committed Nov 26, 2013
2 parents b35337b + 8a80866 commit 5c6a1f7
Show file tree
Hide file tree
Showing 3 changed files with 601 additions and 0 deletions.
22 changes: 22 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import rx.operators.OperationFirstOrDefault;
import rx.operators.OperationGroupBy;
import rx.operators.OperationInterval;
import rx.operators.OperationJoin;
import rx.operators.OperationJoinPatterns;
import rx.operators.OperationLast;
import rx.operators.OperationMap;
Expand Down Expand Up @@ -5942,5 +5943,26 @@ public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7, Plan0<R> p8, Plan0<R> p9) {
return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9));
}
/**
* Correlates the elements of two sequences based on overlapping durations.
* @param right The right observable sequence to join elements for.
* @param leftDurationSelector A function to select the duration of each
* element of this observable sequence, used to
* determine overlap.
* @param rightDurationSelector A function to select the duration of each
* element of the right observable sequence,
* used to determine overlap.
* @param resultSelector A function invoked to compute a result element
* for any two overlapping elements of the left and
* right observable sequences.
* @return An observable sequence that contains result elements computed
* from source elements that have an overlapping duration.
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229750.aspx'>MSDN: Observable.Join</a>
*/
public <TRight, TLeftDuration, TRightDuration, R> Observable<R> join(Observable<TRight> right, Func1<T, Observable<TLeftDuration>> leftDurationSelector,
Func1<TRight, Observable<TRightDuration>> rightDurationSelector,
Func2<T, TRight, R> resultSelector) {
return create(new OperationJoin<T, TRight, TLeftDuration, TRightDuration, R>(this, right, leftDurationSelector, rightDurationSelector, resultSelector));
}
}

277 changes: 277 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationJoin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import java.util.HashMap;
import java.util.Map;
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Correlates the elements of two sequences based on overlapping durations.
*/
public class OperationJoin<TLeft, TRight, TLeftDuration, TRightDuration, R> implements OnSubscribeFunc<R> {
final Observable<TLeft> left;
final Observable<TRight> right;
final Func1<TLeft, Observable<TLeftDuration>> leftDurationSelector;
final Func1<TRight, Observable<TRightDuration>> rightDurationSelector;
final Func2<TLeft, TRight, R> resultSelector;
public OperationJoin(
Observable<TLeft> left,
Observable<TRight> right,
Func1<TLeft, Observable<TLeftDuration>> leftDurationSelector,
Func1<TRight, Observable<TRightDuration>> rightDurationSelector,
Func2<TLeft, TRight, R> resultSelector) {
this.left = left;
this.right = right;
this.leftDurationSelector = leftDurationSelector;
this.rightDurationSelector = rightDurationSelector;
this.resultSelector = resultSelector;
}

@Override
public Subscription onSubscribe(Observer<? super R> t1) {
SerialSubscription cancel = new SerialSubscription();
ResultSink result = new ResultSink(t1, cancel);
cancel.setSubscription(result.run());
return cancel;
}
/** Manage the left and right sources. */
class ResultSink {
final Object gate = new Object();
final CompositeSubscription group = new CompositeSubscription();
boolean leftDone;
int leftId;
final Map<Integer, TLeft> leftMap = new HashMap<Integer, TLeft>();
boolean rightDone;
int rightId;
final Map<Integer, TRight> rightMap = new HashMap<Integer, TRight>();
final Observer<? super R> observer;
final Subscription cancel;
public ResultSink(Observer<? super R> observer, Subscription cancel) {
this.observer = observer;
this.cancel = cancel;
}
public Subscription run() {
SerialSubscription leftCancel = new SerialSubscription();
SerialSubscription rightCancel = new SerialSubscription();

group.add(leftCancel);
group.add(rightCancel);

leftCancel.setSubscription(left.subscribe(new LeftObserver(leftCancel)));
rightCancel.setSubscription(right.subscribe(new RightObserver(rightCancel)));

return group;
}
/** Observes the left values. */
class LeftObserver implements Observer<TLeft> {
final Subscription self;
public LeftObserver(Subscription self) {
this.self = self;
}
protected void expire(int id, Subscription resource) {
synchronized (gate) {
if (leftMap.remove(id) != null && leftMap.isEmpty() && leftDone) {
observer.onCompleted();
cancel.unsubscribe();
}
}
group.remove(resource);
}
@Override
public void onNext(TLeft args) {
int id;
synchronized (gate) {
id = leftId++;
leftMap.put(id, args);
}
SerialSubscription md = new SerialSubscription();
group.add(md);

Observable<TLeftDuration> duration;
try {
duration = leftDurationSelector.call(args);
} catch (Throwable t) {
observer.onError(t);
cancel.unsubscribe();
return;
}

md.setSubscription(duration.subscribe(new LeftDurationObserver(id, md)));

synchronized (gate) {
for (TRight r : rightMap.values()) {
R result;
try {
result = resultSelector.call(args, r);
} catch (Throwable t) {
observer.onError(t);
cancel.unsubscribe();
return;
}
observer.onNext(result);
}
}
}
@Override
public void onError(Throwable e) {
synchronized (gate) {
observer.onError(e);
cancel.unsubscribe();
}
}
@Override
public void onCompleted() {
synchronized (gate) {
leftDone = true;
if (rightDone || leftMap.isEmpty()) {
observer.onCompleted();
cancel.unsubscribe();
} else {
self.unsubscribe();
}
}
}
/** Observes the left duration. */
class LeftDurationObserver implements Observer<TLeftDuration> {
final int id;
final Subscription handle;
public LeftDurationObserver(int id, Subscription handle) {
this.id = id;
this.handle = handle;
}

@Override
public void onNext(TLeftDuration args) {
expire(id, handle);
}

@Override
public void onError(Throwable e) {
LeftObserver.this.onError(e);
}

@Override
public void onCompleted() {
expire(id, handle);
}

}
}
/** Observes the right values. */
class RightObserver implements Observer<TRight> {
final Subscription self;
public RightObserver(Subscription self) {
this.self = self;
}
void expire(int id, Subscription resource) {
synchronized (gate) {
if (rightMap.remove(id) != null && rightMap.isEmpty() && rightDone) {
observer.onCompleted();
cancel.unsubscribe();
}
}
group.remove(resource);
}
@Override
public void onNext(TRight args) {
int id = 0;
synchronized (gate) {
id = rightId++;
rightMap.put(id, args);
}
SerialSubscription md = new SerialSubscription();
group.add(md);

Observable<TRightDuration> duration;
try {
duration = rightDurationSelector.call(args);
} catch (Throwable t) {
observer.onError(t);
cancel.unsubscribe();
return;
}

md.setSubscription(duration.subscribe(new RightDurationObserver(id, md)));

synchronized (gate) {
for (TLeft lv : leftMap.values()) {
R result;
try {
result = resultSelector.call(lv, args);
} catch (Throwable t) {
observer.onError(t);
cancel.unsubscribe();
return;
}
observer.onNext(result);
}
}
}
@Override
public void onError(Throwable e) {
synchronized (gate) {
observer.onError(e);
cancel.unsubscribe();
}
}
@Override
public void onCompleted() {
synchronized (gate) {
rightDone = true;
if (leftDone || rightMap.isEmpty()) {
observer.onCompleted();
cancel.unsubscribe();
} else {
self.unsubscribe();
}
}
}
/** Observe the right duration. */
class RightDurationObserver implements Observer<TRightDuration> {
final int id;
final Subscription handle;
public RightDurationObserver(int id, Subscription handle) {
this.id = id;
this.handle = handle;
}

@Override
public void onNext(TRightDuration args) {
expire(id, handle);
}

@Override
public void onError(Throwable e) {
RightObserver.this.onError(e);
}

@Override
public void onCompleted() {
expire(id, handle);
}

}
}
}
}
Loading

0 comments on commit 5c6a1f7

Please sign in to comment.