Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Differentiate single and broadcast message on type-system level #166

Merged
merged 27 commits into from
Aug 30, 2022
Merged
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a38a4e2
Split `try_send` internally
thomaseizinger Aug 7, 2022
8cc534e
Inline `Sender::try_send`
thomaseizinger Aug 7, 2022
261320c
Make `WaitingSender` generic over message
thomaseizinger Aug 7, 2022
58ab1db
Remove `MessageType` enum
thomaseizinger Aug 7, 2022
7291b0a
Keep `WaitingSender`s per message type separate
thomaseizinger Aug 7, 2022
113eadf
Qualify `FulFillHandle`
thomaseizinger Aug 8, 2022
6571d09
Move `WaitingSender` to its own module
thomaseizinger Aug 8, 2022
da48e27
Introduce `waiting_sender::FulFillHandle` equivalent to `waiting_rece…
thomaseizinger Aug 8, 2022
215fe86
Call `set_closed` on `Drop` of `waiting_sender::FulFillHandle`
thomaseizinger Aug 8, 2022
84f4c14
Inline `set_closed` function
thomaseizinger Aug 8, 2022
45bf722
Have `fulfill` consume the `FulFillHandle`
thomaseizinger Aug 8, 2022
1bb10ff
Introduce helper functions for `try_fullfil` functions
thomaseizinger Aug 8, 2022
47a9792
Clear inactive handles as part of finding the next one
thomaseizinger Aug 8, 2022
8189334
Use `if let` where possible
thomaseizinger Aug 7, 2022
474cd46
Introduce `TrySend` trait to remove duplicate `Future` impl on `Sending`
thomaseizinger Aug 8, 2022
33e5f26
Remove `SentMessage` and add `TrySend`
thomaseizinger Aug 8, 2022
1ea2faa
Inline `SpinLock` type
thomaseizinger Aug 8, 2022
580aa67
Introduce type aliases
thomaseizinger Aug 8, 2022
2cf1a98
Remove panic and add docs to `FulfillHandle`
thomaseizinger Aug 8, 2022
0d9524d
Rename `FulfillHandle`s to `Handle`
thomaseizinger Aug 8, 2022
5da6890
Change `fulfill` to `take_message` wording for `waiting_sender`
thomaseizinger Aug 8, 2022
c2966db
Fix docs
thomaseizinger Aug 8, 2022
6ea982b
Merge branch 'master' into split-try-send-2
thomaseizinger Aug 10, 2022
95ffe8a
Remove empty line
thomaseizinger Aug 10, 2022
320c210
Remove unnecessary `Unpin` bound
thomaseizinger Aug 15, 2022
e9eb9d3
Remove `TrySend` trait
thomaseizinger Aug 16, 2022
7a9810b
Merge branch 'master' into split-try-send-2
thomaseizinger Aug 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 38 additions & 28 deletions src/inbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,15 +418,9 @@ impl<A> ChanInner<A> {
self.waiting_send_to_one.retain(|handle| handle.is_active());

loop {
let pos =
self.waiting_send_to_one
.iter()
.position(|handle| match handle.priority() {
Some(p) => p == Priority::default(),
None => false,
})?;

if let Some(msg) = self.waiting_send_to_one.remove(pos).unwrap().fulfill() {
if let Some(msg) =
find_remove_next_default_priority(&mut self.waiting_send_to_one)?.fulfill()
{
return Some(msg);
}
}
Expand All @@ -436,17 +430,9 @@ impl<A> ChanInner<A> {
self.waiting_send_to_one.retain(|handle| handle.is_active());

loop {
let pos = self
.waiting_send_to_one
.iter()
.enumerate()
.max_by_key(|(_idx, handle)| match handle.priority() {
Some(p) if p > Priority::default() => Some(p),
_ => None,
})?
.0;

if let Some(msg) = self.waiting_send_to_one.remove(pos).unwrap().fulfill() {
if let Some(msg) =
find_remove_highest_priority(&mut self.waiting_send_to_one)?.fulfill()
{
return Some(msg);
}
}
Expand All @@ -456,14 +442,9 @@ impl<A> ChanInner<A> {
self.waiting_send_to_all.retain(|handle| handle.is_active());

loop {
let pos = self
.waiting_send_to_all
.iter()
.enumerate()
.max_by_key(|(_idx, handle)| handle.priority())?
.0;

if let Some(msg) = self.waiting_send_to_all.remove(pos).unwrap().fulfill() {
if let Some(msg) =
find_remove_highest_priority(&mut self.waiting_send_to_all)?.fulfill()
{
return Some(msg);
}
}
Expand All @@ -485,6 +466,35 @@ impl<A> ChanInner<A> {
}
}

fn find_remove_highest_priority<M>(
Copy link
Owner

@Restioson Restioson Aug 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to: how I mentioned using BinaryHeap for priority messages, this could be removed & replaced with an O(log n) binary heap remove call, at some stage

queue: &mut VecDeque<waiting_sender::FulFillHandle<M>>,
) -> Option<waiting_sender::FulFillHandle<M>>
where
M: HasPriority,
{
let pos = queue
.iter()
.enumerate()
.max_by_key(|(_, handle)| handle.priority())?
.0;

queue.remove(pos)
}

fn find_remove_next_default_priority<M>(
queue: &mut VecDeque<waiting_sender::FulFillHandle<M>>,
) -> Option<waiting_sender::FulFillHandle<M>>
where
M: HasPriority,
{
let pos = queue.iter().position(|handle| match handle.priority() {
None => false,
Some(p) => p == Priority::default(),
})?;

queue.remove(pos)
}

pub enum SentMessage<A> {
ToOneActor(Box<dyn MessageEnvelope<Actor = A>>),
ToAllActors(Arc<dyn BroadcastEnvelope<Actor = A>>),
Expand Down