Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement the 'single' operator #967

Merged
merged 8 commits into from
Apr 20, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
(testing "returns one element"
(is (= 1 (b/single (rx/return 1)))))
(testing "throw if empty"
(is (thrown? java.lang.IllegalArgumentException (b/single (rx/empty)))))
(is (thrown? java.util.NoSuchElementException (b/single (rx/empty)))))
(testing "throw if many"
(is (thrown? java.lang.IllegalArgumentException (b/single (rx/seq->o [1 2])))))
(testing "rethrows errors"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,18 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(10, List(-1, 0, 1).toObservable.filter(condition).firstOrElse(10).toBlockingObservable.single)
}

@Test def firstLastSingleExample() {
assertEquals(1, List(1, 2, 3, 4).toObservable.head.toBlockingObservable.single)
assertEquals(1, List(1, 2, 3, 4).toObservable.first.toBlockingObservable.single)
assertEquals(4, List(1, 2, 3, 4).toObservable.last.toBlockingObservable.single)
assertEquals(1, List(1).toObservable.single.toBlockingObservable.single)

assertEquals(1, List(1, 2, 3, 4).toObservable.toBlockingObservable.head)
assertEquals(1, List(1, 2, 3, 4).toObservable.toBlockingObservable.first)
assertEquals(4, List(1, 2, 3, 4).toObservable.toBlockingObservable.last)
assertEquals(1, List(1).toObservable.toBlockingObservable.single)
}

def square(x: Int): Int = {
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId}")
Thread.sleep(100) // calculating a square is heavy work :)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1949,36 +1949,64 @@ trait Observable[+T]
def headOrElse[U >: T](default: => U): Observable[U] = firstOrElse(default)

/**
* Returns an Observable that emits only the very first item emitted by the source Observable.
* This is just a shorthand for `take(1)`.
*
* Returns an Observable that emits only the very first item emitted by the source Observable, or raises an
* `NoSuchElementException` if the source Observable is empty.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/first.png">
*
* @return an Observable that emits only the very first item from the source, or none if the
* source Observable completes without emitting a single item.
*
* @return an Observable that emits only the very first item emitted by the source Observable, or raises an
* `NoSuchElementException` if the source Observable is empty
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-first">RxJava Wiki: first()</a>
* @see "MSDN: Observable.firstAsync()"
*/
def first: Observable[T] = take(1)
def first: Observable[T] = {
toScalaObservable[T](asJavaObservable.first)
}

/*

TODO once https://github.com/Netflix/RxJava/issues/417 is fixed, we can add head and tail methods
/**
* Returns an Observable that emits only the very first item emitted by the source Observable, or raises an
* `NoSuchElementException` if the source Observable is empty.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/first.png">
*
* @return an Observable that emits only the very first item emitted by the source Observable, or raises an
* `NoSuchElementException` if the source Observable is empty
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-first">RxJava Wiki: first()</a>
* @see "MSDN: Observable.firstAsync()"
* @see [[Observable.first]]
*/
def head: Observable[T] = first

