Skip to content

Commit

Permalink
Merge pull request #1567 from benjchristensen/groupBy-selector
Browse files Browse the repository at this point in the history
groupBy with element selector
  • Loading branch information
benjchristensen committed Aug 12, 2014
2 parents 805ddb3 + 13bb25d commit 379602a
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 14 deletions.
40 changes: 39 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4931,6 +4931,44 @@ public final void forEach(final Action1<? super T> onNext, final Action1<Throwab
subscribe(onNext, onError, onComplete);
}

/**
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
* grouped items as {@link GroupedObservable}s, one {@code GroupedObservable} per group.
* <p>
* <img width="640" height="360" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupBy.png" alt="">
* <p>
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
* observable" and blocking any one group would block the entire parent stream. If you need
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
* or {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param keySelector
* a function that extracts the key for each item
* @param elementSelector
* a function that extracts the return element for each item
* @param <K>
* the key type
* @param <R>
* the element type
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a
* unique key value and each of which emits those items from the source Observable that share that
* key value
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava wiki: groupBy</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.groupby.aspx">MSDN: Observable.GroupBy</a>
*/
public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector) {
return lift(new OperatorGroupBy<T, K, R>(keySelector, elementSelector));
}

/**
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
* grouped items as {@link GroupedObservable}s, one {@code GroupedObservable} per group.
Expand Down Expand Up @@ -4962,7 +5000,7 @@ public final void forEach(final Action1<? super T> onNext, final Action1<Throwab
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.groupby.aspx">MSDN: Observable.GroupBy</a>
*/
public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector) {
return lift(new OperatorGroupBy<K, T>(keySelector));
return lift(new OperatorGroupBy<T, K, T>(keySelector));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,35 @@
* @param <K> the key type
* @param <T> the source and group value type
*/
public final class OperatorGroupBy<K, T> implements Operator<GroupedObservable<K, T>, T> {
public final class OperatorGroupBy<T, K, R> implements Operator<GroupedObservable<K, R>, T> {

final Func1<? super T, ? extends K> keySelector;
final Func1<? super T, ? extends R> elementSelector;

@SuppressWarnings("unchecked")
public OperatorGroupBy(final Func1<? super T, ? extends K> keySelector) {
this(keySelector, (Func1<T, R>)IDENTITY);
}

public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends R> elementSelector) {
this.keySelector = keySelector;
this.elementSelector = elementSelector;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super GroupedObservable<K, T>> child) {
return new GroupBySubscriber<K, T>(keySelector, child);
public Subscriber<? super T> call(final Subscriber<? super GroupedObservable<K, R>> child) {
return new GroupBySubscriber<K, T, R>(keySelector, elementSelector, child);
}
static final class GroupBySubscriber<K, T> extends Subscriber<T> {
static final class GroupBySubscriber<K, T, R> extends Subscriber<T> {
final Func1<? super T, ? extends K> keySelector;
final Subscriber<? super GroupedObservable<K, T>> child;
public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Subscriber<? super GroupedObservable<K, T>> child) {
final Func1<? super T, ? extends R> elementSelector;
final Subscriber<? super GroupedObservable<K, R>> child;
public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends R> elementSelector, Subscriber<? super GroupedObservable<K, R>> child) {
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
// and will unsubscribe on this parent if they are all unsubscribed
super();
this.keySelector = keySelector;
this.elementSelector = elementSelector;
this.child = child;
}
private final Map<K, BufferUntilSubscriber<T>> groups = new HashMap<K, BufferUntilSubscriber<T>>();
Expand Down Expand Up @@ -124,10 +133,10 @@ public void onNext(T t) {
group = BufferUntilSubscriber.create();
final BufferUntilSubscriber<T> _group = group;

GroupedObservable<K, T> go = new GroupedObservable<K, T>(key, new OnSubscribe<T>() {
GroupedObservable<K, R> go = new GroupedObservable<K, R>(key, new OnSubscribe<R>() {

@Override
public void call(final Subscriber<? super T> o) {
public void call(final Subscriber<? super R> o) {
// number of children we have running
COUNTER_UPDATER.incrementAndGet(GroupBySubscriber.this);
o.add(Subscriptions.create(new Action0() {
Expand All @@ -153,7 +162,7 @@ public void onError(Throwable e) {

@Override
public void onNext(T t) {
o.onNext(t);
o.onNext(elementSelector.call(t));
}

});
Expand Down Expand Up @@ -185,4 +194,13 @@ private void completeInner() {
}

}

private final static Func1<Object, Object> IDENTITY = new Func1<Object, Object>() {

@Override
public Object call(Object t) {
return t;
}

};
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ public Integer call(String s) {
return s.length();
}
};

@Test
public void testGroupBy() {
Observable<String> source = Observable.from("one", "two", "three", "four", "five", "six");
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<Integer, String>(length));
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));

Map<Integer, Collection<String>> map = toMap(grouped);

Expand All @@ -72,11 +72,37 @@ public void testGroupBy() {
assertArrayEquals(Arrays.asList("four", "five").toArray(), map.get(4).toArray());
assertArrayEquals(Arrays.asList("three").toArray(), map.get(5).toArray());
}

@Test
public void testGroupByWithElementSelector() {
Observable<String> source = Observable.from("one", "two", "three", "four", "five", "six");
Observable<GroupedObservable<Integer, Integer>> grouped = source.lift(new OperatorGroupBy<String, Integer, Integer>(length, length));

Map<Integer, Collection<Integer>> map = toMap(grouped);

assertEquals(3, map.size());
assertArrayEquals(Arrays.asList(3, 3, 3).toArray(), map.get(3).toArray());
assertArrayEquals(Arrays.asList(4, 4).toArray(), map.get(4).toArray());
assertArrayEquals(Arrays.asList(5).toArray(), map.get(5).toArray());
}

@Test
public void testGroupByWithElementSelector2() {
Observable<String> source = Observable.from("one", "two", "three", "four", "five", "six");
Observable<GroupedObservable<Integer, Integer>> grouped = source.groupBy(length, length);

Map<Integer, Collection<Integer>> map = toMap(grouped);

assertEquals(3, map.size());
assertArrayEquals(Arrays.asList(3, 3, 3).toArray(), map.get(3).toArray());
assertArrayEquals(Arrays.asList(4, 4).toArray(), map.get(4).toArray());
assertArrayEquals(Arrays.asList(5).toArray(), map.get(5).toArray());
}

@Test
public void testEmpty() {
Observable<String> source = Observable.empty();
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<Integer, String>(length));
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));

Map<Integer, Collection<String>> map = toMap(grouped);

Expand All @@ -89,7 +115,7 @@ public void testError() {
Observable<String> errorSource = Observable.error(new RuntimeException("forced failure"));
Observable<String> source = Observable.concat(sourceStrings, errorSource);

Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<Integer, String>(length));
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));

final AtomicInteger groupCounter = new AtomicInteger();
final AtomicInteger eventCounter = new AtomicInteger();
Expand Down

0 comments on commit 379602a

Please sign in to comment.