-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Is Observable.onCompleted getting called prematurely? #1633
Comments
To elaborate more on the issue, there is some processing going on even after the onCompleted method is called. This happens only when exceptions are thrown during the processing. |
I haven't had time to look at this yet, but in an |
@girishkolantra I ran your codes and got the following results:
It's expected.
According to the following codes,
o1, o2, o3, o4 and o5 will be converted to an simple Observable with "ERROR", so in my environment I got the following results (the order is non-deterministic):
|
I was expecting 25 results, with five of them errors as they start with 1. In your example I don't see 2_4 and 2_5 in the processed results. They don't start with 1. If you look closely, 2_4 and 2_5 would have been processed after the on complete is called. That is what I am confused about. |
@benjchristensen If I had not used an onErrorReturn in my code then i would have expected the behavior you suggested. But on adding the onErrorReturn my expectation was that the items that had errors would be processed to errors and all the 'good' items would be processed to completion. But the result was not what I expected.onErrorReturn was correctly handling the errors but I am missing the good results from items which don't have errors. These good items are completing their processing after the onCompleted is called. |
An error being emitted here indicates a problem: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/util/RxRingBuffer.java#L220 I need to spend more time digging. |
Take a look at here: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java#L61 |
|
@zsxwing Also,replacing Observable.range(1,5) with
where sleep causes a 5 second sleep gives the result that I am expecting, I get 25 elements in the result, 5 of them errors. why does every thing work as expected when I just put a delay before calling the subscriber.onCompleted()? |
It's expected. |
This is a bug in |
@girishkolantra Usually, you should check
should rewrite to
Usually, I don't add |
The error handlers have been improved to be more strict on not allowing events after a terminal state, however, it does not solve the fact that this use of Doing error handling inside the I think a more important design decision for us to make is if we should shut down (unsubscribe) the entire The odd behavior one gets with this is as follows:
It seems that once an error occurs either the whole thing should be shut down, or the thread should be replaced with a new one if it is handled with an |
@benjchristensen Why do you say that the onErrorReturn should be outside, after the parallel. I was thinking in terms of an example use case where say you have a list of items for which you want to execute the same operation on each of the item but the success or failure of the operation should not affect the overall orchestration. Say for example for a person you get a list of newsletters and you want to call a service in parallel with the newsletter ID to start subscribing but not being able to subscribe should not cause the processing to halt |
Then If you want each item treated independently, flatMap over the sequence and allow each inner Observable to behave on its own: newsletters.flatMap(letter -> {
return subscribeToNewsletter(letter)
.onErrorReturn(fallback);
}); If the newsletters.flatMap(letter -> {
return subscribeToNewsletter(letter)
.subscribeOn(Schedulers.io())
.onErrorReturn(fallback);
}); |
@benjchristensen Thanks for taking the time to answer my question. That has lit up a lot of light bulbs in my head :-) |
No problem! |
I have a sample code where I have an Observable that has onErrorReturn to handle exceptions. When the core processing throws no exception all my items are getting processed, but when there are exceptions thrown, there is some issue where all my items are not getting processed.
The test code is as follows (this version throws exceptions)
The text was updated successfully, but these errors were encountered: