-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Scheduler fixes and related improvements #1190
Conversation
RxJava-pull-requests #1102 FAILURE |
@@ -28,7 +28,7 @@ | |||
/** | |||
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed. | |||
*/ | |||
public class TrampolineScheduler extends Scheduler { | |||
public final class TrampolineScheduler extends Scheduler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we make this package only as well since it is also accessible via Schedulers.trampoline()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True. Maybe this PR was not the right place to modify this line as there are other scheduler related PRs doing the same.
Spent the time to review this today ... good set of changes, but I found areas I'd like to discuss with you (some of which existed already). I'll start adding them inline. I was intrigued to see you start using AtomicFieldUpdater. Have you found it makes a difference in object allocations? I would imagine it would in the the high-traffic objects. |
* outer would set the first reference. This implies that calling setNext should happen sequentially | ||
* in respect to each other. | ||
*/ | ||
static final class StampedSubscription implements Subscription { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be made private? If it's intended for package access, perhaps it should live in the package rather than as an inner class? Seems very specific to this though.
What happens if setFirst or setNext are called concurrently?
There is a comment saying "giving up if a newer subscription was already set" so this concerns me with silent failures. Should this throw an exception instead of silently failing if it is ever used incorrectly? Should we just say this is not thread-safe and it only works for this specific use case? I tend to prefer throwing an IllegalStateException or something similar if it's misused rather than silently giving up or failing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
setFirst and setNext behavior eliminates the race condition when the thread calling the schedulePeriodically gets stalled right after receiving the subscription but before setting it on the container-subscription it is returning and potentially overwriting a subscription token of the next scheduled period. The "giving up" here is of no concern because at the time it would set an old subscription, there is already a new subscription because the first iteration already completed and the old can just be discarded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I sometimes find myself in need of such StampedSubscription while implementing schedulers so I'd put these into a more accessible place, but I didn't want to pollute rx.schedulers
with it and putting it into rx.subscriptions
exposes it due to the need of public modifier.
I'm closing this due to the distance from master and due to the increased need to remove any memory overhead and using lock-free. I'll post a separate PR for just the StampedSubscription to make sure a periodic action can be reliably unsubscribed. |
Are you planning on continuing on evolving this? Do you want to work together on a branch to review and optimize? |
My head is full with branches from all the recent PRs and merges so I need to reaccomodate the master and where it is standing. I think I'll do a complete overhaul, including the scheduler and subscription optimizations; i.e., I'll abandon this specific branch here. |
Keep me in the loop on this. I intend on starting to do profiling so I'd like us to stay in touch so we don't go off on tangents from each other and waste time. |
Perhaps we can start by using #1204 to discuss ... and then link to branches/PRs from there? |
Sure. |
A newer set of scheduler improvements.
schedulePeriodic
to properly capture the Subscription of the repeated schedule calls to allow unsubscribing the periodic task without the need to unsubscribe the whole worker.NewThreadScheduler
to callshutdownNow
to correctly cancel all pending tasks.innerSubscription
fromNewThreadScheduler
because it was unnecessary and didn't even work.CompositeSubscription
from Large CompositeSubscription performance improvements #1145 to improve on the add/removal time of timed tasks with a help of aHashSet
and atomic field updaters.SubscriptionQueue
that works like an ringbuffer and can add/remove non-delayed tasks faster thanCompositeSubscription
.CompositeSubscription
andScheduledAction
.EventLoopScheduler
to track delayed and non-delayed task Subscriptions separately.final
modifiers.