Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator OnExceptionResumeNextViaObservable #1117

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import rx.operators.OperationMulticast;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationParallelMerge;
import rx.operators.OperationReplay;
import rx.operators.OperationSample;
Expand Down Expand Up @@ -108,6 +107,7 @@
import rx.operators.OperatorObserveOn;
import rx.operators.OperatorOnErrorFlatMap;
import rx.operators.OperatorOnErrorResumeNextViaFunction;
import rx.operators.OperatorOnExceptionResumeNextViaObservable;
import rx.operators.OperatorParallel;
import rx.operators.OperatorPivot;
import rx.operators.OperatorRepeat;
Expand Down Expand Up @@ -4592,7 +4592,7 @@ public final Observable<T> onErrorFlatMap(final Func1<OnErrorThrowable, ? extend
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-onexceptionresumenext">RxJava Wiki: onExceptionResumeNext()</a>
*/
public final Observable<T> onExceptionResumeNext(final Observable<? extends T> resumeSequence) {
return create(OperationOnExceptionResumeNextViaObservable.onExceptionResumeNextViaObservable(this, resumeSequence));
return lift(new OperatorOnExceptionResumeNextViaObservable<T>(resumeSequence));
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.operators;

import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;

/**
* Instruct an Observable to pass control to another Observable rather than invoking
* <code>onError</code> if it encounters an error of type {@link java.lang.Exception}.
* <p>
* This differs from {@link Observable#onErrorResumeNext} in that this one does not handle
* {@link java.lang.Throwable} or {@link java.lang.Error} but lets those continue through.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/onErrorResumeNext.png">
* <p>
* By default, when an Observable encounters an error that prevents it from emitting the expected
* item to its Observer, the Observable invokes its Observer's <code>onError</code> method, and
* then quits without invoking any more of its Observer's methods. The onErrorResumeNext operation
* changes this behavior. If you pass an Observable (resumeSequence) to onErrorResumeNext, if the
* source Observable encounters an error, instead of invoking its Observer's <code>onError</code>
* method, it will instead relinquish control to this new Observable, which will invoke the
* Observer's <code>onNext</code> method if it is able to do so. In such a case, because no
* Observable necessarily invokes <code>onError</code>, the Observer may never know that an error
* happened.
* <p>
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
*
* @param <T> the value type
*/
public final class OperatorOnExceptionResumeNextViaObservable<T> implements Operator<T, T> {
final Observable<? extends T> resumeSequence;

public OperatorOnExceptionResumeNextViaObservable(Observable<? extends T> resumeSequence) {
this.resumeSequence = resumeSequence;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
// needs to independently unsubscribe so child can continue with the resume
Subscriber<T> s = new Subscriber<T>() {

@Override
public void onNext(T t) {
child.onNext(t);
}

@Override
public void onError(Throwable e) {
if (e instanceof Exception) {
unsubscribe();
resumeSequence.unsafeSubscribe(child);
} else {
child.onError(e);
}
}

@Override
public void onCompleted() {
child.onCompleted();
}

};
child.add(s);

return s;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static rx.operators.OperationOnExceptionResumeNextViaObservable.onExceptionResumeNextViaObservable;

import org.junit.Test;
import org.mockito.Mockito;
Expand All @@ -32,7 +31,7 @@
import rx.Subscription;
import rx.functions.Func1;

public class OperationOnExceptionResumeNextViaObservableTest {
public class OperatorOnExceptionResumeNextViaObservableTest {

@Test
public void testResumeNextWithException() {
Expand All @@ -41,7 +40,7 @@ public void testResumeNextWithException() {
TestObservable f = new TestObservable(s, "one", "EXCEPTION", "two", "three");
Observable<String> w = Observable.create(f);
Observable<String> resume = Observable.from("twoResume", "threeResume");
Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
Observable<String> observable = w.onExceptionResumeNext(resume);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down Expand Up @@ -70,7 +69,7 @@ public void testResumeNextWithRuntimeException() {
TestObservable f = new TestObservable(s, "one", "RUNTIMEEXCEPTION", "two", "three");
Observable<String> w = Observable.create(f);
Observable<String> resume = Observable.from("twoResume", "threeResume");
Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
Observable<String> observable = w.onExceptionResumeNext(resume);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down Expand Up @@ -99,7 +98,7 @@ public void testThrowablePassesThru() {
TestObservable f = new TestObservable(s, "one", "THROWABLE", "two", "three");
Observable<String> w = Observable.create(f);
Observable<String> resume = Observable.from("twoResume", "threeResume");
Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
Observable<String> observable = w.onExceptionResumeNext(resume);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down Expand Up @@ -128,7 +127,7 @@ public void testErrorPassesThru() {
TestObservable f = new TestObservable(s, "one", "ERROR", "two", "three");
Observable<String> w = Observable.create(f);
Observable<String> resume = Observable.from("twoResume", "threeResume");
Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
Observable<String> observable = w.onExceptionResumeNext(resume);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down Expand Up @@ -162,6 +161,7 @@ public void testMapResumeAsyncNext() {
// Introduce map function that fails intermittently (Map does not prevent this when the observer is a
// rx.operator incl onErrorResumeNextViaObservable)
w = w.map(new Func1<String, String>() {
@Override
public String call(String s) {
if ("fail".equals(s))
throw new RuntimeException("Forced Failure");
Expand All @@ -170,7 +170,7 @@ public String call(String s) {
}
});

Observable<String> observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
Observable<String> observable = w.onExceptionResumeNext(resume);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down