/**
* emits NoSuchElementException("head of empty Observable") if empty
* Returns an Observable that emits the last item emitted by the source Observable or notifies observers of
* an `NoSuchElementException` if the source Observable is empty.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/last.png">
*
* @return an Observable that emits the last item from the source Observable or notifies observers of an
* error
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observable-Operators#wiki-last">RxJava Wiki: last()</a>
* @see "MSDN: Observable.lastAsync()"
*/
def head: Observable[T] = {
this.take(1).fold[Option[T]](None)((v: Option[T], e: T) => Some(e)).map({
case Some(element) => element
case None => throw new NoSuchElementException("head of empty Observable")
})
def last: Observable[T] = {
toScalaObservable[T](asJavaObservable.last)
}

/**
* emits an UnsupportedOperationException("tail of empty list") if empty
* If the source Observable completes after emitting a single item, return an Observable that emits that
* item. If the source Observable emits more than one item or no items, throw an `NoSuchElementException`.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/single.png">
*
* @return an Observable that emits the single item emitted by the source Observable
* @throws NoSuchElementException
* if the source emits more than one item or no items
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-single-and-singleordefault">RxJava Wiki: single()</a>
* @see "MSDN: Observable.singleAsync()"
*/
def tail: Observable[T] = ???

*/
def single: Observable[T] = {
toScalaObservable[T](asJavaObservable.single)
}

/**
* Returns an Observable that forwards all sequentially distinct items emitted from the source Observable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,49 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
new WithFilter[T](p, asJava)
}

/**
* Returns the last item emitted by a specified [[Observable]], or
* throws `NoSuchElementException` if it emits no items.
*
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.last.png">
*
* @return the last item emitted by the source [[Observable]]
* @throws NoSuchElementException
* if source contains no elements
* @see <a href="https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators#last-and-lastordefault">RxJava Wiki: last()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.last.aspx">MSDN: Observable.Last</a>
*/
def last : T = {
asJava.last : T
}

/**
* Returns the first item emitted by a specified [[Observable]], or
* `NoSuchElementException` if source contains no elements.
*
* @return the first item emitted by the source [[Observable]]
* @throws NoSuchElementException
* if source contains no elements
* @see <a href="https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators#first-and-firstordefault">RxJava Wiki: first()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
*/
def first : T = {
asJava.first : T
}

/**
* Returns the first item emitted by a specified [[Observable]], or
* `NoSuchElementException` if source contains no elements.
*
* @return the first item emitted by the source [[Observable]]
* @throws NoSuchElementException
* if source contains no elements
* @see <a href="https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators#first-and-firstordefault">RxJava Wiki: first()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
* @see [[BlockingObservable.first]]
*/
def head : T = first

// last -> use toIterable.last
// lastOrDefault -> use toIterable.lastOption
// first -> use toIterable.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;

import org.junit.Test;
import org.mockito.InOrder;
Expand Down Expand Up @@ -56,7 +57,7 @@ public void testMinWithEmpty() {
observable.subscribe(observer);
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onError(
isA(IllegalArgumentException.class));
isA(NoSuchElementException.class));
inOrder.verifyNoMoreInteractions();
}

Expand Down Expand Up @@ -96,7 +97,7 @@ public int compare(Integer o1, Integer o2) {
observable.subscribe(observer);
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onError(
isA(IllegalArgumentException.class));
isA(NoSuchElementException.class));
inOrder.verifyNoMoreInteractions();
}

Expand Down Expand Up @@ -216,7 +217,7 @@ public void testMaxWithEmpty() {
observable.subscribe(observer);
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onError(
isA(IllegalArgumentException.class));
isA(NoSuchElementException.class));
inOrder.verifyNoMoreInteractions();
}

Expand Down Expand Up @@ -256,7 +257,7 @@ public int compare(Integer o1, Integer o2) {
observable.subscribe(observer);
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onError(
isA(IllegalArgumentException.class));
isA(NoSuchElementException.class));
inOrder.verifyNoMoreInteractions();
}

