Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

groupBy with element selector #1567

Merged
merged 1 commit into from
Aug 12, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4931,6 +4931,44 @@ public final void forEach(final Action1<? super T> onNext, final Action1<Throwab
subscribe(onNext, onError, onComplete);
}

/**
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
* grouped items as {@link GroupedObservable}s, one {@code GroupedObservable} per group.
* <p>
* <img width="640" height="360" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupBy.png" alt="">
* <p>
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
* observable" and blocking any one group would block the entire parent stream. If you need
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
* or {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param keySelector
* a function that extracts the key for each item
* @param elementSelector
* a function that extracts the return element for each item
* @param <K>
* the key type
* @param <R>
* the element type
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a
* unique key value and each of which emits those items from the source Observable that share that
* key value
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava wiki: groupBy</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.groupby.aspx">MSDN: Observable.GroupBy</a>
*/
public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector) {
return lift(new OperatorGroupBy<T, K, R>(keySelector, elementSelector));
}

/**
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
* grouped items as {@link GroupedObservable}s, one {@code GroupedObservable} per group.
Expand Down Expand Up @@ -4962,7 +5000,7 @@ public final void forEach(final Action1<? super T> onNext, final Action1<Throwab
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.groupby.aspx">MSDN: Observable.GroupBy</a>
*/
public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector) {
return lift(new OperatorGroupBy<K, T>(keySelector));
return lift(new OperatorGroupBy<T, K, T>(keySelector));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,35 @@
* @param <K> the key type
* @param <T> the source and group value type
*/
public final class OperatorGroupBy<K, T> implements Operator<GroupedObservable<K, T>, T> {
public final class OperatorGroupBy<T, K, R> implements Operator<GroupedObservable<K, R>, T> {

final Func1<? super T, ? extends K> keySelector;
final Func1<? super T, ? extends R> elementSelector;

@SuppressWarnings("unchecked")
public OperatorGroupBy(final Func1<? super T, ? extends K> keySelector) {
this(keySelector, (Func1<T, R>)IDENTITY);
}

public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends R> elementSelector) {
this.keySelector = keySelector;
this.elementSelector = elementSelector;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super GroupedObservable<K, T>> child) {
return new GroupBySubscriber<K, T>(keySelector, child);
public Subscriber<? super T> call(final Subscriber<? super GroupedObservable<K, R>> child) {
return new GroupBySubscriber<K, T, R>(keySelector, elementSelector, child);
}
static final class GroupBySubscriber<K, T> extends Subscriber<T> {
static final class GroupBySubscriber<K, T, R> extends Subscriber<T> {
final Func1<? super T, ? extends K> keySelector;
final Subscriber<? super GroupedObservable<K, T>> child;
public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Subscriber<? super GroupedObservable<K, T>> child) {
final Func1<? super T, ? extends R> elementSelector;
final Subscriber<? super GroupedObservable<K, R>> child;
public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends R> elementSelector, Subscriber<? super GroupedObservable<K, R>> child) {
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
// and will unsubscribe on this parent if they are all unsubscribed
super();
this.keySelector = keySelector;
this.elementSelector = elementSelector;
this.child = child;
}
private final Map<K, BufferUntilSubscriber<T>> groups = new HashMap<K, BufferUntilSubscriber<T>>();
Expand Down Expand Up @@ -124,10 +133,10 @@ public void onNext(T t) {
group = BufferUntilSubscriber.create();
final BufferUntilSubscriber<T> _group = group;

GroupedObservable<K, T> go = new GroupedObservable<K, T>(key, new OnSubscribe<T>() {
GroupedObservable<K, R> go = new GroupedObservable<K, R>(key, new OnSubscribe<R>() {

@Override
public void call(final Subscriber<? super T> o) {
public void call(final Subscriber<? super R> o) {
// number of children we have running
COUNTER_UPDATER.incrementAndGet(GroupBySubscriber.this);
o.add(Subscriptions.create(new Action0() {
Expand All @@ -153,7 +162,7 @@ public void onError(Throwable e) {

@Override
public void onNext(T t) {
o.onNext(t);
o.onNext(elementSelector.call(t));
}

});
Expand Down Expand Up @@ -185,4 +194,13 @@ private void completeInner() {
}

}

private final static Func1<Object, Object> IDENTITY = new Func1<Object, Object>() {

@Override
public Object call(Object t) {
return t;
}

};
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ public Integer call(String s) {
return s.length();
}
};

@Test
public void testGroupBy() {
Observable<String> source = Observable.from("one", "two", "three", "four", "five", "six");
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<Integer, String>(length));
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));

Map<Integer, Collection<String>> map = toMap(grouped);

Expand All @@ -72,11 +72,37 @@ public void testGroupBy() {
assertArrayEquals(Arrays.asList("four", "five").toArray(), map.get(4).toArray());
assertArrayEquals(Arrays.asList("three").toArray(), map.get(5).toArray());
}

@Test
public void testGroupByWithElementSelector() {
Observable<String> source = Observable.from("one", "two", "three", "four", "five", "six");
Observable<GroupedObservable<Integer, Integer>> grouped = source.lift(new OperatorGroupBy<String, Integer, Integer>(length, length));

Map<Integer, Collection<Integer>> map = toMap(grouped);

assertEquals(3, map.size());
assertArrayEquals(Arrays.asList(3, 3, 3).toArray(), map.get(3).toArray());
assertArrayEquals(Arrays.asList(4, 4).toArray(), map.get(4).toArray());
assertArrayEquals(Arrays.asList(5).toArray(), map.get(5).toArray());
}

@Test
public void testGroupByWithElementSelector2() {
Observable<String> source = Observable.from("one", "two", "three", "four", "five", "six");
Observable<GroupedObservable<Integer, Integer>> grouped = source.groupBy(length, length);

Map<Integer, Collection<Integer>> map = toMap(grouped);

assertEquals(3, map.size());
assertArrayEquals(Arrays.asList(3, 3, 3).toArray(), map.get(3).toArray());
assertArrayEquals(Arrays.asList(4, 4).toArray(), map.get(4).toArray());
assertArrayEquals(Arrays.asList(5).toArray(), map.get(5).toArray());
}

@Test
public void testEmpty() {
Observable<String> source = Observable.empty();
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<Integer, String>(length));
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));

Map<Integer, Collection<String>> map = toMap(grouped);

Expand All @@ -89,7 +115,7 @@ public void testError() {
Observable<String> errorSource = Observable.error(new RuntimeException("forced failure"));
Observable<String> source = Observable.concat(sourceStrings, errorSource);

Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<Integer, String>(length));
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));

final AtomicInteger groupCounter = new AtomicInteger();
final AtomicInteger eventCounter = new AtomicInteger();
Expand Down