Error Handling Operators
There are a variety of operators that you can use to react to or recover from onError
notifications from Observables. For example, you might:
- swallow the error and switch over to a backup Observable to continue the sequence
- swallow the error and emit a default item
- swallow the error and immediately try to restart the failed Observable
- swallow the error and try to restart the failed Observable after some back-off interval
This section explains these operators.
- onErrorResumeNext( ): instructs an Observable to emit a sequence of items if it encounters an error
- onErrorReturn( ): instructs an Observable to emit a particular item when it encounters an error
- onExceptionResumeNext( ): instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)
- retry( ): if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error
- retryWhen( ): if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source
onErrorResumeNext( ): instructs an Observable to emit a sequence of items if it encounters an error
The onErrorResumeNext( ) method returns an Observable that mirrors the behavior of the source Observable unless that Observable invokes onError( ). In that case, rather than propagating the error to the Subscriber, onErrorResumeNext( ) begins mirroring a second, backup Observable, as shown in the following sample code:
import rx.Observable

// Source: emits three items, then fails.
def myObservable = Observable.create({ aSubscriber ->
    if (!aSubscriber.isUnsubscribed()) aSubscriber.onNext('Three');
    if (!aSubscriber.isUnsubscribed()) aSubscriber.onNext('Two');
    if (!aSubscriber.isUnsubscribed()) aSubscriber.onNext('One');
    // onError( ) takes the Throwable that describes the failure.
    if (!aSubscriber.isUnsubscribed()) aSubscriber.onError(new RuntimeException('Source failed'));
});
// Backup: emits three items, then completes normally.
def myFallback = Observable.create({ aSubscriber ->
    if (!aSubscriber.isUnsubscribed()) aSubscriber.onNext('0');
    if (!aSubscriber.isUnsubscribed()) aSubscriber.onNext('1');
    if (!aSubscriber.isUnsubscribed()) aSubscriber.onNext('2');
    if (!aSubscriber.isUnsubscribed()) aSubscriber.onCompleted();
});
// The Subscriber never sees the error; the backup takes over instead.
myObservable.onErrorResumeNext(myFallback).subscribe(
    { println(it); },                          // onNext
    { println("Error: " + it.getMessage()); }, // onError
    { println("Sequence complete"); }          // onCompleted
);
Three
Two
One
0
1
2
Sequence complete
- javadoc: onErrorResumeNext(throwable,function)
- javadoc: onErrorResumeNext(sequence)
- RxJS: onErrorResumeNext
- Linq: OnErrorResumeNext
- Introduction to Rx: OnErrorResumeNext
onErrorReturn( ): instructs an Observable to emit a particular item to a Subscriber's onNext method when it encounters an error
The onErrorReturn( ) method returns an Observable that mirrors the behavior of the source Observable unless that Observable invokes onError( ). In that case, rather than propagating the error to the Subscriber, onErrorReturn( ) emits a specified item and invokes the Subscriber's onCompleted( ) method, as shown in the following sample code:
import rx.Observable

// Source: emits four items, then fails.
def myObservable = Observable.create({ aSubscriber ->
    if (!aSubscriber.isUnsubscribed()) aSubscriber.onNext('Four');
    if (!aSubscriber.isUnsubscribed()) aSubscriber.onNext('Three');
    if (!aSubscriber.isUnsubscribed()) aSubscriber.onNext('Two');
    if (!aSubscriber.isUnsubscribed()) aSubscriber.onNext('One');
    // onError( ) takes the Throwable that describes the failure.
    if (!aSubscriber.isUnsubscribed()) aSubscriber.onError(new RuntimeException('Source failed'));
});
// The closure receives the Throwable and returns the item to emit in its place.
myObservable.onErrorReturn({ return('Blastoff!'); }).subscribe(
    { println(it); },                          // onNext
    { println("Error: " + it.getMessage()); }, // onError
    { println("Sequence complete"); }          // onCompleted
);
Four
Three
Two
One
Blastoff!
Sequence complete
- javadoc: onErrorReturn(func)
onExceptionResumeNext( ): instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)
Much like the onErrorResumeNext( ) method, this returns an Observable that mirrors the behavior of the source Observable unless that Observable invokes onError( ). In that case, if the Throwable passed to onError( ) is an Exception, then rather than propagating that Exception to the Subscriber, onExceptionResumeNext( ) begins mirroring a second, backup Observable. If the Throwable is not an Exception, the Observable returned by onExceptionResumeNext( ) propagates it to its Subscriber's onError( ) method and does not invoke its backup Observable.
- javadoc: onExceptionResumeNext(observable)
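The distinction between an Exception and any other Throwable can be modeled without RxJava on the classpath. The following is a plain-Java sketch of that rule, not RxJava code: the Source interface, the resumeOnException helper, the demo method, and the recorded event strings are all hypothetical names made up for illustration, and events are collected into a list instead of being delivered to a Subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the onExceptionResumeNext( ) rule; not RxJava code.
class ExceptionResumeSketch {

    /** Hypothetical stand-in for a source: emits items, then may fail by throwing. */
    interface Source {
        void emitTo(Consumer<String> onNext) throws Throwable;
    }

    /**
     * Mirrors the source. If it fails with an Exception, switches to the backup.
     * Any other Throwable (for example an Error) is recorded as the terminal
     * onError event and the backup is never used.
     */
    static List<String> resumeOnException(Source source, Source backup) {
        List<String> seen = new ArrayList<>();
        try {
            source.emitTo(seen::add);
            seen.add("onCompleted");
        } catch (Exception e) {
            // An Exception: swallow it and mirror the backup instead.
            try {
                backup.emitTo(seen::add);
                seen.add("onCompleted");
            } catch (Throwable t) {
                seen.add("onError: " + t.getMessage());
            }
        } catch (Throwable t) {
            // Not an Exception: propagate it, do not touch the backup.
            seen.add("onError: " + t.getMessage());
        }
        return seen;
    }

    /** Runs a source that emits "One" and then fails with the given Throwable. */
    static List<String> demo(Throwable failure) {
        Source failing = onNext -> {
            onNext.accept("One");
            throw failure;
        };
        Source backup = onNext -> {
            onNext.accept("backup-1");
            onNext.accept("backup-2");
        };
        return resumeOnException(failing, backup);
    }

    public static void main(String[] args) {
        System.out.println(demo(new IllegalStateException("recoverable")));
        System.out.println(demo(new Error("fatal")));
    }
}
```

Running main prints [One, backup-1, backup-2, onCompleted] for the IllegalStateException and [One, onError: fatal] for the Error, which matches the behavior described above.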
retry( ): if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error
The retry( ) method responds to an onError( ) call from the source Observable by not passing that call through to its Subscribers. Instead, it resubscribes to the source Observable, giving it another opportunity to complete its sequence without error. You can pass retry( ) a maximum number of retry attempts, or you can pass nothing, in which case it keeps retrying until it gets an error-free sequence. It always passes onNext( ) calls through to its Subscribers, even from sequences that terminate with an error, so this can cause duplicate emissions (as shown in the diagram above).
- javadoc: retry(), retry(count), and retry(rx.functions.Func1)
- RxJS: retry
- Linq: Retry
- Introduction to Rx: Retry
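To make the duplicate-emission caveat concrete, here is a minimal plain-Java model of a bounded retry. It is a sketch under stated assumptions rather than RxJava's implementation: the Source interface, the retry helper, and the demo method are hypothetical names, the source fails a fixed number of times before it succeeds, and events are collected into a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of retry(count) semantics; not RxJava code.
class RetrySketch {

    /** Hypothetical stand-in for a source: emits items, then may fail by throwing. */
    interface Source {
        void emitTo(Consumer<String> onNext) throws Exception;
    }

    /**
     * Subscribes to the source and resubscribes after each failure, at most
     * maxRetries times. Items emitted before a failure are already passed
     * through, so a retried source can produce duplicates.
     */
    static List<String> retry(Source source, int maxRetries) {
        List<String> seen = new ArrayList<>();
        for (int attempt = 0; ; attempt++) {
            try {
                source.emitTo(seen::add);
                seen.add("onCompleted");
                return seen;
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    // Out of retries: the error finally reaches the Subscriber.
                    seen.add("onError: " + e.getMessage());
                    return seen;
                }
                // Otherwise swallow the error and loop to resubscribe.
            }
        }
    }

    /** Source emits "A", fails on its first failuresBeforeSuccess subscriptions, then emits "B". */
    static List<String> demo(int failuresBeforeSuccess, int maxRetries) {
        int[] subscriptions = {0};
        Source flaky = onNext -> {
            int n = ++subscriptions[0];
            onNext.accept("A");
            if (n <= failuresBeforeSuccess) {
                throw new IllegalStateException("attempt " + n + " failed");
            }
            onNext.accept("B");
        };
        return retry(flaky, maxRetries);
    }

    public static void main(String[] args) {
        System.out.println(demo(2, 3)); // enough retries to succeed
        System.out.println(demo(2, 1)); // retries run out first
    }
}
```

With two failures and three allowed retries the output is [A, A, A, B, onCompleted]: the item A appears once per subscription. With only one allowed retry the output is [A, A, onError: attempt 2 failed].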
retryWhen( ): if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source
The retryWhen( ) operator is similar to retry( ), but it decides whether to resubscribe to the source Observable and mirror its emissions again by passing the onError notification (converted into a Notification item) to a second Observable and observing its result. If that result is an emitted item, retryWhen( ) resubscribes to the source and the process repeats; if that result is an onError notification, retryWhen( ) passes that notification to its Subscribers and terminates; if that result is an onCompleted notification, retryWhen( ) also completes.
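One common use of this decision step is retrying after a back-off interval. The following plain-Java sketch models that flow and is not RxJava code: the Source interface, the Signal enum, the retryWhen and exponentialBackoff helpers, and the 5 ms base delay are hypothetical choices made for illustration. The second Observable is reduced to a function that answers each error with one signal, and on an ERROR signal the model simply reports the source's own error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java model of the retryWhen( ) decision flow; not RxJava code.
class RetryWhenSketch {

    /** Hypothetical stand-in for a source: emits items, then may fail by throwing. */
    interface Source {
        void emitTo(Consumer<String> onNext) throws Exception;
    }

    /** How the second Observable responds to one error from the source. */
    enum Signal { ITEM, ERROR, COMPLETED }

    /** Mirrors the source; after each failure, asks the handler what to do next. */
    static List<String> retryWhen(Source source, Function<Throwable, Signal> handler) {
        List<String> seen = new ArrayList<>();
        while (true) {
            try {
                source.emitTo(seen::add);
                seen.add("onCompleted");
                return seen;
            } catch (Exception e) {
                Signal signal = handler.apply(e);
                if (signal == Signal.ERROR) {
                    seen.add("onError: " + e.getMessage());
                    return seen;
                }
                if (signal == Signal.COMPLETED) {
                    seen.add("onCompleted");
                    return seen;
                }
                // Signal.ITEM: swallow the error and loop to resubscribe.
            }
        }
    }

    /** Handler that waits base, 2*base, 4*base, ... ms and gives up after maxRetries. */
    static Function<Throwable, Signal> exponentialBackoff(int maxRetries, long baseMillis, List<Long> delays) {
        int[] failures = {0};
        return error -> {
            if (failures[0] >= maxRetries) {
                return Signal.ERROR;
            }
            long delay = baseMillis << failures[0];
            failures[0]++;
            delays.add(delay); // recorded so the schedule can be inspected
            try {
                Thread.sleep(delay);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            return Signal.ITEM;
        };
    }

    /** Source fails on its first failuresBeforeSuccess subscriptions, then emits "done". */
    static List<String> demo(int failuresBeforeSuccess, int maxRetries, List<Long> delays) {
        int[] subscriptions = {0};
        Source flaky = onNext -> {
            int n = ++subscriptions[0];
            if (n <= failuresBeforeSuccess) {
                throw new IllegalStateException("attempt " + n + " failed");
            }
            onNext.accept("done");
        };
        return retryWhen(flaky, exponentialBackoff(maxRetries, 5, delays));
    }

    public static void main(String[] args) {
        List<Long> delays = new ArrayList<>();
        System.out.println(demo(2, 3, delays));
        System.out.println(delays);
    }
}
```

Running main prints [done, onCompleted] followed by [5, 10]: two failures were each answered with an item after a growing delay, and the third subscription succeeded. If the source keeps failing past maxRetries, the handler answers with ERROR and the sequence ends with an onError event instead.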
Copyright (c) 2016-present, RxJava Contributors.
Twitter @RxJava | Gitter @RxJava