Skip to content

Commit

Permalink
Operator OnExceptionResumeNextViaObservable
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Apr 28, 2014
1 parent 95e0636 commit cd04714
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 131 deletions.
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import rx.operators.OperationMulticast;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationParallelMerge;
import rx.operators.OperationReplay;
import rx.operators.OperationSample;
Expand Down Expand Up @@ -108,6 +107,7 @@
import rx.operators.OperatorObserveOn;
import rx.operators.OperatorOnErrorFlatMap;
import rx.operators.OperatorOnErrorResumeNextViaFunction;
import rx.operators.OperatorOnExceptionResumeNextViaObservable;
import rx.operators.OperatorParallel;
import rx.operators.OperatorPivot;
import rx.operators.OperatorRepeat;
Expand Down Expand Up @@ -4592,7 +4592,7 @@ public final Observable<T> onErrorFlatMap(final Func1<OnErrorThrowable, ? extend
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-onexceptionresumenext">RxJava Wiki: onExceptionResumeNext()</a>
*/
public final Observable<T> onExceptionResumeNext(final Observable<? extends T> resumeSequence) {
return create(OperationOnExceptionResumeNextViaObservable.onExceptionResumeNextViaObservable(this, resumeSequence));
return lift(new OperatorOnExceptionResumeNextViaObservable<T>(resumeSequence));
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.operators;

import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;

/**
* Instruct an Observable to pass control to another Observable rather than invoking
* <code>onError</code> if it encounters an error of type {@link java.lang.Exception}.
* <p>
* This differs from {@link Observable#onErrorResumeNext} in that this one does not handle
* {@link java.lang.Throwable} or {@link java.lang.Error} but lets those continue through.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/onErrorResumeNext.png">
* <p>
* By default, when an Observable encounters an error that prevents it from emitting the expected
* item to its Observer, the Observable invokes its Observer's <code>onError</code> method, and
* then quits without invoking any more of its Observer's methods. The onErrorResumeNext operation
* changes this behavior. If you pass an Observable (resumeSequence) to onErrorResumeNext, if the
* source Observable encounters an error, instead of invoking its Observer's <code>onError</code>
* method, it will instead relinquish control to this new Observable, which will invoke the
* Observer's <code>onNext</code> method if it is able to do so. In such a case, because no
* Observable necessarily invokes <code>onError</code>, the Observer may never know that an error
* happened.
* <p>
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
*
* @param <T> the value type
*/
public final class OperatorOnExceptionResumeNextViaObservable<T> implements Operator<T, T> {
final Observable<? extends T> resumeSequence;

public OperatorOnExceptionResumeNextViaObservable(Observable<? extends T> resumeSequence) {
this.resumeSequence = resumeSequence;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
// needs to independently unsubscribe so child can continue with the resume
Subscriber<T> s = new Subscriber<T>() {

@Override
public void onNext(T t) {
child.onNext(t);
}

@Override
public void onError(Throwable e) {
if (e instanceof Exception) {
unsubscribe();
resumeSequence.unsafeSubscribe(child);
} else {
child.onError(e);
}
}

@Override
public void onCompleted() {
child.onCompleted();
}

};
child.add(s);

return s;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static rx.operators.OperationOnExceptionResumeNextViaObservable.onExceptionResumeNextViaObservable;

import org.junit.Test;
import org.mockito.Mockito;
Expand All @@ -32,7 +31,7 @@
import rx.Subscription;
import rx.functions.Func1;

public class OperationOnExceptionResumeNextViaObservableTest {
public class OperatorOnExceptionResumeNextViaObservableTest {

@Test
public void testResumeNextWithException() {
Expand All @@ -41,7 +40,7 @@ public void testResumeNextWithException() {
TestObservable f = new TestObservable(s, "one", "EXCEPTION", "two", "three");
Observable<String> w = Observable.create(f);
Observable<String> resume = Observable.from("twoResume", "threeResume");
Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
Observable<String> observable = w.onExceptionResumeNext(resume);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down Expand Up @@ -70,7 +69,7 @@ public void testResumeNextWithRuntimeException() {
TestObservable f = new TestObservable(s, "one", "RUNTIMEEXCEPTION", "two", "three");
Observable<String> w = Observable.create(f);
Observable<String> resume = Observable.from("twoResume", "threeResume");
Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
Observable<String> observable = w.onExceptionResumeNext(resume);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down Expand Up @@ -99,7 +98,7 @@ public void testThrowablePassesThru() {
TestObservable f = new TestObservable(s, "one", "THROWABLE", "two", "three");
Observable<String> w = Observable.create(f);
Observable<String> resume = Observable.from("twoResume", "threeResume");
Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
Observable<String> observable = w.onExceptionResumeNext(resume);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down Expand Up @@ -128,7 +127,7 @@ public void testErrorPassesThru() {
TestObservable f = new TestObservable(s, "one", "ERROR", "two", "three");
Observable<String> w = Observable.create(f);
Observable<String> resume = Observable.from("twoResume", "threeResume");
Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
Observable<String> observable = w.onExceptionResumeNext(resume);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down Expand Up @@ -162,6 +161,7 @@ public void testMapResumeAsyncNext() {
// Introduce map function that fails intermittently (Map does not prevent this when the observer is a
// rx.operator incl onErrorResumeNextViaObservable)
w = w.map(new Func1<String, String>() {
@Override
public String call(String s) {
if ("fail".equals(s))
throw new RuntimeException("Forced Failure");
Expand All @@ -170,7 +170,7 @@ public String call(String s) {
}
});

Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
Observable<String> observable = w.onExceptionResumeNext(resume);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down

0 comments on commit cd04714

Please sign in to comment.