Is Observable.onCompleted getting called prematurely? #1633

Closed
girishkolantra opened this issue Aug 27, 2014 · 17 comments

@girishkolantra

I have some sample code with an Observable that uses onErrorReturn to handle exceptions. When the core processing throws no exceptions, all my items are processed, but when exceptions are thrown, not all of my items get processed.

The test code is as follows (this version throws exceptions):

    import java.util.ArrayList;
    import java.util.List;
    import rx.Observable;
    import rx.Observer;
    import rx.functions.Func1;
    import rx.schedulers.Schedulers;


    public class ErrorsTest {
        public static void main(String[] args) throws Exception {
            final List<String> processedStrings = new ArrayList<>();

              Observable.range(1, 5)
                .flatMap(new Func1<Integer, Observable<String>> () {

                    @Override
                    public Observable<String> call(Integer integer) {
                        System.out.println(Thread.currentThread().getName() + " making 5 string out of " + integer);
                        String[] strings = new String[5];

                        for(int i = 0;  i < strings.length; i++) {

                            strings[i] = integer + "_" + (i + 1);
                        }

                        return Observable.from(strings);

                    }

            }).parallel(new Func1<Observable<String>, Observable<String>>() {

                @Override
                public Observable<String> call(Observable<String> string) {
                    return string.map(new Func1<String, String> () {

                        @Override
                        public String call(String string) {
                            // remove this if block and the issue goes away
                            if (string.startsWith("1")) {
                                RuntimeException e = new RuntimeException("cannot process " +  string);
                                throw e;
                            }
                            System.out.println(Thread.currentThread().getName() + " processing " + string);
                            return string + "_PROCESSED";
                        }

                    }).onErrorReturn(new Func1<Throwable, String> () {

                        @Override
                        public String call(Throwable t) {
                            System.out.println(Thread.currentThread().getName() + " handling error " + t.getMessage());
                            return "ERROR";
                        }
                    });
                }
            })
             .subscribeOn(Schedulers.computation())
             .subscribe(new Observer<String> () {

                @Override
                public void onCompleted() {
                    System.out.println("obtained " + processedStrings.size());
                    System.out.println(processedStrings);
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println(Thread.currentThread().getName() + " error " + t.getMessage());
                }

                @Override
                public void onNext(String t) {
                    processedStrings.add(t);
                }
            });

            System.in.read(); // keep main() alive: the pipeline runs on computation threads
        }

        private static String asString(String[] strings) {
            return String.format("{%s, %s, %s, %s, %s}", strings[0], strings[1], strings[2], strings[3], strings[4]);
        }

    }
@girishkolantra
Author

To elaborate: some processing is still going on even after the onCompleted method has been called. This happens only when exceptions are thrown during the processing.

@benjchristensen
Member

I haven't had time to look at this yet, but an onError is like throwing an exception and unwinding the stack. Resources and threads can still be in flight while onError is propagating. onError then triggers unsubscribe, which asks everything to shut down, and that shutdown can happen asynchronously.

@zsxwing
Member

zsxwing commented Sep 1, 2014

@girishkolantra I ran your code and got the following results:

obtained 14
[2_1_PROCESSED, 2_3_PROCESSED, 2_2_PROCESSED, 3_4_PROCESSED, 3_5_PROCESSED, 4_1_PROCESSED, 5_2_PROCESSED, 5_3_PROCESSED, 5_4_PROCESSED, ERROR, ERROR, ERROR, ERROR, ERROR]

It's expected.
scheduler.parallelism() is 8 in my environment, so parallel dispatches the inputs to 8 Observables. The following table shows the contents of these Observables; each column corresponds to one Observable.

o1   o2   o3   o4   o5   o6   o7   o8
1_1  1_2  1_3  1_4  1_5  2_1  2_2  2_3
2_4  2_5  3_1  3_2  3_3  3_4  3_5  4_1
4_2  4_3  4_4  4_5  5_1  5_2  5_3  5_4
5_5

Given the following code,

return string.map(new Func1<String, String>() {

    @Override
    public String call(String string) {
        // remove this if block and the issue goes away
        if (string.startsWith("1")) {
            RuntimeException e = new RuntimeException("cannot process " + string);
            throw e;
        }
        System.out.println(Thread.currentThread().getName() + " processing " + string);
        return string + "_PROCESSED";
    }

}).onErrorReturn(new Func1<Throwable, String>() {

    @Override
    public String call(Throwable t) {
        System.out.println(Thread.currentThread().getName() + " handling error " + t.getMessage());
        return "ERROR";
    }
});

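The first item of each of o1, o2, o3, o4 and o5 starts with "1", so each of them is converted into a simple Observable that emits "ERROR" and completes; their remaining items are never processed. So in my environment I got the following results (the order is non-deterministic):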

[2_1_PROCESSED, 2_3_PROCESSED, 2_2_PROCESSED, 3_4_PROCESSED, 3_5_PROCESSED, 4_1_PROCESSED, 5_2_PROCESSED, 5_3_PROCESSED, 5_4_PROCESSED, ERROR, ERROR, ERROR, ERROR, ERROR]
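The reasoning above can be checked with a small, sequential JDK-only model. This is a sketch, not RxJava code: it assumes, as the table shows, that parallel deals items round-robin (item k goes to inner Observable k mod parallelism), and it models map(...).onErrorReturn(...) on each inner Observable as "emit processed items until the first failure, then emit ERROR and stop". The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * JDK-only model (not RxJava) of parallel + per-group onErrorReturn.
 * Assumption: items are dealt round-robin, item k -> group k % parallelism.
 */
public class ParallelSplitModel {

    // Mirrors the map() in the issue: strings starting with "1" fail.
    static String process(String s) {
        if (s.startsWith("1")) {
            throw new RuntimeException("cannot process " + s);
        }
        return s + "_PROCESSED";
    }

    public static List<String> run(int parallelism) {
        // The 25 inputs produced by range(1, 5).flatMap(...): 1_1 .. 5_5.
        List<String> inputs = new ArrayList<>();
        for (int n = 1; n <= 5; n++) {
            for (int i = 1; i <= 5; i++) {
                inputs.add(n + "_" + i);
            }
        }

        // Deal the items round-robin into the inner groups, as in the table.
        List<List<String>> groups = new ArrayList<>();
        for (int g = 0; g < parallelism; g++) {
            groups.add(new ArrayList<>());
        }
        for (int k = 0; k < inputs.size(); k++) {
            groups.get(k % parallelism).add(inputs.get(k));
        }

        // Each group emits processed items until its first error,
        // then emits "ERROR" and completes; later items are dropped.
        List<String> results = new ArrayList<>();
        for (List<String> group : groups) {
            for (String s : group) {
                try {
                    results.add(process(s));
                } catch (RuntimeException e) {
                    results.add("ERROR");
                    break; // onErrorReturn does not resume the inner stream
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> results = run(8);
        System.out.println("obtained " + results.size());
        System.out.println(results);
    }
}
```

run(8) yields the same 14 items as the output above (5 ERROR plus 9 processed), though in a fixed order instead of the non-deterministic order of the real run. run(25), which gives every item its own inner stream, yields 25 results with 5 errors.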

@girishkolantra
Author

I was expecting 25 results, five of them errors because they start with 1. In your output I don't see 2_4 and 2_5 among the processed results, even though they don't start with 1. If you look closely, 2_4 and 2_5 would have been processed after onCompleted is called. That is what I am confused about.

@girishkolantra
Author

@benjchristensen If I had not used onErrorReturn in my code, then I would have expected the behavior you describe. But with onErrorReturn added, my expectation was that the items with errors would be turned into errors and all the 'good' items would be processed to completion. The result was not what I expected: onErrorReturn correctly handles the errors, but I am missing the results of the items that don't have errors. These good items complete their processing after onCompleted is called.

@benjchristensen
Member

An error being emitted here indicates a problem: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/util/RxRingBuffer.java#L220

I need to spend more time digging.

@zsxwing
Member

zsxwing commented Sep 2, 2014

the items that had errors would be processed to errors and all the 'good' items would be processed to completion.

Take a look here: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java#L61
onErrorReturn just replaces the onError notification with an onNext and an onCompleted. It does not resume the source Observable.
Perhaps you want onErrorFlatMap, but it will be removed in 1.0.
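The onErrorReturn contract described above can be sketched as a minimal JDK-only model. This is not RxJava's OperatorOnErrorReturn, and OnErrorReturnModel and mapWithErrorReturn are hypothetical names: the first failure is replaced by the fallback value, the stream then completes, and any remaining source items are never mapped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** JDK-only model of onErrorReturn semantics on a single stream (not RxJava). */
public class OnErrorReturnModel {

    public static <T, R> List<R> mapWithErrorReturn(List<T> source,
                                                    Function<T, R> mapper,
                                                    Function<Throwable, R> fallback) {
        List<R> emitted = new ArrayList<>();
        for (T item : source) {
            try {
                emitted.add(mapper.apply(item));   // normal onNext
            } catch (RuntimeException e) {
                emitted.add(fallback.apply(e));    // replacement onNext
                return emitted;                    // then onCompleted; the source is not resumed
            }
        }
        return emitted;                            // source completed normally
    }

    public static void main(String[] args) {
        List<String> out = mapWithErrorReturn(
                Arrays.asList("a", "bad", "c"),
                s -> {
                    if (s.equals("bad")) throw new IllegalStateException(s);
                    return s.toUpperCase();
                },
                t -> "ERROR");
        System.out.println(out); // [A, ERROR] ("c" is never processed)
    }
}
```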

@zsxwing
Member

zsxwing commented Sep 2, 2014

If you look closely, 2_4 and 2_5 would have been processed after the on complete is called. That is what I am confused about.

I think it's a bug... I've just been digging into the code. It looks like it's because unsubscribing and emitting onNext happen asynchronously. SafeSubscriber will handle it, but functions inside the Observable chain may still observe this behavior.

@girishkolantra
Author

@zsxwing
I understand from the documentation that onErrorReturn calls onCompleted right after emitting the replacement element. This would mean that my Observable that was supposed to emit 1_1, 1_2, 1_3, 1_4 and 1_5 (created using Observable.from(strings)) would have returned an error for the first element, 1_1, and then completed. But I get 5 errors.

Also, replacing Observable.range(1, 5) with

 Observable.create(new Observable.OnSubscribe<Integer>() {

        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onNext(3);
            subscriber.onNext(4);
            subscriber.onNext(5);
            sleep();
            subscriber.onCompleted();
        }

    })

where sleep() sleeps for 5 seconds, gives the result I am expecting: I get 25 elements in the result, 5 of them errors. Why does everything work as expected when I just put a delay before calling subscriber.onCompleted()?

@zsxwing
Member

zsxwing commented Sep 2, 2014

This would mean that my Observable that was supposed to emit 1_1, 1_2, 1_3, 1_4 and 1_5 (created using Observable.from(strings)) would have returned an error for the first element, 1_1, and then completed. But I get 5 errors.

It's expected. parallel will split the source Observable into scheduler.parallelism() Observables. 1_1, 1_2, 1_3, 1_4 and 1_5 will be assigned to 5 different Observables. That's why you got 5 errors.

@zsxwing
Member

zsxwing commented Sep 2, 2014

where sleep() sleeps for 5 seconds, gives the result I am expecting: I get 25 elements in the result, 5 of them errors. Why does everything work as expected when I just put a delay before calling subscriber.onCompleted()?

This is a bug in onErrorReturn. I sent a PR to fix it: #1657

@zsxwing
Member

zsxwing commented Sep 3, 2014

@girishkolantra Usually, you should check isUnsubscribed inside your Observable. This code:

Observable.create(new Observable.OnSubscribe<Integer>() {

        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onNext(3);
            subscriber.onNext(4);
            subscriber.onNext(5);
            sleep();
            subscriber.onCompleted();
        }

    })

should rewrite to

Observable.create(new Observable.OnSubscribe<Integer>() {

        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            if(!subscriber.isUnsubscribed()) subscriber.onNext(1);
            if(!subscriber.isUnsubscribed()) subscriber.onNext(2);
            if(!subscriber.isUnsubscribed()) subscriber.onNext(3);
            if(!subscriber.isUnsubscribed()) subscriber.onNext(4);
            if(!subscriber.isUnsubscribed()) subscriber.onNext(5);
            sleep();
            if(!subscriber.isUnsubscribed()) subscriber.onCompleted();
        }

    })

Usually, I don't add isUnsubscribed in a Subscriber unless I'm implementing an operator.
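To show why the checks matter, here is a JDK-only sketch. MiniSubscriber is a hypothetical stand-in for rx.Subscriber (not part of RxJava), and the loop in produce() is equivalent to the five repeated if checks above. The subscriber marks itself unsubscribed after a given number of items, standing in for a downstream terminal event, and the producer then neither emits the remaining values nor calls onCompleted.

```java
import java.util.ArrayList;
import java.util.List;

/** JDK-only model of the isUnsubscribed() pattern; MiniSubscriber is hypothetical. */
public class UnsubscribeCheckModel {

    public static class MiniSubscriber {
        public final List<Integer> received = new ArrayList<>();
        public boolean completed = false;
        // volatile because in real code unsubscribe may come from another thread
        private volatile boolean unsubscribed = false;
        private final int stopAfter;

        public MiniSubscriber(int stopAfter) {
            this.stopAfter = stopAfter;
        }

        public boolean isUnsubscribed() {
            return unsubscribed;
        }

        public void onNext(int value) {
            received.add(value);
            // Simulate a downstream terminal event that unsubscribes.
            if (received.size() >= stopAfter) {
                unsubscribed = true;
            }
        }

        public void onCompleted() {
            completed = true;
        }
    }

    // Producer in the style of the rewritten OnSubscribe.call: check before every emission.
    public static void produce(MiniSubscriber subscriber) {
        for (int i = 1; i <= 5; i++) {
            if (subscriber.isUnsubscribed()) {
                return; // stop producing as soon as the subscriber is gone
            }
            subscriber.onNext(i);
        }
        if (!subscriber.isUnsubscribed()) {
            subscriber.onCompleted();
        }
    }

    public static void main(String[] args) {
        MiniSubscriber s = new MiniSubscriber(3);
        produce(s);
        System.out.println(s.received + " completed=" + s.completed); // [1, 2, 3] completed=false
    }
}
```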

@benjchristensen
Member

The error handlers have been improved to be stricter about not allowing events after a terminal state. However, that does not solve the fact that this use of parallel is awkward.

Doing error handling inside the parallel operator doesn't make much sense: by definition a single stream is being split into many, so handling errors individually in each of those inner children is not meaningful. The onErrorReturn should really be on the outside, after the parallel operator, so that it correctly shuts down the entire sequence.

I think a more important design decision for us to make is whether we should shut down (unsubscribe) the entire parallel operator if one of the children fails (as in this case), let it continue to behave this way, or start up another processing Observable after a child dies like this.

The odd behavior one gets with this is as follows:

  • parallel spins up n threads
  • an error occurs on 1 thread (Observable) and terminates it, but the other n-1 continue
  • another error occurs, now n-2 threads continue
  • errors keep happening until none are left (n-n)

It seems that once an error occurs either the whole thing should be shut down, or the thread should be replaced with a new one if it is handled with an onError handler.

@benjchristensen benjchristensen added this to the 1.0 milestone Sep 5, 2014
@girishkolantra
Author

@benjchristensen Why do you say that the onErrorReturn should be outside, after the parallel? I was thinking of a use case where you have a list of items and want to execute the same operation on each of them, but the success or failure of the operation on one item should not affect the overall orchestration. For example, for a person you get a list of newsletters and want to call a service in parallel with each newsletter ID to subscribe, but failing to subscribe to one should not cause the processing to halt.

@benjchristensen
Member

Then parallel is the wrong operator for this use case. The parallel operator splits a stream up by modding over the number of CPUs, and the whole thing is a single unit of work.

If you want each item treated independently, flatMap over the sequence and allow each inner Observable to behave on its own:

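// newsletters, subscribeToNewsletter and fallback stand for your own code;
// fallback is a Func1<Throwable, T> that returns the replacement value
newsletters.flatMap(letter -> {
    return subscribeToNewsletter(letter)
            .onErrorReturn(fallback);
});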

If the subscribeToNewsletter is not already async, then it can be made async like this:

newsletters.flatMap(letter -> {
    return subscribeToNewsletter(letter)
            .subscribeOn(Schedulers.io()) // run each call on the io() scheduler
            .onErrorReturn(fallback);
});
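For comparison outside RxJava, the same per-item isolation can be sketched with the JDK's CompletableFuture. This is a swapped-in technique, not what the thread proposes: each item becomes an independent task with its own exceptionally fallback, so one failure cannot stop the processing of the others. process() mirrors the map() from the original issue; PerItemFallback and the pool size of 4 are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/** JDK-only sketch of per-item error handling (not RxJava). */
public class PerItemFallback {

    // Mirrors the map() in the issue: strings starting with "1" fail.
    static String process(String s) {
        if (s.startsWith("1")) {
            throw new RuntimeException("cannot process " + s);
        }
        return s + "_PROCESSED";
    }

    public static List<String> run() {
        List<String> inputs = new ArrayList<>();
        for (int n = 1; n <= 5; n++) {
            for (int i = 1; i <= 5; i++) {
                inputs.add(n + "_" + i);
            }
        }
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // One independent task per item, each with its own fallback,
            // analogous to flatMap(letter -> work(letter).onErrorReturn(fallback)).
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String s : inputs) {
                futures.add(CompletableFuture
                        .supplyAsync(() -> process(s), pool)
                        .exceptionally(t -> "ERROR"));
            }
            // join() waits for every item, so nothing finishes "after completion".
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> results = run();
        System.out.println("obtained " + results.size());
        System.out.println(results);
    }
}
```

run() returns all 25 results, 5 of them ERROR, which is the outcome girishkolantra originally expected.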

@girishkolantra
Author

@benjchristensen Thanks for taking the time to answer my question. That has lit up a lot of light bulbs in my head :-)

@benjchristensen
Member

No problem!


3 participants