Messages priority & inbox unification #85
Conversation
f0336f2 to 9b018fd
Optimised for the case that most priorities are the same - in this way, it is optimistic that the user is not mixing and matching priorities a huge amount. I'm not sure whether this is actually better in a real-world case, but it should beat std's binary heap for all *existing* Address patterns, as they all use effectively the same priority. This could change in the future, though, so std's might actually be better in those cases. This brings us down from 320ns+ to 200ns.
Interesting. I've also considered suggesting the idea of message priorities to replace the concept of "self notifications", but I wasn't sure if this is a feature worth having. I imagine that it could be confusing for users if they have to choose a priority, so I'd vote for a good default here (probably the lowest priority).
I guess this might also completely replace #51?
Very interesting work, I'll tag along :)
The plan would be either to rebase #51 on this, or to do essentially what #51 does (since the API is good) but based on master after this is merged. It might just be more work to get #51 rebased, though, so my plan is to try to get this in and then evaluate.
As far as default priorities go, I am planning to add a separate queue for default-priority messages, as binary heaps are not stable with respect to insertion order, and this is important for regular, non-priority messages.
The default queue preserves order, unlike the binary heap. Plus, my implementation of the binary heap was subtly wrong. It would have been fixable, but not really worth it: now that the default queue exists, most priority messages probably won't have the same priority (or cases where they do will be so infrequent that it doesn't matter), so std's strategy will probably beat it out anyway.
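To make the layout concrete, here is a minimal sketch of the two-queue idea being described. The names (`Inbox`, `Prioritised`) and the exact pop policy are assumptions for illustration, not the actual inbox code:

```rust
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};

// Illustrative only: a message carrying an explicit priority.
struct Prioritised {
    priority: u32,
    message: String,
}

impl PartialEq for Prioritised {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl Eq for Prioritised {}

impl PartialOrd for Prioritised {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Prioritised {
    // `BinaryHeap` is a max-heap, so the highest priority pops first; the order of
    // equal priorities is unspecified, which is exactly why the default queue exists.
    fn cmp(&self, other: &Self) -> Ordering {
        self.priority.cmp(&other.priority)
    }
}

struct Inbox {
    default_queue: VecDeque<String>,         // FIFO: preserves send order
    priority_queue: BinaryHeap<Prioritised>, // no stable order among equal priorities
}

impl Inbox {
    // One possible policy: handle explicitly prioritised messages first,
    // then fall back to the FIFO default queue.
    fn pop(&mut self) -> Option<String> {
        self.priority_queue
            .pop()
            .map(|p| p.message)
            .or_else(|| self.default_queue.pop_front())
    }
}
```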
This helps handle its drop better (future change)
This solves an edge case
This should help reduce some out-of-order receiving when used properly. Theoretically, if `cancel` is used prior to cancelling a `ReceiveFuture`, then out-of-order messages should never happen. I only expect this to actually become a problem when custom event loops are implemented, or with `select`/`join` being used.
Also added an example to join and refactored it to fit the inbox-refactor changes well.

# Conflicts:
#	Cargo.toml
#	src/address.rs
#	src/context.rs
I've given this a (mostly stylistic) review, take as you wish :)
I think I understand the overall design and agree with your conclusion that a specialised impl is better than composing two libraries.
One question: do you think it would make sense to try making a design that is based on `std::task::Waker` for waking the correct task to pop the next message?
Instead of remembering the sender/receiver we need to fulfill, storing a waker from the async context when the particular sender/receiver tried to read/write a message could be used to wake exactly that task to try its operation again (which should then succeed).
I've seen such a design in libp2p-mplex to implement a multiplexer on top of TCP with individual substreams.
I am thinking that such a waker-based approach could be simpler because we might not need the "try fulfilling" logic at the time of sending a message but instead just need to consume the waker.
Thoughts?
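For reference, a minimal sketch of the waker-based approach being described here; the names and structure are assumptions for illustration, not a concrete proposal for xtra. The receiver stores its `Waker` when it finds the queue empty, and the sender consumes that waker after pushing a message:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

// Illustrative single-receiver channel, not xtra's actual implementation.
struct Shared<M> {
    queue: VecDeque<M>,
    waker: Option<Waker>,
}

struct Channel<M> {
    shared: Arc<Mutex<Shared<M>>>,
}

impl<M> Channel<M> {
    fn send(&self, message: M) {
        let mut shared = self.shared.lock().unwrap();
        shared.queue.push_back(message);
        // Consume the stored waker so exactly the waiting task retries its poll.
        if let Some(waker) = shared.waker.take() {
            waker.wake();
        }
    }

    fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<M> {
        let mut shared = self.shared.lock().unwrap();
        match shared.queue.pop_front() {
            Some(message) => Poll::Ready(message),
            None => {
                // Nothing to pop: remember how to wake this task and go to sleep.
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
```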
As I understand it, this is basically what we do. The difference is a slight implementation thing, I think. Try fulfill can fail in the following ways:
2 & 3 essentially only exist to move the cleanup of deregisteted waiters to the send rather than receive - this way, drop doesn't need to iterate through all waiters to see which it is. The caller of try fulfill does not observe failures of 2&3 since it will retry until the first failure case is hit, which cannot be helped. Otherwise, is the difference that this solution skips the receiver or sender having to lock the channel again after waking to compete its operation? I'll elaborate further if this is what you are referring to when I'm back at my PC. Can you elaborate more on how the other solution would differ? |
It would use the async runtime scheduler (like tokio) to figure out which task needs to be polled again to read the next message from the heap, via a waker that is woken. The workflow would be something like:
- If a new item gets put on the heap, we check if we have a pending waker […]

I think one advantage of this approach could be that a task that is waiting for a new message (i.e. an event-loop) will then be able to do more work within a single poll without context switching, i.e. once woken, a message will be popped off and the handler should be invoked immediately until that one returns.
On a different note, the implementation may be easier to understand, but that is a taste thing. I am biased because most of https://github.com/libp2p/rust-libp2p is implemented as manual, poll-based state machines :)
I had just finished writing out a detailed comment on the differences between the two strategies, and then my power cut out 😢. I'm on my phone now and will summarize.
The solution you are proposing differs in that the receiver must wake up, lock the channel, and try to take off a message. This contends with other receive operations, and another receiver might have already taken the message that the receive future was woken for off the queue, resulting in a spurious wakeup. So, keeping a slot that is fulfilled reduces spurious wakeups as well as contention on the inner channel. This is the same strategy used by flume.
I'm not sure what you mean by doing less work - as far as I understand, it would be doing more work, given it has to lock the contended channel as opposed to the uncontended waiting receiver slot.
PS: there is a diagram of how the receive currently works in the original PR comment, which shows how the waiting happens outside the critical section of the main channel lock.
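To make the contrast concrete, here is a rough sketch of the "fulfilled slot" strategy being described. The names are made up for the sketch and this is not xtra's or flume's actual code; the point is that the sender hands the message straight into a waiting receiver's private slot and only then wakes it, so the woken task never re-locks the main channel:

```rust
use std::sync::{Arc, Mutex};
use std::task::Waker;

// Illustrative only: a waiting receiver registered with the channel.
struct WaitingReceiver<M> {
    slot: Mutex<Option<M>>,
    waker: Mutex<Option<Waker>>,
}

// The main channel would keep a list like this and, on send, pop one waiter to
// fulfil instead of pushing the message onto the shared queue.
type Waiters<M> = Vec<Arc<WaitingReceiver<M>>>;

impl<M> WaitingReceiver<M> {
    // Called by the sender: write the message into the waiter's private slot,
    // then wake exactly that task.
    fn fulfill(&self, message: M) {
        *self.slot.lock().unwrap() = Some(message);
        if let Some(waker) = self.waker.lock().unwrap().take() {
            waker.wake();
        }
    }

    // Called by the woken receiver: the message is already here, so there is no
    // need to re-lock the main channel and no chance of a spurious wakeup.
    fn take(&self) -> Option<M> {
        self.slot.lock().unwrap().take()
    }
}
```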
Interesting, thanks for the detailed response. I guess the wake-up strategy only makes sense when there is a dedicated receiver for an item, like a substream in the case of a multiplexer, and the item cannot be handled by someone else.
I believe that if each item has a designated receiver at send time, then there will be no spurious wakeups, and probably less contention on the lock around whatever contains that item too, yes.
Power's back, here's what was saved as draft by GitHub, for posterity: […]
Thanks beta clippy!
I came up with some changes that allow for […]. I am not sure if this is a blocker for this PR, since it is still possible to make a fan-out wrapper. Personally, I'd say that it's something that could go in a follow-up, as it's not really an issue with the channel here, but rather that the API of MessageChannel as-is was never going to be able to support this. Therefore, from my side I am ready to merge. Let me know what you think @thomaseizinger and maybe we can merge soon!
A few doc nits but happy to merge this!
impl<R, F, TResolveMarker> SendFuture<R, F, TResolveMarker> | ||
where | ||
F: SetPriority, |
I think I would have probably created two impl blocks instead of a trait but that is more of a style question.
This solution might be slightly more expensive to compile.
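For illustration, here is roughly what the two-impl-block alternative could look like. The type names (`UnicastInner`, `BroadcastInner`) are invented for the sketch and do not correspond to xtra's actual types:

```rust
// Hypothetical sketch of the style alternative: one `priority` method per concrete
// inner future type instead of a shared `SetPriority` bound. The trade-off is a
// small amount of duplicated code versus one fewer trait.
struct SendFuture<F> {
    inner: F,
}

struct UnicastInner {
    priority: u32,
}

struct BroadcastInner {
    priority: u32,
}

impl SendFuture<UnicastInner> {
    fn priority(mut self, priority: u32) -> Self {
        self.inner.priority = priority;
        self
    }
}

impl SendFuture<BroadcastInner> {
    fn priority(mut self, priority: u32) -> Self {
        self.inner.priority = priority;
        self
    }
}
```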
I originally planned to use SetPriority to let BroadcastFuture with an erased actor type set the priority too. I can push that branch, but it didn't work out for other reasons that I discussed, so maybe we can move to two impl blocks.
I think we can leave this as-is for now, since when MessageChannels get refactored they could use SetPriority too.
This should be solvable if we move away from a trait object and introduce a dedicated type, right? Then we can restrict where this type can be constructed with bounds like […]. This might also solve #68.
Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
That's true. If so, that would be for a follow-up PR.
This is really cool! I have looked at it mostly to understand xtra internals a bit more; I'm looking forward to using these features! :)
Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
🎉
Summary
This change unifies the three existing sources of messages (immediate notifications, address messages, and broadcasts) for an actor to handle into one big channel implementation in `inbox.rs`. It additionally upgrades the capabilities of immediate notifications, address messages, and broadcasts with support for assigning priorities to them. An ordered queue of default-priority messages from addresses is still kept to provide FIFO ordering over default-priority messages, as binary heaps do not preserve this order, so the address message priority queue cannot be used.

Motivation
Pros
- `Address` can be pointer sized!
- `futures::select`, or `try_recv`s to be interspersed to prevent starvation.
- `join` and `select` - the difference is night and day!

Cons
Conclusion
Personally, I believe that the pros outweigh the cons, so I began to implement this proof of concept.
Channel design
I leaned heavily on my experience working on flume and barrage to create this channel. I borrowed the idea of simply locking the inner channel from flume: as far as I understand it, channels will always have contention on the head and tail, and whether this is resolved through lockless CAS loops or simple locks, it will usually be the limiting factor in speed. Therefore, what is far more important than atomic trickery is simply reducing contention on the lock itself. Using a lock really does not produce performance that far off atomic trickery anyway, and is more than sufficient for this use case. It also allows the code to be much simpler to reason about than a lockless channel providing the same level of features, and it can be written in fully safe Rust.
The main channel is locked with a std Mutex, as contention is expected (though not for long). One consideration could be to use a fair mutex here. The individual broadcast channels and other things that are not expected to be heavily contended are locked with spinlocks, as this minimises the amount of time needed to unlock when no contention is present.
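As a rough sketch of that locking layout - all names here are illustrative, and `spin::Mutex` (from the `spin` crate) is used only as a stand-in spinlock; this is not the actual xtra structure:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Illustrative layout only, not xtra's real internals.
struct Chan<M> {
    // The main channel state: a std Mutex, since some contention is expected
    // but critical sections are short.
    inner: Mutex<ChanInner<M>>,
}

struct ChanInner<M> {
    default_queue: VecDeque<M>,
    // Each receiver's broadcast mailbox lives behind its own cheap lock.
    broadcast_mailboxes: Vec<Arc<BroadcastMailbox<M>>>,
}

// Rarely contended, so a spinlock keeps the uncontended lock/unlock cost minimal.
struct BroadcastMailbox<M> {
    messages: spin::Mutex<VecDeque<Arc<M>>>,
}
```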
Diagram of channel operations
An activity diagram of all of the channel operations
Unresolved questions
- Use a fair mutex or a parking lot mutex for the inner channel? Can be resolved in the future if there is interest.
- Is the sink impl flexible enough? Can be resolved in the future if there is interest.
- `MessageChannel` not having a `broadcast` - block this PR or not? TBD

- Pop waiters in priority order
- Lifecycle management (drop notice & last strong address handling)
- Add some tests from Refactor context #51
- Rename StolenMessage and friends to something else
- Documentation
  - `TODO(doc)` comments
- Sink impl
- Priority sending API (`send_priority` in favour of `send(...).priority(..)`)
- Tests