Expand Down
33 changes: 18 additions & 15 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
import rx.operators.OperationReplay;
import rx.operators.OperationSample;
import rx.operators.OperationSequenceEqual;
import rx.operators.OperationSingle;
import rx.operators.OperatorSingle;
import rx.operators.OperationSkip;
import rx.operators.OperationSkipLast;
import rx.operators.OperationSkipUntil;
Expand Down Expand Up @@ -4545,12 +4545,12 @@ public final Observable<T> finallyDo(Action0 action) {

/**
* Returns an Observable that emits only the very first item emitted by the source Observable, or raises an
* {@code IllegalArgumentException} if the source Observable is empty.
* {@code NoSuchElementException} if the source Observable is empty.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that it can still throw IllegalArgumentException if there are too many elements.

new IllegalArgumentException("Sequence contains too many elements")

So don't all of these Javadocs now need to include both IllegalArgumentException and NoSuchElementException?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An Observable which has more than one element is valid for first and last. These two operator just fetch the first or the last one from the Observable. Only single requires that the Observable must contain exactly one element.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry, you do have both exceptions on some of the other methods where it applies. Nevermind!

* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/first.png">
*
* @return an Observable that emits only the very first item emitted by the source Observable, or raises an
* {@code IllegalArgumentException} if the source Observable is empty
* {@code NoSuchElementException} if the source Observable is empty
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-first">RxJava Wiki: first()</a>
* @see "MSDN: Observable.firstAsync()"
*/
Expand All @@ -4560,14 +4560,14 @@ public final Observable<T> first() {

/**
* Returns an Observable that emits only the very first item emitted by the source Observable that satisfies
* a specified condition, or raises an {@code IllegalArgumentException} if no such items are emitted.
* a specified condition, or raises an {@code NoSuchElementException} if no such items are emitted.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/firstN.png">
*
* @param predicate
* the condition that an item emitted by the source Observable has to satisfy
* @return an Observable that emits only the very first item emitted by the source Observable that satisfies
* the {@code predicate}, or raises an {@code IllegalArgumentException} if no such items are emitted
* the {@code predicate}, or raises an {@code NoSuchElementException} if no such items are emitted
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-first">RxJava Wiki: first()</a>
* @see "MSDN: Observable.firstAsync()"
*/
Expand Down Expand Up @@ -4778,7 +4778,7 @@ public final <TRight, TLeftDuration, TRightDuration, R> Observable<R> join(Obser

/**
* Returns an Observable that emits the last item emitted by the source Observable or notifies observers of
* an {@code IllegalArgumentException} if the source Observable is empty.
* an {@code NoSuchElementException} if the source Observable is empty.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/last.png">
*
Expand All @@ -4793,14 +4793,14 @@ public final Observable<T> last() {

/**
* Returns an Observable that emits only the last item emitted by the source Observable that satisfies a
* given condition, or an {@code IllegalArgumentException} if no such items are emitted.
* given condition, or an {@code NoSuchElementException} if no such items are emitted.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/last.p.png">
*
* @param predicate
* the condition any source emitted item has to satisfy
* @return an Observable that emits only the last item satisfying the given condition from the source, or an
* {@code IllegalArgumentException} if no such items are emitted
* {@code NoSuchElementException} if no such items are emitted
* @throws IllegalArgumentException
* if no items that match the predicate are emitted by the source Observable
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observable-Operators#wiki-last">RxJava Wiki: last()</a>
Expand Down Expand Up @@ -6205,24 +6205,26 @@ public final Observable<T> serialize() {
/**
* If the source Observable completes after emitting a single item, return an Observable that emits that
* item. If the source Observable emits more than one item or no items, throw an
* {@code IllegalArgumentException}.
* {@code NoSuchElementException}.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/single.png">
*
* @return an Observable that emits the single item emitted by the source Observable
* @throws IllegalArgumentException
* if the source emits more than one item or no items
* if the source emits more than one item
* @throws NoSuchElementException
* if the source emits no items
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-single-and-singleordefault">RxJava Wiki: single()</a>
* @see "MSDN: Observable.singleAsync()"
*/
public final Observable<T> single() {
return create(OperationSingle.<T> single(this));
return lift(new OperatorSingle<T>());
}

/**
* If the Observable completes after emitting a single item that matches a specified predicate, return an
* Observable that emits that item. If the source Observable emits more than one such item or no such items,
* throw an {@code IllegalArgumentException}.
* throw an {@code NoSuchElementException}.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/single.p.png">
*
Expand All @@ -6231,8 +6233,9 @@ public final Observable<T> single() {
* @return an Observable that emits the single item emitted by the source Observable that matches the
* predicate
* @throws IllegalArgumentException
* if the source Observable emits either more than one item that matches the predicate or no
* items that match the predicate
* if the source Observable emits more than one item that matches the predicate
* @throws NoSuchElementException
* if the source Observable emits no item that matches the predicate
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-single-and-singleordefault">RxJava Wiki: single()</a>
* @see "MSDN: Observable.singleAsync()"
*/
Expand All @@ -6257,7 +6260,7 @@ public final Observable<T> single(Func1<? super T, Boolean> predicate) {
* @see "MSDN: Observable.singleOrDefaultAsync()"
*/
public final Observable<T> singleOrDefault(T defaultValue) {
return create(OperationSingle.<T> singleOrDefault(this, defaultValue));
return lift(new OperatorSingle<T>(defaultValue));
}

/**
Expand Down
Loading