Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

concat + concurrency + multiple subscribers #552

Closed
samuelgruetter opened this issue Dec 3, 2013 · 2 comments
Closed

concat + concurrency + multiple subscribers #552

samuelgruetter opened this issue Dec 3, 2013 · 2 comments

Comments

@samuelgruetter
Copy link
Contributor

This C# snippet

var o = Observable.Concat(
    Observable.Interval(TimeSpan.FromMilliseconds(500)).Take(2),
    Observable.Interval(TimeSpan.FromMilliseconds(500)).Take(2)
);
o.Subscribe(x => Console.WriteLine("a: " + x), t => Console.WriteLine(t), () => Console.WriteLine("a: done"));
o.Subscribe(x => Console.WriteLine("b: " + x), t => Console.WriteLine(t), () => Console.WriteLine("b: done"));
Console.WriteLine("all subscribed");
Thread.Sleep(5000);

outputs, as expected, this:

all subscribed
b: 0
a: 0
b: 1
a: 1
a: 0
b: 0
a: 1
a: done
b: 1
b: done

The same translated to Java:

Action1<Long> printNext(final String who) {
    return new Action1<Long>() {
        public void call(Long o) {
            System.out.println(who + o);
        }
    };
}
Action1<Throwable> printErr(final String who) {
    return new Action1<Throwable>() {
        public void call(Throwable o) {
            System.out.println(who + o);
        }
    };
}
Action0 printComplete(final String who) {
    return new Action0() {
        public void call() {
            System.out.println(who + "done");
        }
    };
}
@Test public void testConcat2() throws Exception {
    Observable<Long> o = Observable.concat(
        Observable.interval(500, TimeUnit.MILLISECONDS).take(2),
        Observable.interval(500, TimeUnit.MILLISECONDS).take(2)
    );        
    o.subscribe(printNext("a: "), printErr("a: "), printComplete("a: "));
    o.subscribe(printNext("b: "), printErr("b: "), printComplete("b: "));
    System.out.println("all subscribed");
    Thread.sleep(5000);
}

only outputs this:

all subscribed
a: 0
a: 1
a: 0
a: 1
a: done

If I replace the Observable.interval by an Observable.from, the problem disappears, so I think the problem only shows up if we have concat + concurrency + multiple subscribers.

@akarnokd
Copy link
Member

akarnokd commented Dec 8, 2013

The issue is in the Concat operation sharing its innerSubscription among observers. I'm about to fix that.

@benjchristensen
Copy link
Member

Fixed in #586

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants