Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator Window #1123

Closed
wants to merge 1 commit into from
Closed

Operator Window #1123

wants to merge 1 commit into from

Conversation

akarnokd
Copy link
Member

Operator Window

Issue #1060

Significant changes have been implemented in the new Window operator's behavior:

  • The original sized, timed and start-end observable versions waited until a window has finished and emitted the buffered values as a single Observable.from. This is not how the Rx.NET version behaves where once a window is open, values are delivered as they appear from source. This has the same effects as groupBy where Observable windows not subscribed immediately will not deliver all values. Probably this is why Clojure test test-partition-all fails, but I don't know how to fix it there.
  • The callback boundary-observable version, similar to the buffer variant before, constantly created a new observable for each value. This new version, consistent with the new buffer, uses one boundary source per subscribing client.
  • The exact variants (such as size == skip, timespan == timeshift and observable boundary) make extra effort to ensure all events are delivered into one of the windows, i.e., no event will fall between the closing and opening of a window gap when it concurrently appears from the source.

This rewrite needs more meticulous review due to the changes and their concurrency effects.

@akarnokd akarnokd mentioned this pull request Apr 28, 2014
57 tasks
@cloudbees-pull-request-builder

RxJava-pull-requests #1035 ABORTED

@akarnokd
Copy link
Member Author

Not sure why it aborts, locally, it fails on the clojure test

FAIL in (test-partition-all) (core_test.clj:470)
expected: (= (->> (range 15) (partition-all 30 4)) (->> (range 15) (rx/seq->o) (rx/partition-all 30 4) (rx/map (fn* [p1__841#] (rx/into [] p1__841#))) (rx/concat*) (b/into [])))
  actual: (not (= ((0 1 2 3 4 5 6 7 8 9 10 11 12 13 14) (4 5 6 7 8 9 10 11 12 13 14) (8 9 10 11 12 13 14) (12 13 14)) [[0 1 2 3 4 5 6 7 8 9 10 11 12 13 14] [] [] []]))

@benjchristensen
Copy link
Member

Observable windows not subscribed immediately will not deliver all values

Perhaps then it needs to use BufferUntilSubscriber like groupBy does?

@akarnokd
Copy link
Member Author

That can be arranged, but the current BufferUntilSubscriber looks strange. I haven't tested it, but it throws ClassCastException if a second subscriber subscribes after the pass-through mode. Plus, it allows only a single actual subscriber, so a second subscriber would be lost or would overpower the first.

@benjchristensen
Copy link
Member

it throws ClassCastException if a second subscriber subscribes after the pass-through mode

It is odd that it throws a ClassCastException. I can fix that. It's because it expects single subscription as this is not multicast. The error it should throw is to tell someone to use multicast/publish/replay/etc if they want to multicast it.

This whole type is odd, because we are bridging a time boundary and avoiding the use of a multicast subject on a hot observable that we have created from a potentially cold observable.

We do not add multicast anywhere unless the user asks for it, and probably shouldn't here either. I'm not sure the overhead of SubjectSubscriptionManager to allow multicasting on something that is not ever supposed to be used for multicasting.

Do you think we should support multicast on GroupedObservables that we create and emit from groupBy? If so, why?

@akarnokd
Copy link
Member Author

In PR #1138, I've fixed BufferUntilSubscriber to behave as a simple PublishSubject for the subsequent subscribers. It works and most users compose operations that subscribe once, but it is not a least surprising behavior. I'd add a replayAll and replayOnce operators to GS so users have to explicitly request that behavior. Both would immediately subscribe to the group and route everything through ReplaySubject or BufferUntilSubscriber, which I would rather rename to ReplayOnceSubject.

@akarnokd akarnokd closed this Apr 30, 2014
@benjchristensen
Copy link
Member

More clarity on the multicast issue ... if we were to start allowing multicast on a GroupedObservable or similar use cases, we would just end up recreating the "time gap" issue, as the first subscriber would drain the queue and subsequent subscribers only get items emitted from then onwards - not at all what anyone would expect.

If someone wants to multicast they should have to explicitly ask and deal with the ConnectableObservable.connect() behavior and decision of when to subscribe (and thus drain the queue in this case).

@benjchristensen
Copy link
Member

I've fixed BufferUntilSubscriber to behave as a simple PublishSubject for the subsequent subscribers

No, that's not how it should work. I specifically implemented BufferUntilSubscriber without PublishSubject and ReplaySubject behaviors because it's wrong, as my previous comment states.

It should not do multicast. It is single-purposed for hopping the time gap, and that's it.

@akarnokd akarnokd deleted the OperatorWindow branch May 6, 2014 13:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants