-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Explore Blocking vs Non-Blocking Solutions #1299
Comments
This issue is related to #1204 where |
If anyone wants to offer alternative implementations please ensure that 2 requirements are met (obviously beyond passing all functional unit tests):
|
Here is a resource for us to explore: https://github.com/JCTools/JCTools/tree/master/jctools-core/src/main/java/org/jctools |
There is my hybrid composite in #1145. As for the hang, my guess is the lazySet that may overwrite a terminal state indicator, but it seems its performance isn't good enough anyway. |
A lot has been done and we now have Closing this out as this was a very generic issue and has served its purpose. |
This is to document and explore blocking vs non-blocking solutions for a couple use cases, particularly the
SerializedObserver
andCompositeSubscription
which are hot code paths in most use of Rx.Thus far the
synchronized
blocking implementations have won and are currently being used. Despite usingsynchronized
, the locks are not held while emitting notifications, only for mutating internal data structures.A key consideration is that object allocations must be kept low. The atomic state machine pattern has been attempted on both of these, and is elegant, but was a massive performance problem with
CompositeSubscription
due to some valid edge use cases withmerge
that result in hundreds of subscriptions being added to the data structure viaCompositeSubscription.add
and each time performing a state transition with object allocation.Following are details on the use cases, the current implementation, alternates that have been attempted, and performance results from JMH tests.
The intent of this is to document what has been attempted thus far and seek improvements from anyone who can provide better solutions.
SerializedObserver
Use Case:
Serialize
onNext
/onError
/onCompleted
calls from multiple threads to be sequential, but without synchronizing and blocking threads.Current: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/observers/SerializedObserver.java
Discussions:
Alternate implementations:
Performance tests
Current Code (locks)
MPSC Queue
It completely hung once at this point:
queue and counter
state machine
CompositeSubscription/SubscriptionList
Use Case:
A
Subscription
implementation that allows the following:- add(Subscription) to maintain a list
- allows anybody to register a
Subscription
to be invoked when the parent is unsubscribed- isUnsubscribed
- often checked on every
onNext
in a tight loop- unsubscribe
- can be invoked once and will mark
isUnsubscribed
to true and callunsubscribe
on all subscriptions added viaadd
- remove(Subscription)
- some use cases need the ability to randomly remove a
Subscription
, for example,merge
andflatMap
which usesmerge
This means that the list of subscriptions is multiple writes, a single read at
unsubscribe
at the end. The booleanisUnsubscribed
is ready many times, modified once.Current CompositeSubscription: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java
Current ChainedSubscription: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/subscriptions/ChainedSubscription.java
|-> will be renamed
Discussions:
Alternate implementations:
|-> this implementation has massive object allocation overhead in use cases with high numbers of
add(Subscription)
such asObservable.merge(hundreds-of-observables)
Performance tests
Current Code (locks)
State Machine
MPSC Queue + AtomicInteger
The text was updated successfully, but these errors were encountered: