From a38a4e282105f8ed4171cb36a50079fdd3f60694 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sun, 7 Aug 2022 11:07:08 +0200 Subject: [PATCH 01/25] Split `try_send` internally --- src/inbox.rs | 104 +++++++++++++++++++++++++-------------------------- 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index 621db9d6..2e3a192b 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -82,9 +82,16 @@ impl Chan { mailbox } - pub fn try_send( + pub fn try_send(&self, message: SentMessage) -> Result>, Error> { + match message { + SentMessage::ToOneActor(to_one) => self.try_send_to_one(to_one), + SentMessage::ToAllActors(to_all) => self.try_send_to_all(to_all), + } + } + + fn try_send_to_one( &self, - mut message: SentMessage, + mut message: Box>, ) -> Result>, Error> { if !self.is_connected() { return Err(Error::Disconnected); @@ -94,45 +101,54 @@ impl Chan { let mut inner = self.chan.lock().unwrap(); - let result = match message { - SentMessage::ToAllActors(m) => { - if inner.is_broadcast_full() { - let waiting = WaitingSender::new(SentMessage::ToAllActors(m)); - inner.waiting_senders.push_back(Arc::downgrade(&waiting)); - - return Ok(Err(MailboxFull(waiting))); - } + let unfulfilled_msg = if let Err(msg) = inner.try_fulfill_receiver(message) { + msg + } else { + return Ok(Ok(())); + }; - inner.send_broadcast(m); - Ok(()) + match unfulfilled_msg { + m if m.priority() == Priority::default() && !inner.is_ordered_full() => { + inner.ordered_queue.push_back(m); + } + m if m.priority() != Priority::default() && !inner.is_priority_full() => { + inner.priority_queue.push(ByPriority(m)); } - SentMessage::ToOneActor(msg) => { - let unfulfilled_msg = if let Err(msg) = inner.try_fulfill_receiver(msg) { - msg - } else { - return Ok(Ok(())); - }; - - match unfulfilled_msg { - m if m.priority() == Priority::default() && !inner.is_ordered_full() => { - inner.ordered_queue.push_back(m); - } - m if m.priority() != Priority::default() && !inner.is_priority_full() => { - inner.priority_queue.push(ByPriority(m)); - } - _ => { - let waiting = WaitingSender::new(SentMessage::ToOneActor(unfulfilled_msg)); - inner.waiting_senders.push_back(Arc::downgrade(&waiting)); - - return Ok(Err(MailboxFull(waiting))); - } - }; - - Ok(()) + _ => { + let waiting = WaitingSender::new(SentMessage::ToOneActor(unfulfilled_msg)); + inner.waiting_senders.push_back(Arc::downgrade(&waiting)); + + return Ok(Err(MailboxFull(waiting))); } }; - Ok(result) + Ok(Ok(())) + } + + fn try_send_to_all( + &self, + mut message: Arc>, + ) -> Result>, Error> { + if !self.is_connected() { + return Err(Error::Disconnected); + } + + Arc::get_mut(&mut message) + .expect("calling after try_send not supported") + .start_span(); + + let mut inner = self.chan.lock().unwrap(); + + if inner.is_broadcast_full() { + let waiting = WaitingSender::new(SentMessage::ToAllActors(message)); + inner.waiting_senders.push_back(Arc::downgrade(&waiting)); + + return Ok(Err(MailboxFull(waiting))); + } + + inner.send_broadcast(message); + + Ok(Ok(())) } fn try_recv( @@ -477,22 +493,6 @@ pub enum SentMessage { ToAllActors(Arc>), } -impl SentMessage { - pub fn start_span(&mut self) { - #[cfg(feature = "instrumentation")] - match self { - SentMessage::ToOneActor(m) => { - m.start_span(); - } - SentMessage::ToAllActors(m) => { - Arc::get_mut(m) - .expect("calling after try_send not supported") - .start_span(); - } - }; - } -} - impl From>> for SentMessage { fn from(msg: Box>) -> Self { SentMessage::ToOneActor(msg) From 8cc534ea3da7f26ab8169f9aed438217e8ec4b03 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sun, 7 Aug 2022 11:09:20 +0200 Subject: [PATCH 02/25] Inline `Sender::try_send` --- src/inbox.rs | 11 ++--------- src/send_future.rs | 17 ++++++++++++----- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index 2e3a192b..4e228588 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -82,14 +82,7 @@ impl Chan { mailbox } - pub fn try_send(&self, message: SentMessage) -> Result>, Error> { - match message { - SentMessage::ToOneActor(to_one) => self.try_send_to_one(to_one), - SentMessage::ToAllActors(to_all) => self.try_send_to_all(to_all), - } - } - - fn try_send_to_one( + pub fn try_send_to_one( &self, mut message: Box>, ) -> Result>, Error> { @@ -125,7 +118,7 @@ impl Chan { Ok(Ok(())) } - fn try_send_to_all( + pub fn try_send_to_all( &self, mut message: Arc>, ) -> Result>, Error> { diff --git a/src/send_future.rs b/src/send_future.rs index d1be9eb8..79a5e145 100644 --- a/src/send_future.rs +++ b/src/send_future.rs @@ -183,12 +183,19 @@ where loop { match mem::replace(this, Sending::Done) { - Sending::New { msg, sender } => match sender.try_send(msg)? { - Ok(()) => return Poll::Ready(Ok(())), - Err(MailboxFull(waiting)) => { - *this = Sending::WaitingToSend(waiting); + Sending::New { msg, sender } => { + let result = match msg { + SentMessage::ToOneActor(to_one) => sender.try_send_to_one(to_one)?, + SentMessage::ToAllActors(to_all) => sender.try_send_to_all(to_all)?, + }; + + match result { + Ok(()) => return Poll::Ready(Ok(())), + Err(MailboxFull(waiting)) => { + *this = Sending::WaitingToSend(waiting); + } } - }, + } Sending::WaitingToSend(waiting) => { let poll = { waiting.lock().poll_unpin(cx) }?; // Scoped separately to drop mutex guard asap. From 261320c153a63db47fda000d2fd0d89434ee2c00 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sun, 7 Aug 2022 21:36:13 +0200 Subject: [PATCH 03/25] Make `WaitingSender` generic over message --- src/inbox.rs | 24 ++++++++++++------------ src/send_future.rs | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index 4e228588..3ba0e0c4 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -301,7 +301,7 @@ impl Chan { struct ChanInner { capacity: Option, ordered_queue: VecDeque>>, - waiting_senders: VecDeque>>>, + waiting_senders: VecDeque>>>>, waiting_receivers_handles: VecDeque>, priority_queue: BinaryHeap>>>, broadcast_queues: Vec>>, @@ -493,7 +493,7 @@ impl From>> for SentMessage { } /// An error returned in case the mailbox of an actor is full. -pub struct MailboxFull(pub Arc>>); +pub struct MailboxFull(pub Arc>>>); pub enum ActorMessage { ToOneActor(Box>), @@ -513,17 +513,14 @@ impl From>> for ActorMessage { } } -pub enum WaitingSender { - Active { - waker: Option, - message: SentMessage, - }, +pub enum WaitingSender { + Active { waker: Option, message: M }, Delivered, Closed, } -impl WaitingSender { - pub fn new(message: SentMessage) -> Arc> { +impl WaitingSender { + pub fn new(message: M) -> Arc> { let sender = WaitingSender::Active { waker: None, message, @@ -531,14 +528,14 @@ impl WaitingSender { Arc::new(Spinlock::new(sender)) } - pub fn peek(&self) -> &SentMessage { + pub fn peek(&self) -> &M { match self { WaitingSender::Active { message, .. } => message, _ => panic!("WaitingSender should have message"), } } - pub fn fulfill(&mut self) -> SentMessage { + pub fn fulfill(&mut self) -> M { match mem::replace(self, Self::Delivered) { WaitingSender::Active { mut waker, message } => { if let Some(waker) = waker.take() { @@ -566,7 +563,10 @@ impl WaitingSender { } } -impl Future for WaitingSender { +impl Future for WaitingSender +where + M: Unpin, +{ type Output = Result<(), Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/send_future.rs b/src/send_future.rs index 79a5e145..fccc93bd 100644 --- a/src/send_future.rs +++ b/src/send_future.rs @@ -168,7 +168,7 @@ enum Sending { msg: SentMessage, sender: inbox::Sender, }, - WaitingToSend(Arc>>), + WaitingToSend(Arc>>>), Done, } From 58ab1db1eca12a296608ca13483f828da5fb2fa6 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sun, 7 Aug 2022 21:47:03 +0200 Subject: [PATCH 04/25] Remove `MessageType` enum We achieve this by extracting three new `try_fulfill_sender` functions, one for each message type. This creates some duplication but we will deal with that later. --- src/inbox.rs | 99 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 60 insertions(+), 39 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index 3ba0e0c4..0029d5a2 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -327,7 +327,7 @@ impl ChanInner { .capacity .map_or(false, |cap| cap == self.priority_queue.len()) { - match self.try_fulfill_sender(MessageType::Priority) { + match self.try_fulfill_sender_priority() { Some(SentMessage::ToOneActor(msg)) => self.priority_queue.push(ByPriority(msg)), Some(_) => unreachable!(), None => {} @@ -343,7 +343,7 @@ impl ChanInner { .capacity .map_or(false, |cap| cap == self.ordered_queue.len()) { - match self.try_fulfill_sender(MessageType::Ordered) { + match self.try_fulfill_sender_ordered() { Some(SentMessage::ToOneActor(msg)) => self.ordered_queue.push_back(msg), Some(_) => unreachable!(), None => {} @@ -365,7 +365,7 @@ impl ChanInner { // If len < cap, try fulfill a waiting sender if self.capacity.map_or(false, |cap| self.broadcast_tail < cap) { - match self.try_fulfill_sender(MessageType::Broadcast) { + match self.try_fulfill_sender_broadcast() { Some(SentMessage::ToAllActors(m)) => self.send_broadcast(m), Some(_) => unreachable!(), None => {} @@ -418,39 +418,67 @@ impl ChanInner { Err(msg) } - fn try_fulfill_sender(&mut self, for_type: MessageType) -> Option> { + fn try_fulfill_sender_ordered(&mut self) -> Option> { self.waiting_senders .retain(|tx| Weak::strong_count(tx) != 0); loop { - let pos = if for_type == MessageType::Ordered { - self.waiting_senders - .iter() - .position(|tx| match tx.upgrade() { - Some(tx) => matches!(tx.lock().peek(), SentMessage::ToOneActor(m) if m.priority() == Priority::default()), - None => false, - })? - } else { - self.waiting_senders - .iter() - .enumerate() - .max_by_key(|(_idx, tx)| match tx.upgrade() { - Some(tx) => match tx.lock().peek() { - SentMessage::ToOneActor(m) - if for_type == MessageType::Priority - && m.priority() > Priority::default() => - { - Some(m.priority()) - } - SentMessage::ToAllActors(m) if for_type == MessageType::Broadcast => { - Some(m.priority()) - } - _ => None, - }, - None => None, - })? - .0 - }; + let pos = self.waiting_senders + .iter() + .position(|tx| match tx.upgrade() { + Some(tx) => matches!(tx.lock().peek(), SentMessage::ToOneActor(m) if m.priority() == Priority::default()), + None => false, + })?; + + if let Some(tx) = self.waiting_senders.remove(pos).unwrap().upgrade() { + return Some(tx.lock().fulfill()); + } + } + } + + fn try_fulfill_sender_priority(&mut self) -> Option> { + self.waiting_senders + .retain(|tx| Weak::strong_count(tx) != 0); + + loop { + let pos = self + .waiting_senders + .iter() + .enumerate() + .max_by_key(|(_idx, tx)| match tx.upgrade() { + Some(tx) => match tx.lock().peek() { + SentMessage::ToOneActor(m) if m.priority() > Priority::default() => { + Some(m.priority()) + } + _ => None, + }, + None => None, + })? + .0; + + if let Some(tx) = self.waiting_senders.remove(pos).unwrap().upgrade() { + return Some(tx.lock().fulfill()); + } + } + } + + fn try_fulfill_sender_broadcast(&mut self) -> Option> { + self.waiting_senders + .retain(|tx| Weak::strong_count(tx) != 0); + + loop { + let pos = self + .waiting_senders + .iter() + .enumerate() + .max_by_key(|(_idx, tx)| match tx.upgrade() { + Some(tx) => match tx.lock().peek() { + SentMessage::ToAllActors(m) => Some(m.priority()), + _ => None, + }, + None => None, + })? + .0; if let Some(tx) = self.waiting_senders.remove(pos).unwrap().upgrade() { return Some(tx.lock().fulfill()); @@ -474,13 +502,6 @@ impl ChanInner { } } -#[derive(Eq, PartialEq)] -enum MessageType { - Broadcast, - Ordered, - Priority, -} - pub enum SentMessage { ToOneActor(Box>), ToAllActors(Arc>), From 7291b0a9b183237e365296817e37715296935e65 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sun, 7 Aug 2022 21:56:55 +0200 Subject: [PATCH 05/25] Keep `WaitingSender`s per message type separate This allows us to be more specific about which message types we deal with in particular functions. --- src/inbox.rs | 89 +++++++++++++++++++++++++--------------------- src/send_future.rs | 43 +++++++++++++++------- 2 files changed, 79 insertions(+), 53 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index 0029d5a2..238269a3 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -85,7 +85,7 @@ impl Chan { pub fn try_send_to_one( &self, mut message: Box>, - ) -> Result>, Error> { + ) -> Result>>>, Error> { if !self.is_connected() { return Err(Error::Disconnected); } @@ -108,8 +108,10 @@ impl Chan { inner.priority_queue.push(ByPriority(m)); } _ => { - let waiting = WaitingSender::new(SentMessage::ToOneActor(unfulfilled_msg)); - inner.waiting_senders.push_back(Arc::downgrade(&waiting)); + let waiting = WaitingSender::new(unfulfilled_msg); + inner + .waiting_send_to_one + .push_back(Arc::downgrade(&waiting)); return Ok(Err(MailboxFull(waiting))); } @@ -121,7 +123,7 @@ impl Chan { pub fn try_send_to_all( &self, mut message: Arc>, - ) -> Result>, Error> { + ) -> Result>>>, Error> { if !self.is_connected() { return Err(Error::Disconnected); } @@ -133,8 +135,10 @@ impl Chan { let mut inner = self.chan.lock().unwrap(); if inner.is_broadcast_full() { - let waiting = WaitingSender::new(SentMessage::ToAllActors(message)); - inner.waiting_senders.push_back(Arc::downgrade(&waiting)); + let waiting = WaitingSender::new(message); + inner + .waiting_send_to_all + .push_back(Arc::downgrade(&waiting)); return Ok(Err(MailboxFull(waiting))); } @@ -235,7 +239,7 @@ impl Chan { /// Shutdown all [`WaitingSender`]s in this channel. fn shutdown_waiting_senders(&self) { - let waiting_tx = { + let (waiting_send_to_one, waiting_send_to_all) = { let mut inner = match self.chan.lock() { Ok(lock) => lock, Err(_) => return, // Poisoned, ignore @@ -248,10 +252,17 @@ impl Chan { inner.priority_queue.clear(); inner.broadcast_queues.clear(); - mem::take(&mut inner.waiting_senders) + ( + mem::take(&mut inner.waiting_send_to_one), + mem::take(&mut inner.waiting_send_to_all), + ) }; - for tx in waiting_tx.into_iter().flat_map(|w| w.upgrade()) { + for tx in waiting_send_to_one.into_iter().flat_map(|w| w.upgrade()) { + tx.lock().set_closed(); + } + + for tx in waiting_send_to_all.into_iter().flat_map(|w| w.upgrade()) { tx.lock().set_closed(); } } @@ -301,7 +312,10 @@ impl Chan { struct ChanInner { capacity: Option, ordered_queue: VecDeque>>, - waiting_senders: VecDeque>>>>, + waiting_send_to_one: + VecDeque>>>>>, + waiting_send_to_all: + VecDeque>>>>>, waiting_receivers_handles: VecDeque>, priority_queue: BinaryHeap>>>, broadcast_queues: Vec>>, @@ -313,7 +327,8 @@ impl ChanInner { Self { capacity, ordered_queue: VecDeque::default(), - waiting_senders: VecDeque::default(), + waiting_send_to_one: VecDeque::default(), + waiting_send_to_all: VecDeque::default(), waiting_receivers_handles: VecDeque::default(), priority_queue: BinaryHeap::default(), broadcast_queues: Vec::default(), @@ -328,8 +343,7 @@ impl ChanInner { .map_or(false, |cap| cap == self.priority_queue.len()) { match self.try_fulfill_sender_priority() { - Some(SentMessage::ToOneActor(msg)) => self.priority_queue.push(ByPriority(msg)), - Some(_) => unreachable!(), + Some(msg) => self.priority_queue.push(ByPriority(msg)), None => {} } } @@ -344,8 +358,7 @@ impl ChanInner { .map_or(false, |cap| cap == self.ordered_queue.len()) { match self.try_fulfill_sender_ordered() { - Some(SentMessage::ToOneActor(msg)) => self.ordered_queue.push_back(msg), - Some(_) => unreachable!(), + Some(msg) => self.ordered_queue.push_back(msg), None => {} } } @@ -366,8 +379,7 @@ impl ChanInner { // If len < cap, try fulfill a waiting sender if self.capacity.map_or(false, |cap| self.broadcast_tail < cap) { match self.try_fulfill_sender_broadcast() { - Some(SentMessage::ToAllActors(m)) => self.send_broadcast(m), - Some(_) => unreachable!(), + Some(m) => self.send_broadcast(m), None => {} } } @@ -418,69 +430,64 @@ impl ChanInner { Err(msg) } - fn try_fulfill_sender_ordered(&mut self) -> Option> { - self.waiting_senders + fn try_fulfill_sender_ordered(&mut self) -> Option>> { + self.waiting_send_to_one .retain(|tx| Weak::strong_count(tx) != 0); loop { - let pos = self.waiting_senders + let pos = self + .waiting_send_to_one .iter() .position(|tx| match tx.upgrade() { - Some(tx) => matches!(tx.lock().peek(), SentMessage::ToOneActor(m) if m.priority() == Priority::default()), + Some(tx) => { + matches!(tx.lock().peek(), m if m.priority() == Priority::default()) + } None => false, })?; - if let Some(tx) = self.waiting_senders.remove(pos).unwrap().upgrade() { + if let Some(tx) = self.waiting_send_to_one.remove(pos).unwrap().upgrade() { return Some(tx.lock().fulfill()); } } } - fn try_fulfill_sender_priority(&mut self) -> Option> { - self.waiting_senders + fn try_fulfill_sender_priority(&mut self) -> Option>> { + self.waiting_send_to_one .retain(|tx| Weak::strong_count(tx) != 0); loop { let pos = self - .waiting_senders + .waiting_send_to_one .iter() .enumerate() .max_by_key(|(_idx, tx)| match tx.upgrade() { Some(tx) => match tx.lock().peek() { - SentMessage::ToOneActor(m) if m.priority() > Priority::default() => { - Some(m.priority()) - } + m if m.priority() > Priority::default() => Some(m.priority()), _ => None, }, None => None, })? .0; - if let Some(tx) = self.waiting_senders.remove(pos).unwrap().upgrade() { + if let Some(tx) = self.waiting_send_to_one.remove(pos).unwrap().upgrade() { return Some(tx.lock().fulfill()); } } } - fn try_fulfill_sender_broadcast(&mut self) -> Option> { - self.waiting_senders + fn try_fulfill_sender_broadcast(&mut self) -> Option>> { + self.waiting_send_to_all .retain(|tx| Weak::strong_count(tx) != 0); loop { let pos = self - .waiting_senders + .waiting_send_to_all .iter() .enumerate() - .max_by_key(|(_idx, tx)| match tx.upgrade() { - Some(tx) => match tx.lock().peek() { - SentMessage::ToAllActors(m) => Some(m.priority()), - _ => None, - }, - None => None, - })? + .max_by_key(|(_idx, tx)| tx.upgrade().map(|tx| tx.lock().peek().priority()))? .0; - if let Some(tx) = self.waiting_senders.remove(pos).unwrap().upgrade() { + if let Some(tx) = self.waiting_send_to_all.remove(pos).unwrap().upgrade() { return Some(tx.lock().fulfill()); } } @@ -514,7 +521,7 @@ impl From>> for SentMessage { } /// An error returned in case the mailbox of an actor is full. -pub struct MailboxFull(pub Arc>>>); +pub struct MailboxFull(pub Arc>>); pub enum ActorMessage { ToOneActor(Box>), diff --git a/src/send_future.rs b/src/send_future.rs index fccc93bd..e8d0990a 100644 --- a/src/send_future.rs +++ b/src/send_future.rs @@ -7,7 +7,9 @@ use std::task::{Context, Poll}; use futures_core::FusedFuture; use futures_util::FutureExt; -use crate::envelope::{BroadcastEnvelopeConcrete, ReturningEnvelope}; +use crate::envelope::{ + BroadcastEnvelope, BroadcastEnvelopeConcrete, MessageEnvelope, ReturningEnvelope, +}; use crate::inbox::{MailboxFull, SentMessage, WaitingSender}; use crate::refcount::RefCounter; use crate::{inbox, Error, Handler}; @@ -168,7 +170,8 @@ enum Sending { msg: SentMessage, sender: inbox::Sender, }, - WaitingToSend(Arc>>>), + WaitingToSendOne(Arc>>>>), + WaitingToSendAll(Arc>>>>), Done, } @@ -184,25 +187,41 @@ where loop { match mem::replace(this, Sending::Done) { Sending::New { msg, sender } => { - let result = match msg { - SentMessage::ToOneActor(to_one) => sender.try_send_to_one(to_one)?, - SentMessage::ToAllActors(to_all) => sender.try_send_to_all(to_all)?, + match msg { + SentMessage::ToOneActor(to_one) => match sender.try_send_to_one(to_one)? { + Ok(()) => return Poll::Ready(Ok(())), + Err(MailboxFull(waiting)) => { + *this = Sending::WaitingToSendOne(waiting); + } + }, + SentMessage::ToAllActors(to_all) => { + match sender.try_send_to_all(to_all)? { + Ok(()) => return Poll::Ready(Ok(())), + Err(MailboxFull(waiting)) => { + *this = Sending::WaitingToSendAll(waiting); + } + } + } }; + } + Sending::WaitingToSendOne(waiting) => { + let poll = { waiting.lock().poll_unpin(cx) }?; // Scoped separately to drop mutex guard asap. - match result { - Ok(()) => return Poll::Ready(Ok(())), - Err(MailboxFull(waiting)) => { - *this = Sending::WaitingToSend(waiting); + return match poll { + Poll::Ready(()) => Poll::Ready(Ok(())), + Poll::Pending => { + *this = Sending::WaitingToSendOne(waiting); + Poll::Pending } - } + }; } - Sending::WaitingToSend(waiting) => { + Sending::WaitingToSendAll(waiting) => { let poll = { waiting.lock().poll_unpin(cx) }?; // Scoped separately to drop mutex guard asap. return match poll { Poll::Ready(()) => Poll::Ready(Ok(())), Poll::Pending => { - *this = Sending::WaitingToSend(waiting); + *this = Sending::WaitingToSendAll(waiting); Poll::Pending } }; From 113eadf9684f8fc0036ee53dd7db85e20e5538cb Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 12:11:43 +0200 Subject: [PATCH 06/25] Qualify `FulFillHandle` --- src/inbox.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index 238269a3..a1d91044 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -20,7 +20,7 @@ pub use tx::Sender; use crate::envelope::{BroadcastEnvelope, MessageEnvelope, Shutdown}; use crate::inbox::tx::TxStrong; -use crate::inbox::waiting_receiver::{FulfillHandle, WaitingReceiver}; +use crate::inbox::waiting_receiver::WaitingReceiver; use crate::{Actor, Error}; type Spinlock = spin::Mutex; @@ -316,7 +316,7 @@ struct ChanInner { VecDeque>>>>>, waiting_send_to_all: VecDeque>>>>>, - waiting_receivers_handles: VecDeque>, + waiting_receivers_handles: VecDeque>, priority_queue: BinaryHeap>>>, broadcast_queues: Vec>>, broadcast_tail: usize, From 6571d09d042d6b9eac80cd6a3a41a8eca791e7a7 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 12:14:26 +0200 Subject: [PATCH 07/25] Move `WaitingSender` to its own module --- src/inbox.rs | 75 +----------------------------------- src/inbox/waiting_sender.rs | 77 +++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 73 deletions(-) create mode 100644 src/inbox/waiting_sender.rs diff --git a/src/inbox.rs b/src/inbox.rs index a1d91044..c7fa93bd 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -4,19 +4,18 @@ pub mod rx; pub mod tx; mod waiting_receiver; +mod waiting_sender; use std::cmp::Ordering; use std::collections::{BinaryHeap, VecDeque}; -use std::future::Future; -use std::pin::Pin; use std::sync::atomic::AtomicUsize; use std::sync::{atomic, Arc, Mutex, Weak}; -use std::task::{Context, Poll, Waker}; use std::{cmp, mem}; use event_listener::{Event, EventListener}; pub use rx::Receiver; pub use tx::Sender; +pub use waiting_sender::WaitingSender; use crate::envelope::{BroadcastEnvelope, MessageEnvelope, Shutdown}; use crate::inbox::tx::TxStrong; @@ -541,76 +540,6 @@ impl From>> for ActorMessage { } } -pub enum WaitingSender { - Active { waker: Option, message: M }, - Delivered, - Closed, -} - -impl WaitingSender { - pub fn new(message: M) -> Arc> { - let sender = WaitingSender::Active { - waker: None, - message, - }; - Arc::new(Spinlock::new(sender)) - } - - pub fn peek(&self) -> &M { - match self { - WaitingSender::Active { message, .. } => message, - _ => panic!("WaitingSender should have message"), - } - } - - pub fn fulfill(&mut self) -> M { - match mem::replace(self, Self::Delivered) { - WaitingSender::Active { mut waker, message } => { - if let Some(waker) = waker.take() { - waker.wake(); - } - - message - } - WaitingSender::Delivered | WaitingSender::Closed => { - panic!("WaitingSender is already fulfilled or closed") - } - } - } - - /// Mark this [`WaitingSender`] as closed. - /// - /// Should be called when the last [`Receiver`](crate::inbox::Receiver) goes away. - pub fn set_closed(&mut self) { - if let WaitingSender::Active { - waker: Some(waker), .. - } = mem::replace(self, Self::Closed) - { - waker.wake(); - } - } -} - -impl Future for WaitingSender -where - M: Unpin, -{ - type Output = Result<(), Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - match this { - WaitingSender::Active { waker, .. } => { - *waker = Some(cx.waker().clone()); - Poll::Pending - } - WaitingSender::Delivered => Poll::Ready(Ok(())), - WaitingSender::Closed => Poll::Ready(Err(Error::Disconnected)), - } - } -} - #[derive(Eq, PartialEq, Ord, PartialOrd, Copy, Clone)] pub enum Priority { Valued(u32), diff --git a/src/inbox/waiting_sender.rs b/src/inbox/waiting_sender.rs new file mode 100644 index 00000000..12c8ce3d --- /dev/null +++ b/src/inbox/waiting_sender.rs @@ -0,0 +1,77 @@ +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll, Waker}; + +use crate::Error; + +pub enum WaitingSender { + Active { waker: Option, message: M }, + Delivered, + Closed, +} + +impl WaitingSender { + pub fn new(message: M) -> Arc> { + let sender = WaitingSender::Active { + waker: None, + message, + }; + Arc::new(spin::Mutex::new(sender)) + } + + pub fn peek(&self) -> &M { + match self { + WaitingSender::Active { message, .. } => message, + _ => panic!("WaitingSender should have message"), + } + } + + pub fn fulfill(&mut self) -> M { + match mem::replace(self, Self::Delivered) { + WaitingSender::Active { mut waker, message } => { + if let Some(waker) = waker.take() { + waker.wake(); + } + + message + } + WaitingSender::Delivered | WaitingSender::Closed => { + panic!("WaitingSender is already fulfilled or closed") + } + } + } + + /// Mark this [`WaitingSender`] as closed. + /// + /// Should be called when the last [`Receiver`](crate::inbox::Receiver) goes away. + pub fn set_closed(&mut self) { + if let WaitingSender::Active { + waker: Some(waker), .. + } = mem::replace(self, Self::Closed) + { + waker.wake(); + } + } +} + +impl Future for WaitingSender +where + A: Unpin, +{ + type Output = Result<(), Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + match this { + WaitingSender::Active { waker, .. } => { + *waker = Some(cx.waker().clone()); + Poll::Pending + } + WaitingSender::Delivered => Poll::Ready(Ok(())), + WaitingSender::Closed => Poll::Ready(Err(Error::Disconnected)), + } + } +} From da48e277bb9686f26109a48bdf4edb71872e2f6a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 12:57:53 +0200 Subject: [PATCH 08/25] Introduce `waiting_sender::FulFillHandle` equivalent to `waiting_receiver` --- src/inbox.rs | 76 ++++++++++++---------------- src/inbox/waiting_sender.rs | 99 ++++++++++++++++++++++++------------- src/send_future.rs | 16 +++--- 3 files changed, 104 insertions(+), 87 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index c7fa93bd..046f05dc 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -107,10 +107,8 @@ impl Chan { inner.priority_queue.push(ByPriority(m)); } _ => { - let waiting = WaitingSender::new(unfulfilled_msg); - inner - .waiting_send_to_one - .push_back(Arc::downgrade(&waiting)); + let (handle, waiting) = WaitingSender::new(unfulfilled_msg); + inner.waiting_send_to_one.push_back(handle); return Ok(Err(MailboxFull(waiting))); } @@ -134,10 +132,8 @@ impl Chan { let mut inner = self.chan.lock().unwrap(); if inner.is_broadcast_full() { - let waiting = WaitingSender::new(message); - inner - .waiting_send_to_all - .push_back(Arc::downgrade(&waiting)); + let (handle, waiting) = WaitingSender::new(message); + inner.waiting_send_to_all.push_back(handle); return Ok(Err(MailboxFull(waiting))); } @@ -257,12 +253,12 @@ impl Chan { ) }; - for tx in waiting_send_to_one.into_iter().flat_map(|w| w.upgrade()) { - tx.lock().set_closed(); + for tx in waiting_send_to_one { + tx.set_closed(); } - for tx in waiting_send_to_all.into_iter().flat_map(|w| w.upgrade()) { - tx.lock().set_closed(); + for tx in waiting_send_to_all { + tx.set_closed(); } } @@ -312,9 +308,9 @@ struct ChanInner { capacity: Option, ordered_queue: VecDeque>>, waiting_send_to_one: - VecDeque>>>>>, + VecDeque>>>, waiting_send_to_all: - VecDeque>>>>>, + VecDeque>>>, waiting_receivers_handles: VecDeque>, priority_queue: BinaryHeap>>>, broadcast_queues: Vec>>, @@ -430,64 +426,56 @@ impl ChanInner { } fn try_fulfill_sender_ordered(&mut self) -> Option>> { - self.waiting_send_to_one - .retain(|tx| Weak::strong_count(tx) != 0); + self.waiting_send_to_one.retain(|handle| handle.is_active()); loop { - let pos = self - .waiting_send_to_one - .iter() - .position(|tx| match tx.upgrade() { - Some(tx) => { - matches!(tx.lock().peek(), m if m.priority() == Priority::default()) - } - None => false, - })?; - - if let Some(tx) = self.waiting_send_to_one.remove(pos).unwrap().upgrade() { - return Some(tx.lock().fulfill()); + let pos = + self.waiting_send_to_one + .iter() + .position(|handle| match handle.priority() { + Some(p) => p == Priority::default(), + None => false, + })?; + + if let Some(msg) = self.waiting_send_to_one.remove(pos).unwrap().fulfill() { + return Some(msg); } } } fn try_fulfill_sender_priority(&mut self) -> Option>> { - self.waiting_send_to_one - .retain(|tx| Weak::strong_count(tx) != 0); + self.waiting_send_to_one.retain(|handle| handle.is_active()); loop { let pos = self .waiting_send_to_one .iter() .enumerate() - .max_by_key(|(_idx, tx)| match tx.upgrade() { - Some(tx) => match tx.lock().peek() { - m if m.priority() > Priority::default() => Some(m.priority()), - _ => None, - }, - None => None, + .max_by_key(|(_idx, handle)| match handle.priority() { + Some(p) if p > Priority::default() => Some(p), + _ => None, })? .0; - if let Some(tx) = self.waiting_send_to_one.remove(pos).unwrap().upgrade() { - return Some(tx.lock().fulfill()); + if let Some(msg) = self.waiting_send_to_one.remove(pos).unwrap().fulfill() { + return Some(msg); } } } fn try_fulfill_sender_broadcast(&mut self) -> Option>> { - self.waiting_send_to_all - .retain(|tx| Weak::strong_count(tx) != 0); + self.waiting_send_to_all.retain(|handle| handle.is_active()); loop { let pos = self .waiting_send_to_all .iter() .enumerate() - .max_by_key(|(_idx, tx)| tx.upgrade().map(|tx| tx.lock().peek().priority()))? + .max_by_key(|(_idx, handle)| handle.priority())? .0; - if let Some(tx) = self.waiting_send_to_all.remove(pos).unwrap().upgrade() { - return Some(tx.lock().fulfill()); + if let Some(msg) = self.waiting_send_to_all.remove(pos).unwrap().fulfill() { + return Some(msg); } } } @@ -520,7 +508,7 @@ impl From>> for SentMessage { } /// An error returned in case the mailbox of an actor is full. -pub struct MailboxFull(pub Arc>>); +pub struct MailboxFull(pub WaitingSender); pub enum ActorMessage { ToOneActor(Box>), diff --git a/src/inbox/waiting_sender.rs b/src/inbox/waiting_sender.rs index 12c8ce3d..c2e85e69 100644 --- a/src/inbox/waiting_sender.rs +++ b/src/inbox/waiting_sender.rs @@ -1,57 +1,75 @@ use std::future::Future; use std::mem; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::task::{Context, Poll, Waker}; +use crate::inbox::{HasPriority, Priority}; use crate::Error; -pub enum WaitingSender { - Active { waker: Option, message: M }, - Delivered, - Closed, -} +pub struct WaitingSender(Arc>>); + +pub struct FulFillHandle(Weak>>); impl WaitingSender { - pub fn new(message: M) -> Arc> { - let sender = WaitingSender::Active { - waker: None, - message, - }; - Arc::new(spin::Mutex::new(sender)) + pub fn new(msg: M) -> (FulFillHandle, WaitingSender) { + let inner = Arc::new(spin::Mutex::new(Inner::new(msg))); + + (FulFillHandle(Arc::downgrade(&inner)), WaitingSender(inner)) } +} - pub fn peek(&self) -> &M { - match self { - WaitingSender::Active { message, .. } => message, - _ => panic!("WaitingSender should have message"), - } +impl FulFillHandle { + pub fn is_active(&self) -> bool { + Weak::strong_count(&self.0) > 0 } - pub fn fulfill(&mut self) -> M { - match mem::replace(self, Self::Delivered) { - WaitingSender::Active { mut waker, message } => { + pub fn fulfill(&self) -> Option { + let inner = self.0.upgrade()?; + let mut this = inner.lock(); + + Some(match mem::replace(&mut *this, Inner::Delivered) { + Inner::Active { mut waker, message } => { if let Some(waker) = waker.take() { waker.wake(); } message } - WaitingSender::Delivered | WaitingSender::Closed => { + Inner::Delivered | Inner::Closed => { panic!("WaitingSender is already fulfilled or closed") } - } + }) } /// Mark this [`WaitingSender`] as closed. /// /// Should be called when the last [`Receiver`](crate::inbox::Receiver) goes away. - pub fn set_closed(&mut self) { - if let WaitingSender::Active { - waker: Some(waker), .. - } = mem::replace(self, Self::Closed) - { - waker.wake(); + pub fn set_closed(&self) { + if let Some(inner) = self.0.upgrade() { + let mut this = inner.lock(); + + if let Inner::Active { + waker: Some(waker), .. + } = mem::replace(&mut *this, Inner::Closed) + { + waker.wake(); + } + } + } +} + +impl FulFillHandle +where + M: HasPriority, +{ + pub fn priority(&self) -> Option { + let inner = self.0.upgrade()?; + let this = inner.lock(); + + match &*this { + Inner::Active { message, .. } => Some(message.priority()), + Inner::Closed | Inner::Delivered => None, } } } @@ -63,15 +81,30 @@ where type Output = Result<(), Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let mut this = self.get_mut().0.lock(); - match this { - WaitingSender::Active { waker, .. } => { + match &mut *this { + Inner::Active { waker, .. } => { *waker = Some(cx.waker().clone()); Poll::Pending } - WaitingSender::Delivered => Poll::Ready(Ok(())), - WaitingSender::Closed => Poll::Ready(Err(Error::Disconnected)), + Inner::Delivered => Poll::Ready(Ok(())), + Inner::Closed => Poll::Ready(Err(Error::Disconnected)), + } + } +} + +enum Inner { + Active { waker: Option, message: M }, + Delivered, + Closed, +} + +impl Inner { + fn new(message: M) -> Self { + Inner::Active { + waker: None, + message, } } } diff --git a/src/send_future.rs b/src/send_future.rs index e8d0990a..035b099a 100644 --- a/src/send_future.rs +++ b/src/send_future.rs @@ -170,8 +170,8 @@ enum Sending { msg: SentMessage, sender: inbox::Sender, }, - WaitingToSendOne(Arc>>>>), - WaitingToSendAll(Arc>>>>), + WaitingToSendOne(WaitingSender>>), + WaitingToSendAll(WaitingSender>>), Done, } @@ -204,10 +204,8 @@ where } }; } - Sending::WaitingToSendOne(waiting) => { - let poll = { waiting.lock().poll_unpin(cx) }?; // Scoped separately to drop mutex guard asap. - - return match poll { + Sending::WaitingToSendOne(mut waiting) => { + return match waiting.poll_unpin(cx)? { Poll::Ready(()) => Poll::Ready(Ok(())), Poll::Pending => { *this = Sending::WaitingToSendOne(waiting); @@ -215,10 +213,8 @@ where } }; } - Sending::WaitingToSendAll(waiting) => { - let poll = { waiting.lock().poll_unpin(cx) }?; // Scoped separately to drop mutex guard asap. - - return match poll { + Sending::WaitingToSendAll(mut waiting) => { + return match waiting.poll_unpin(cx)? { Poll::Ready(()) => Poll::Ready(Ok(())), Poll::Pending => { *this = Sending::WaitingToSendAll(waiting); From 215fe860850948abd8d5e48bedabb224100f0e6c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 13:14:37 +0200 Subject: [PATCH 09/25] Call `set_closed` on `Drop` of `waiting_sender::FulFillHandle` This allows us to use `.clear` when shutting down all waiting senders but in order for this to not break anything, we need to re-write the `set_closed` function to only set the state to `Closed` when we are still `Active`. Setting it to `Cloesd` on `Delivered` breaks things. --- src/inbox.rs | 35 ++++++++++++----------------------- src/inbox/waiting_sender.rs | 22 ++++++++++++++++------ 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index 046f05dc..42acabdf 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -234,32 +234,21 @@ impl Chan { /// Shutdown all [`WaitingSender`]s in this channel. fn shutdown_waiting_senders(&self) { - let (waiting_send_to_one, waiting_send_to_all) = { - let mut inner = match self.chan.lock() { - Ok(lock) => lock, - Err(_) => return, // Poisoned, ignore - }; - - self.on_shutdown.notify(usize::MAX); - - // Let any outstanding messages drop - inner.ordered_queue.clear(); - inner.priority_queue.clear(); - inner.broadcast_queues.clear(); - - ( - mem::take(&mut inner.waiting_send_to_one), - mem::take(&mut inner.waiting_send_to_all), - ) + let mut inner = match self.chan.lock() { + Ok(lock) => lock, + Err(_) => return, // Poisoned, ignore }; - for tx in waiting_send_to_one { - tx.set_closed(); - } + self.on_shutdown.notify(usize::MAX); - for tx in waiting_send_to_all { - tx.set_closed(); - } + // Let any outstanding messages drop + inner.ordered_queue.clear(); + inner.priority_queue.clear(); + inner.broadcast_queues.clear(); + + // Close (and potentially wake) outstanding waiting senders + inner.waiting_send_to_one.clear(); + inner.waiting_send_to_all.clear(); } pub fn disconnect_listener(&self) -> Option { diff --git a/src/inbox/waiting_sender.rs b/src/inbox/waiting_sender.rs index c2e85e69..7c9fc9ad 100644 --- a/src/inbox/waiting_sender.rs +++ b/src/inbox/waiting_sender.rs @@ -45,20 +45,30 @@ impl FulFillHandle { /// Mark this [`WaitingSender`] as closed. /// /// Should be called when the last [`Receiver`](crate::inbox::Receiver) goes away. - pub fn set_closed(&self) { + fn set_closed(&self) { if let Some(inner) = self.0.upgrade() { let mut this = inner.lock(); - if let Inner::Active { - waker: Some(waker), .. - } = mem::replace(&mut *this, Inner::Closed) - { - waker.wake(); + match &*this { + Inner::Active { waker, .. } => { + if let Some(waker) = waker { + waker.wake_by_ref(); + } + *this = Inner::Closed; + } + Inner::Delivered => {} + Inner::Closed => {} } } } } +impl Drop for FulFillHandle { + fn drop(&mut self) { + self.set_closed(); + } +} + impl FulFillHandle where M: HasPriority, From 84f4c149dfa37763c603fe7b4bc33a98b0fe1edd Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 13:16:04 +0200 Subject: [PATCH 10/25] Inline `set_closed` function --- src/inbox/waiting_sender.rs | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/src/inbox/waiting_sender.rs b/src/inbox/waiting_sender.rs index 7c9fc9ad..8557aa98 100644 --- a/src/inbox/waiting_sender.rs +++ b/src/inbox/waiting_sender.rs @@ -41,11 +41,10 @@ impl FulFillHandle { } }) } +} - /// Mark this [`WaitingSender`] as closed. - /// - /// Should be called when the last [`Receiver`](crate::inbox::Receiver) goes away. - fn set_closed(&self) { +impl Drop for FulFillHandle { + fn drop(&mut self) { if let Some(inner) = self.0.upgrade() { let mut this = inner.lock(); @@ -59,13 +58,7 @@ impl FulFillHandle { Inner::Delivered => {} Inner::Closed => {} } - } - } -} - -impl Drop for FulFillHandle { - fn drop(&mut self) { - self.set_closed(); + }; } } From 45bf722b185430e8c6f615e790c15e6311488e30 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 13:17:49 +0200 Subject: [PATCH 11/25] Have `fulfill` consume the `FulFillHandle` Fulfilling is only possible once, let's express this with the type system. --- src/inbox/waiting_sender.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/inbox/waiting_sender.rs b/src/inbox/waiting_sender.rs index 8557aa98..bcab76bf 100644 --- a/src/inbox/waiting_sender.rs +++ b/src/inbox/waiting_sender.rs @@ -24,7 +24,7 @@ impl FulFillHandle { Weak::strong_count(&self.0) > 0 } - pub fn fulfill(&self) -> Option { + pub fn fulfill(self) -> Option { let inner = self.0.upgrade()?; let mut this = inner.lock(); From 1bb10ffb3c00ec3ed0b161c5fbb82150922fcc27 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 13:35:40 +0200 Subject: [PATCH 12/25] Introduce helper functions for `try_fullfil` functions --- src/inbox.rs | 66 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 38 insertions(+), 28 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index 42acabdf..9dba8cee 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -418,15 +418,9 @@ impl ChanInner { self.waiting_send_to_one.retain(|handle| handle.is_active()); loop { - let pos = - self.waiting_send_to_one - .iter() - .position(|handle| match handle.priority() { - Some(p) => p == Priority::default(), - None => false, - })?; - - if let Some(msg) = self.waiting_send_to_one.remove(pos).unwrap().fulfill() { + if let Some(msg) = + find_remove_next_default_priority(&mut self.waiting_send_to_one)?.fulfill() + { return Some(msg); } } @@ -436,17 +430,9 @@ impl ChanInner { self.waiting_send_to_one.retain(|handle| handle.is_active()); loop { - let pos = self - .waiting_send_to_one - .iter() - .enumerate() - .max_by_key(|(_idx, handle)| match handle.priority() { - Some(p) if p > Priority::default() => Some(p), - _ => None, - })? - .0; - - if let Some(msg) = self.waiting_send_to_one.remove(pos).unwrap().fulfill() { + if let Some(msg) = + find_remove_highest_priority(&mut self.waiting_send_to_one)?.fulfill() + { return Some(msg); } } @@ -456,14 +442,9 @@ impl ChanInner { self.waiting_send_to_all.retain(|handle| handle.is_active()); loop { - let pos = self - .waiting_send_to_all - .iter() - .enumerate() - .max_by_key(|(_idx, handle)| handle.priority())? - .0; - - if let Some(msg) = self.waiting_send_to_all.remove(pos).unwrap().fulfill() { + if let Some(msg) = + find_remove_highest_priority(&mut self.waiting_send_to_all)?.fulfill() + { return Some(msg); } } @@ -485,6 +466,35 @@ impl ChanInner { } } +fn find_remove_highest_priority( + queue: &mut VecDeque>, +) -> Option> +where + M: HasPriority, +{ + let pos = queue + .iter() + .enumerate() + .max_by_key(|(_, handle)| handle.priority())? + .0; + + queue.remove(pos) +} + +fn find_remove_next_default_priority( + queue: &mut VecDeque>, +) -> Option> +where + M: HasPriority, +{ + let pos = queue.iter().position(|handle| match handle.priority() { + None => false, + Some(p) => p == Priority::default(), + })?; + + queue.remove(pos) +} + pub enum SentMessage { ToOneActor(Box>), ToAllActors(Arc>), From 47a97920b6a9ac68a68a5415cfa96aa12d5b2dc2 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 13:37:44 +0200 Subject: [PATCH 13/25] Clear inactive handles as part of finding the next one --- src/inbox.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index 9dba8cee..db31dcdb 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -415,8 +415,6 @@ impl ChanInner { } fn try_fulfill_sender_ordered(&mut self) -> Option>> { - self.waiting_send_to_one.retain(|handle| handle.is_active()); - loop { if let Some(msg) = find_remove_next_default_priority(&mut self.waiting_send_to_one)?.fulfill() @@ -427,8 +425,6 @@ impl ChanInner { } fn try_fulfill_sender_priority(&mut self) -> Option>> { - self.waiting_send_to_one.retain(|handle| handle.is_active()); - loop { if let Some(msg) = find_remove_highest_priority(&mut self.waiting_send_to_one)?.fulfill() @@ -439,8 +435,6 @@ impl ChanInner { } fn try_fulfill_sender_broadcast(&mut self) -> Option>> { - self.waiting_send_to_all.retain(|handle| handle.is_active()); - loop { if let Some(msg) = find_remove_highest_priority(&mut self.waiting_send_to_all)?.fulfill() @@ -472,6 +466,8 @@ fn find_remove_highest_priority( where M: HasPriority, { + queue.retain(|handle| handle.is_active()); // Only process handles which are still active. + let pos = queue .iter() .enumerate() @@ -487,6 +483,8 @@ fn find_remove_next_default_priority( where M: HasPriority, { + queue.retain(|handle| handle.is_active()); // Only process handles which are still active. + let pos = queue.iter().position(|handle| match handle.priority() { None => false, Some(p) => p == Priority::default(), From 81893345f6613a9205fc27a513856c6ff3251306 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sun, 7 Aug 2022 22:19:19 +0200 Subject: [PATCH 14/25] Use `if let` where possible --- src/inbox.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index db31dcdb..e96ea27c 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -326,9 +326,8 @@ impl ChanInner { .capacity .map_or(false, |cap| cap == self.priority_queue.len()) { - match self.try_fulfill_sender_priority() { - Some(msg) => self.priority_queue.push(ByPriority(msg)), - None => {} + if let Some(msg) = self.try_fulfill_sender_priority() { + self.priority_queue.push(ByPriority(msg)) } } @@ -341,9 +340,8 @@ impl ChanInner { .capacity .map_or(false, |cap| cap == self.ordered_queue.len()) { - match self.try_fulfill_sender_ordered() { - Some(msg) => self.ordered_queue.push_back(msg), - None => {} + if let Some(msg) = self.try_fulfill_sender_ordered() { + self.ordered_queue.push_back(msg) } } @@ -362,9 +360,8 @@ impl ChanInner { // If len < cap, try fulfill a waiting sender if self.capacity.map_or(false, |cap| self.broadcast_tail < cap) { - match self.try_fulfill_sender_broadcast() { - Some(m) => self.send_broadcast(m), - None => {} + if let Some(m) = self.try_fulfill_sender_broadcast() { + self.send_broadcast(m) } } } From 474cd46bff1848323849bed336438531fa91daf1 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 11:31:52 +0200 Subject: [PATCH 15/25] Introduce `TrySend` trait to remove duplicate `Future` impl on `Sending` --- src/inbox.rs | 136 ++++++++++++++++++++++++--------------------- src/send_future.rs | 49 ++++++---------- 2 files changed, 90 insertions(+), 95 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index e96ea27c..7a84eacf 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -81,68 +81,6 @@ impl Chan { mailbox } - pub fn try_send_to_one( - &self, - mut message: Box>, - ) -> Result>>>, Error> { - if !self.is_connected() { - return Err(Error::Disconnected); - } - - message.start_span(); - - let mut inner = self.chan.lock().unwrap(); - - let unfulfilled_msg = if let Err(msg) = inner.try_fulfill_receiver(message) { - msg - } else { - return Ok(Ok(())); - }; - - match unfulfilled_msg { - m if m.priority() == Priority::default() && !inner.is_ordered_full() => { - inner.ordered_queue.push_back(m); - } - m if m.priority() != Priority::default() && !inner.is_priority_full() => { - inner.priority_queue.push(ByPriority(m)); - } - _ => { - let (handle, waiting) = WaitingSender::new(unfulfilled_msg); - inner.waiting_send_to_one.push_back(handle); - - return Ok(Err(MailboxFull(waiting))); - } - }; - - Ok(Ok(())) - } - - pub fn try_send_to_all( - &self, - mut message: Arc>, - ) -> Result>>>, Error> { - if !self.is_connected() { - return Err(Error::Disconnected); - } - - Arc::get_mut(&mut message) - .expect("calling after try_send not supported") - .start_span(); - - let mut inner = self.chan.lock().unwrap(); - - if inner.is_broadcast_full() { - let (handle, waiting) = WaitingSender::new(message); - inner.waiting_send_to_all.push_back(handle); - - return Ok(Err(MailboxFull(waiting))); - } - - inner.send_broadcast(message); - - Ok(Ok(())) - } - fn try_recv( &self, broadcast_mailbox: &BroadcastQueue, @@ -293,6 +231,80 @@ impl Chan { } } +pub trait TrySend { + fn try_send(&self, message: M) -> Result>, Error>; +} + +impl TrySend>> for Chan { + fn try_send( + &self, + mut message: Box>, + ) -> Result>>>, Error> { + if !self.is_connected() { + return Err(Error::Disconnected); + } + + message.start_span(); + + let mut inner = self.chan.lock().unwrap(); + + let unfulfilled_msg = if let Err(msg) = inner.try_fulfill_receiver(message) { + msg + } else { + return Ok(Ok(())); + }; + + match unfulfilled_msg { + m if m.priority() == Priority::default() && !inner.is_ordered_full() => { + inner.ordered_queue.push_back(m); + } + m if m.priority() != Priority::default() && !inner.is_priority_full() => { + inner.priority_queue.push(ByPriority(m)); + } + _ => { + let waiting = WaitingSender::new(unfulfilled_msg); + inner + .waiting_send_to_one + .push_back(Arc::downgrade(&waiting)); + + return Ok(Err(MailboxFull(waiting))); + } + }; + + Ok(Ok(())) + } +} + +impl TrySend>> for Chan { + fn try_send( + &self, + mut message: Arc>, + ) -> Result>>>, Error> { + if !self.is_connected() { + return Err(Error::Disconnected); + } + + Arc::get_mut(&mut message) + .expect("calling after try_send not supported") + .start_span(); + + let mut inner = self.chan.lock().unwrap(); + + if inner.is_broadcast_full() { + let waiting = WaitingSender::new(message); + inner + .waiting_send_to_all + .push_back(Arc::downgrade(&waiting)); + + return Ok(Err(MailboxFull(waiting))); + } + + inner.send_broadcast(message); + + Ok(Ok(())) + } +} + struct ChanInner { capacity: Option, ordered_queue: VecDeque>>, diff --git a/src/send_future.rs b/src/send_future.rs index 035b099a..7ccb7919 100644 --- a/src/send_future.rs +++ b/src/send_future.rs @@ -10,7 +10,7 @@ use futures_util::FutureExt; use crate::envelope::{ BroadcastEnvelope, BroadcastEnvelopeConcrete, MessageEnvelope, ReturningEnvelope, }; -use crate::inbox::{MailboxFull, SentMessage, WaitingSender}; +use crate::inbox::{Chan, MailboxFull, TrySend, WaitingSender}; use crate::refcount::RefCounter; use crate::{inbox, Error, Handler}; @@ -165,9 +165,9 @@ impl SendFuture { } /// The core state machine around sending a message to an actor's mailbox. -enum Sending { +enum Sending { New { - msg: SentMessage, + msg: M, sender: inbox::Sender, }, WaitingToSendOne(WaitingSender>>), @@ -175,9 +175,11 @@ enum Sending { Done, } -impl Future for Sending +impl Future for Sending where Rc: RefCounter, + M: Unpin, + Chan: TrySend, { type Output = Result<(), Error>; @@ -186,35 +188,16 @@ where loop { match mem::replace(this, Sending::Done) { - Sending::New { msg, sender } => { - match msg { - SentMessage::ToOneActor(to_one) => match sender.try_send_to_one(to_one)? { - Ok(()) => return Poll::Ready(Ok(())), - Err(MailboxFull(waiting)) => { - *this = Sending::WaitingToSendOne(waiting); - } - }, - SentMessage::ToAllActors(to_all) => { - match sender.try_send_to_all(to_all)? { - Ok(()) => return Poll::Ready(Ok(())), - Err(MailboxFull(waiting)) => { - *this = Sending::WaitingToSendAll(waiting); - } - } - } - }; - } - Sending::WaitingToSendOne(mut waiting) => { - return match waiting.poll_unpin(cx)? { - Poll::Ready(()) => Poll::Ready(Ok(())), - Poll::Pending => { - *this = Sending::WaitingToSendOne(waiting); - Poll::Pending - } - }; - } - Sending::WaitingToSendAll(mut waiting) => { - return match waiting.poll_unpin(cx)? { + Sending::New { msg, sender } => match sender.try_send(msg)? { + Ok(()) => return Poll::Ready(Ok(())), + Err(MailboxFull(waiting)) => { + *this = Sending::WaitingToSend(waiting); + } + }, + Sending::WaitingToSend(waiting) => { + let poll = { waiting.lock().poll_unpin(cx) }?; // Scoped separately to drop mutex guard asap. + + return match poll { Poll::Ready(()) => Poll::Ready(Ok(())), Poll::Pending => { *this = Sending::WaitingToSendAll(waiting); From 33e5f26923ce7de5b8e5b17738c0eaa826870f80 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 13:53:54 +0200 Subject: [PATCH 16/25] Remove `SentMessage` and add `TrySend` We don't actually need the `SentMessage` enum because we never go between the two variants. All code paths are statically known which allows us to thread the particular message type through whenever we need it. --- src/address.rs | 4 +-- src/inbox.rs | 23 +++---------- src/send_future.rs | 84 +++++++++++++++++++++++++++++++++------------- 3 files changed, 67 insertions(+), 44 deletions(-) diff --git a/src/address.rs b/src/address.rs index 20e6397c..708b200d 100644 --- a/src/address.rs +++ b/src/address.rs @@ -12,7 +12,7 @@ use event_listener::EventListener; use futures_util::FutureExt; use crate::refcount::{Either, RefCounter, Strong, Weak}; -use crate::send_future::{Broadcast, ResolveToHandlerReturn}; +use crate::send_future::{ActorNamedBroadcasting, Broadcast, ResolveToHandlerReturn}; use crate::{inbox, ActorNamedSending, Handler, SendFuture}; /// An [`Address`] is a reference to an actor through which messages can be sent. @@ -174,7 +174,7 @@ impl Address { /// /// The actor must implement [`Handler`] for this to work where [`Handler::Return`] is /// set to `()`. - pub fn broadcast(&self, msg: M) -> SendFuture, Broadcast> + pub fn broadcast(&self, msg: M) -> SendFuture, Broadcast> where M: Clone + Send + Sync + 'static, A: Handler, diff --git a/src/inbox.rs b/src/inbox.rs index 7a84eacf..57c5801a 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -262,10 +262,8 @@ impl TrySend>> for Chan { inner.priority_queue.push(ByPriority(m)); } _ => { - let waiting = WaitingSender::new(unfulfilled_msg); - inner - .waiting_send_to_one - .push_back(Arc::downgrade(&waiting)); + let (handle, waiting) = WaitingSender::new(unfulfilled_msg); + inner.waiting_send_to_one.push_back(handle); return Ok(Err(MailboxFull(waiting))); } @@ -291,10 +289,8 @@ impl TrySend>> for Chan { let mut inner = self.chan.lock().unwrap(); if inner.is_broadcast_full() { - let waiting = WaitingSender::new(message); - inner - .waiting_send_to_all - .push_back(Arc::downgrade(&waiting)); + let (handle, waiting) = WaitingSender::new(message); + inner.waiting_send_to_all.push_back(handle); return Ok(Err(MailboxFull(waiting))); } @@ -502,17 +498,6 @@ where queue.remove(pos) } -pub enum SentMessage { - ToOneActor(Box>), - ToAllActors(Arc>), -} - -impl From>> for SentMessage { - fn from(msg: Box>) -> Self { - SentMessage::ToOneActor(msg) - } -} - /// An error returned in case the mailbox of an actor is full. pub struct MailboxFull(pub WaitingSender); diff --git a/src/send_future.rs b/src/send_future.rs index 7ccb7919..331c9dc1 100644 --- a/src/send_future.rs +++ b/src/send_future.rs @@ -72,8 +72,15 @@ where } } -/// "Sending" state of [`SendFuture`] for cases where the actor type is named. -pub struct ActorNamedSending(Sending); +/// "Sending" state of [`SendFuture`] for cases where the actor type is named and we sent a single message. +pub struct ActorNamedSending( + Sending>, Rc>, +); + +/// "Sending" state of [`SendFuture`] for cases where the actor type is named and we broadcast a message. +pub struct ActorNamedBroadcasting( + Sending>, Rc>, +); /// "Sending" state of [`SendFuture`] for cases where the actor type is erased. pub struct ActorErasedSending(Box); @@ -95,7 +102,7 @@ where Self { sending: ActorNamedSending(Sending::New { - msg: SentMessage::ToOneActor(Box::new(envelope)), + msg: Box::new(envelope) as Box>, sender, }), state: ResolveToHandlerReturn::new(receiver), @@ -115,7 +122,7 @@ impl SendFuture> { Self { sending: ActorErasedSending(Box::new(Sending::New { - msg: SentMessage::ToOneActor(Box::new(envelope)), + msg: Box::new(envelope) as Box>, sender, })), state: ResolveToHandlerReturn::new(receiver), @@ -123,7 +130,7 @@ impl SendFuture> { } } -impl SendFuture, Broadcast> +impl SendFuture, Broadcast> where Rc: RefCounter, { @@ -135,8 +142,8 @@ where let envelope = BroadcastEnvelopeConcrete::new(msg, 0); Self { - sending: ActorNamedSending(Sending::New { - msg: SentMessage::ToAllActors(Arc::new(envelope)), + sending: ActorNamedBroadcasting(Sending::New { + msg: Arc::new(envelope) as Arc>, sender, }), state: Broadcast(()), @@ -156,7 +163,7 @@ impl SendFuture { Self { sending: ActorErasedSending(Box::new(Sending::New { - msg: SentMessage::ToAllActors(Arc::new(envelope)), + msg: Arc::new(envelope) as Arc>, sender, })), state: Broadcast(()), @@ -170,8 +177,7 @@ enum Sending { msg: M, sender: inbox::Sender, }, - WaitingToSendOne(WaitingSender>>), - WaitingToSendAll(WaitingSender>>), + WaitingToSend(WaitingSender), Done, } @@ -194,13 +200,11 @@ where *this = Sending::WaitingToSend(waiting); } }, - Sending::WaitingToSend(waiting) => { - let poll = { waiting.lock().poll_unpin(cx) }?; // Scoped separately to drop mutex guard asap. - - return match poll { + Sending::WaitingToSend(mut waiting) => { + return match waiting.poll_unpin(cx)? { Poll::Ready(()) => Poll::Ready(Ok(())), Poll::Pending => { - *this = Sending::WaitingToSendAll(waiting); + *this = Sending::WaitingToSend(waiting); Poll::Pending } }; @@ -211,7 +215,7 @@ where } } -impl FusedFuture for Sending +impl FusedFuture for Sending where Self: Future, Rc: RefCounter, @@ -239,6 +243,24 @@ where } } +impl Future for ActorNamedBroadcasting { + type Output = Result<(), Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.get_mut().0.poll_unpin(cx) + } +} + +impl FusedFuture for ActorNamedBroadcasting +where + Self: Future, + Rc: RefCounter, +{ + fn is_terminated(&self) -> bool { + self.0.is_terminated() + } +} + impl Future for ActorErasedSending { type Output = Result<(), Error>; @@ -333,21 +355,28 @@ mod private { fn set_priority(&mut self, priority: u32); } - impl SetPriority for Sending + impl SetPriority for Sending>, Rc> where Rc: RefCounter, { fn set_priority(&mut self, new_priority: u32) { - let msg = match self { - Sending::New { msg, .. } => msg, + match self { + Sending::New { msg, .. } => msg.set_priority(new_priority), _ => panic!("Cannot set priority after first poll"), - }; + } + } + } - match msg { - SentMessage::ToOneActor(m) => m.set_priority(new_priority), - SentMessage::ToAllActors(m) => Arc::get_mut(m) + impl SetPriority for Sending>, Rc> + where + Rc: RefCounter, + { + fn set_priority(&mut self, new_priority: u32) { + match self { + Sending::New { msg, .. } => Arc::get_mut(msg) .expect("envelope is not cloned until here") .set_priority(new_priority), + _ => panic!("Cannot set priority after first poll"), } } } @@ -361,6 +390,15 @@ mod private { } } + impl SetPriority for ActorNamedBroadcasting + where + Rc: RefCounter, + { + fn set_priority(&mut self, priority: u32) { + self.0.set_priority(priority) + } + } + impl SetPriority for ActorErasedSending { fn set_priority(&mut self, priority: u32) { self.0.set_priority(priority) From 1ea2faa29e4add328154d0a9b70ca025b46763c9 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 14:05:07 +0200 Subject: [PATCH 17/25] Inline `SpinLock` type It is almost the same length when fully qualified and does not introduce confusion on what the type is. `spin::Mutex` is known to come from the `spin` crate whereas `SpinLock` could be anything. --- src/inbox.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index 57c5801a..6a0f12ff 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -22,8 +22,7 @@ use crate::inbox::tx::TxStrong; use crate::inbox::waiting_receiver::WaitingReceiver; use crate::{Actor, Error}; -type Spinlock = spin::Mutex; -type BroadcastQueue = Spinlock>>>>; +type BroadcastQueue = spin::Mutex>>>>; /// Create an actor mailbox, returning a sender and receiver for it. The given capacity is applied /// severally to each send type - priority, ordered, and broadcast. @@ -71,7 +70,7 @@ impl Chan { /// Creates a new broadcast mailbox on this channel. fn new_broadcast_mailbox(&self) -> Arc> { - let mailbox = Arc::new(Spinlock::new(BinaryHeap::new())); + let mailbox = Arc::new(spin::Mutex::new(BinaryHeap::new())); self.chan .lock() .unwrap() From 580aa678ce7874f1a0ea5534905ad0c856152b83 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 14:14:41 +0200 Subject: [PATCH 18/25] Introduce type aliases This allows more things to fit onto one line. --- src/envelope.rs | 6 ++-- src/inbox.rs | 63 ++++++++++++++++------------------- src/inbox/rx.rs | 5 ++- src/inbox/waiting_receiver.rs | 12 +++---- src/send_future.rs | 26 ++++++--------- 5 files changed, 48 insertions(+), 64 deletions(-) diff --git a/src/envelope.rs b/src/envelope.rs index 5fc81231..5860a20f 100644 --- a/src/envelope.rs +++ b/src/envelope.rs @@ -8,7 +8,7 @@ use futures_core::future::BoxFuture; use futures_util::FutureExt; use crate::context::Context; -use crate::inbox::{HasPriority, Priority}; +use crate::inbox::{HasPriority, MessageToAll, MessageToOne, Priority}; use crate::{Actor, Handler}; /// A message envelope is a struct that encapsulates a message and its return channel sender (if applicable). @@ -170,7 +170,7 @@ impl HasPriority for ReturningEnvelope { } } -impl HasPriority for Box> { +impl HasPriority for MessageToOne { fn priority(&self) -> Priority { self.as_ref().priority() } @@ -236,7 +236,7 @@ pub trait BroadcastEnvelope: HasPriority + Send + Sync { ) -> (BoxFuture<'a, ControlFlow<()>>, Span); } -impl HasPriority for Arc> { +impl HasPriority for MessageToAll { fn priority(&self) -> Priority { self.as_ref().priority() } diff --git a/src/inbox.rs b/src/inbox.rs index 6a0f12ff..be7c6238 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -22,7 +22,10 @@ use crate::inbox::tx::TxStrong; use crate::inbox::waiting_receiver::WaitingReceiver; use crate::{Actor, Error}; -type BroadcastQueue = spin::Mutex>>>>; +pub type MessageToOne = Box>; +pub type MessageToAll = Arc>; + +type BroadcastQueue = spin::Mutex>>>; /// Create an actor mailbox, returning a sender and receiver for it. The given capacity is applied /// severally to each send type - priority, ordered, and broadcast. @@ -213,7 +216,7 @@ impl Chan { /// given message so it does not get lost. /// /// Note that the ordering of messages in the queues may be slightly off with this function. - fn requeue_message(&self, msg: Box>) { + fn requeue_message(&self, msg: MessageToOne) { let mut inner = match self.chan.lock() { Ok(lock) => lock, Err(_) => return, // If we can't lock the inner channel, there is nothing we can do. @@ -234,11 +237,11 @@ pub trait TrySend { fn try_send(&self, message: M) -> Result>, Error>; } -impl TrySend>> for Chan { +impl TrySend> for Chan { fn try_send( &self, - mut message: Box>, - ) -> Result>>>, Error> { + mut message: MessageToOne, + ) -> Result>>, Error> { if !self.is_connected() { return Err(Error::Disconnected); } @@ -272,11 +275,11 @@ impl TrySend>> for Chan { } } -impl TrySend>> for Chan { +impl TrySend> for Chan { fn try_send( &self, - mut message: Arc>, - ) -> Result>>>, Error> { + mut message: MessageToAll, + ) -> Result>>, Error> { if !self.is_connected() { return Err(Error::Disconnected); } @@ -302,13 +305,11 @@ impl TrySend>> for Chan { struct ChanInner { capacity: Option, - ordered_queue: VecDeque>>, - waiting_send_to_one: - VecDeque>>>, - waiting_send_to_all: - VecDeque>>>, + ordered_queue: VecDeque>, + waiting_send_to_one: VecDeque>>, + waiting_send_to_all: VecDeque>>, waiting_receivers_handles: VecDeque>, - priority_queue: BinaryHeap>>>, + priority_queue: BinaryHeap>>, broadcast_queues: Vec>>, broadcast_tail: usize, } @@ -327,7 +328,7 @@ impl ChanInner { } } - fn pop_priority(&mut self) -> Option>> { + fn pop_priority(&mut self) -> Option> { // If len < cap after popping this message, try fulfill at most one waiting sender if self .capacity @@ -341,7 +342,7 @@ impl ChanInner { Some(self.priority_queue.pop()?.0) } - fn pop_ordered(&mut self) -> Option>> { + fn pop_ordered(&mut self) -> Option> { // If len < cap after popping this message, try fulfill at most one waiting sender if self .capacity @@ -355,10 +356,7 @@ impl ChanInner { self.ordered_queue.pop_front() } - fn pop_broadcast( - &mut self, - broadcast_mailbox: &BroadcastQueue, - ) -> Option>> { + fn pop_broadcast(&mut self, broadcast_mailbox: &BroadcastQueue) -> Option> { let message = broadcast_mailbox.lock().pop(); // Advance the broadcast tail if we successfully took a message. @@ -388,7 +386,7 @@ impl ChanInner { longest } - fn send_broadcast(&mut self, m: Arc>) { + fn send_broadcast(&mut self, m: MessageToAll) { self.broadcast_queues.retain(|queue| match queue.upgrade() { Some(q) => { q.lock().push(ByPriority(m.clone())); @@ -404,10 +402,7 @@ impl ChanInner { self.broadcast_tail += 1; } - fn try_fulfill_receiver( - &mut self, - mut msg: Box>, - ) -> Result<(), Box>> { + fn try_fulfill_receiver(&mut self, mut msg: MessageToOne) -> Result<(), MessageToOne> { while let Some(rx) = self.waiting_receivers_handles.pop_front() { match rx.notify_new_message(msg) { Ok(()) => return Ok(()), @@ -418,7 +413,7 @@ impl ChanInner { Err(msg) } - fn try_fulfill_sender_ordered(&mut self) -> Option>> { + fn try_fulfill_sender_ordered(&mut self) -> Option> { loop { if let Some(msg) = find_remove_next_default_priority(&mut self.waiting_send_to_one)?.fulfill() @@ -428,7 +423,7 @@ impl ChanInner { } } - fn try_fulfill_sender_priority(&mut self) -> Option>> { + fn try_fulfill_sender_priority(&mut self) -> Option> { loop { if let Some(msg) = find_remove_highest_priority(&mut self.waiting_send_to_one)?.fulfill() @@ -438,7 +433,7 @@ impl ChanInner { } } - fn try_fulfill_sender_broadcast(&mut self) -> Option>> { + fn try_fulfill_sender_broadcast(&mut self) -> Option> { loop { if let Some(msg) = find_remove_highest_priority(&mut self.waiting_send_to_all)?.fulfill() @@ -501,19 +496,19 @@ where pub struct MailboxFull(pub WaitingSender); pub enum ActorMessage { - ToOneActor(Box>), - ToAllActors(Arc>), + ToOneActor(MessageToOne), + ToAllActors(MessageToAll), Shutdown, } -impl From>> for ActorMessage { - fn from(msg: Box>) -> Self { +impl From> for ActorMessage { + fn from(msg: MessageToOne) -> Self { ActorMessage::ToOneActor(msg) } } -impl From>> for ActorMessage { - fn from(msg: Arc>) -> Self { +impl From> for ActorMessage { + fn from(msg: MessageToAll) -> Self { ActorMessage::ToAllActors(msg) } } diff --git a/src/inbox/rx.rs b/src/inbox/rx.rs index ae90a714..f7e12afb 100644 --- a/src/inbox/rx.rs +++ b/src/inbox/rx.rs @@ -7,10 +7,9 @@ use std::task::{Context, Poll}; use futures_core::FusedFuture; use futures_util::FutureExt; -use crate::envelope::BroadcastEnvelope; use crate::inbox::tx::{TxStrong, TxWeak}; use crate::inbox::waiting_receiver::WaitingReceiver; -use crate::inbox::{ActorMessage, BroadcastQueue, Chan, Sender}; +use crate::inbox::{ActorMessage, BroadcastQueue, Chan, MessageToAll, Sender}; pub struct Receiver { inner: Arc>, @@ -18,7 +17,7 @@ pub struct Receiver { } impl Receiver { - pub fn next_broadcast_message(&self) -> Option>> { + pub fn next_broadcast_message(&self) -> Option> { self.inner .chan .lock() diff --git a/src/inbox/waiting_receiver.rs b/src/inbox/waiting_receiver.rs index 4d5fd80f..24929663 100644 --- a/src/inbox/waiting_receiver.rs +++ b/src/inbox/waiting_receiver.rs @@ -2,9 +2,8 @@ use std::task::{Context, Poll}; use futures_util::FutureExt; -use crate::envelope::MessageEnvelope; use crate::inbox::rx::Receiver; -use crate::inbox::ActorMessage; +use crate::inbox::{ActorMessage, MessageToOne}; /// A [`WaitingReceiver`] is handed out by the channel any time [`Chan::try_recv`](crate::inbox::Chan::try_recv) is called on an empty mailbox. /// @@ -38,10 +37,7 @@ impl FulfillHandle { /// /// This function will return the message in an `Err` if the [`WaitingReceiver`] has since called /// [`cancel`](WaitingReceiver::cancel) and is therefore unable to handle the message. - pub fn notify_new_message( - self, - msg: Box>, - ) -> Result<(), Box>> { + pub fn notify_new_message(self, msg: MessageToOne) -> Result<(), MessageToOne> { self.0 .send(CtrlMsg::NewMessage(msg)) .map_err(|reason| match reason { @@ -65,7 +61,7 @@ impl WaitingReceiver { /// /// It is important to call this message over just dropping the [`WaitingReceiver`] as this /// message would otherwise be dropped. - pub fn cancel(&mut self) -> Option>> { + pub fn cancel(&mut self) -> Option> { match self.0.try_recv() { Ok(Some(CtrlMsg::NewMessage(msg))) => Some(msg), _ => None, @@ -101,7 +97,7 @@ impl WaitingReceiver { } enum CtrlMsg { - NewMessage(Box>), + NewMessage(MessageToOne), NewBroadcast, Shutdown, } diff --git a/src/send_future.rs b/src/send_future.rs index 331c9dc1..dfae7616 100644 --- a/src/send_future.rs +++ b/src/send_future.rs @@ -7,10 +7,8 @@ use std::task::{Context, Poll}; use futures_core::FusedFuture; use futures_util::FutureExt; -use crate::envelope::{ - BroadcastEnvelope, BroadcastEnvelopeConcrete, MessageEnvelope, ReturningEnvelope, -}; -use crate::inbox::{Chan, MailboxFull, TrySend, WaitingSender}; +use crate::envelope::{BroadcastEnvelopeConcrete, ReturningEnvelope}; +use crate::inbox::{Chan, MailboxFull, MessageToAll, MessageToOne, TrySend, WaitingSender}; use crate::refcount::RefCounter; use crate::{inbox, Error, Handler}; @@ -73,14 +71,10 @@ where } /// "Sending" state of [`SendFuture`] for cases where the actor type is named and we sent a single message. -pub struct ActorNamedSending( - Sending>, Rc>, -); +pub struct ActorNamedSending(Sending, Rc>); /// "Sending" state of [`SendFuture`] for cases where the actor type is named and we broadcast a message. -pub struct ActorNamedBroadcasting( - Sending>, Rc>, -); +pub struct ActorNamedBroadcasting(Sending, Rc>); /// "Sending" state of [`SendFuture`] for cases where the actor type is erased. pub struct ActorErasedSending(Box); @@ -102,7 +96,7 @@ where Self { sending: ActorNamedSending(Sending::New { - msg: Box::new(envelope) as Box>, + msg: Box::new(envelope) as MessageToOne, sender, }), state: ResolveToHandlerReturn::new(receiver), @@ -122,7 +116,7 @@ impl SendFuture> { Self { sending: ActorErasedSending(Box::new(Sending::New { - msg: Box::new(envelope) as Box>, + msg: Box::new(envelope) as MessageToOne, sender, })), state: ResolveToHandlerReturn::new(receiver), @@ -143,7 +137,7 @@ where Self { sending: ActorNamedBroadcasting(Sending::New { - msg: Arc::new(envelope) as Arc>, + msg: Arc::new(envelope) as MessageToAll, sender, }), state: Broadcast(()), @@ -163,7 +157,7 @@ impl SendFuture { Self { sending: ActorErasedSending(Box::new(Sending::New { - msg: Arc::new(envelope) as Arc>, + msg: Arc::new(envelope) as MessageToAll, sender, })), state: Broadcast(()), @@ -355,7 +349,7 @@ mod private { fn set_priority(&mut self, priority: u32); } - impl SetPriority for Sending>, Rc> + impl SetPriority for Sending, Rc> where Rc: RefCounter, { @@ -367,7 +361,7 @@ mod private { } } - impl SetPriority for Sending>, Rc> + impl SetPriority for Sending, Rc> where Rc: RefCounter, { From 2cf1a98a001cefd813957ad43d6fb19f925bf1a0 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 14:34:04 +0200 Subject: [PATCH 19/25] Remove panic and add docs to `FulfillHandle` --- src/inbox/waiting_sender.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/inbox/waiting_sender.rs b/src/inbox/waiting_sender.rs index bcab76bf..841fc883 100644 --- a/src/inbox/waiting_sender.rs +++ b/src/inbox/waiting_sender.rs @@ -24,22 +24,23 @@ impl FulFillHandle { Weak::strong_count(&self.0) > 0 } + /// Fulfill the paired [`WaitingSender`] by taking out the waiting message. + /// + /// This may return `None` in case the [`WaitingSender`] is no longer active or has already been closed or fulfill. pub fn fulfill(self) -> Option { let inner = self.0.upgrade()?; let mut this = inner.lock(); - Some(match mem::replace(&mut *this, Inner::Delivered) { + match mem::replace(&mut *this, Inner::Delivered) { Inner::Active { mut waker, message } => { if let Some(waker) = waker.take() { waker.wake(); } - message + Some(message) } - Inner::Delivered | Inner::Closed => { - panic!("WaitingSender is already fulfilled or closed") - } - }) + Inner::Delivered | Inner::Closed => None, + } } } From 0d9524de90867a9aa4e889903413a5237e54dc9a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 14:36:43 +0200 Subject: [PATCH 20/25] Rename `FulfillHandle`s to `Handle` Shorter and equally expressive when used in a fully-qualified way. --- src/inbox.rs | 14 +++++++------- src/inbox/waiting_receiver.rs | 8 ++++---- src/inbox/waiting_sender.rs | 12 ++++++------ 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index be7c6238..46f2711a 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -306,9 +306,9 @@ impl TrySend> for Chan { struct ChanInner { capacity: Option, ordered_queue: VecDeque>, - waiting_send_to_one: VecDeque>>, - waiting_send_to_all: VecDeque>>, - waiting_receivers_handles: VecDeque>, + waiting_send_to_one: VecDeque>>, + waiting_send_to_all: VecDeque>>, + waiting_receivers_handles: VecDeque>, priority_queue: BinaryHeap>>, broadcast_queues: Vec>>, broadcast_tail: usize, @@ -460,8 +460,8 @@ impl ChanInner { } fn find_remove_highest_priority( - queue: &mut VecDeque>, -) -> Option> + queue: &mut VecDeque>, +) -> Option> where M: HasPriority, { @@ -477,8 +477,8 @@ where } fn find_remove_next_default_priority( - queue: &mut VecDeque>, -) -> Option> + queue: &mut VecDeque>, +) -> Option> where M: HasPriority, { diff --git a/src/inbox/waiting_receiver.rs b/src/inbox/waiting_receiver.rs index 24929663..a802c8d9 100644 --- a/src/inbox/waiting_receiver.rs +++ b/src/inbox/waiting_receiver.rs @@ -14,9 +14,9 @@ pub struct WaitingReceiver(catty::Receiver>); /// /// It is stored internally in the channel and used by the channel implementation to notify a /// [`WaitingReceiver`] once a new message hits the mailbox. -pub struct FulfillHandle(catty::Sender>); +pub struct Handle(catty::Sender>); -impl FulfillHandle { +impl Handle { /// Notify the connected [`WaitingReceiver`] that the channel is shutting down. pub fn notify_channel_shutdown(self) { let _ = self.0.send(CtrlMsg::Shutdown); @@ -48,10 +48,10 @@ impl FulfillHandle { } impl WaitingReceiver { - pub fn new() -> (WaitingReceiver, FulfillHandle) { + pub fn new() -> (WaitingReceiver, Handle) { let (sender, receiver) = catty::oneshot(); - (WaitingReceiver(receiver), FulfillHandle(sender)) + (WaitingReceiver(receiver), Handle(sender)) } /// Cancel this [`WaitingReceiver`]. diff --git a/src/inbox/waiting_sender.rs b/src/inbox/waiting_sender.rs index 841fc883..6698a37a 100644 --- a/src/inbox/waiting_sender.rs +++ b/src/inbox/waiting_sender.rs @@ -9,17 +9,17 @@ use crate::Error; pub struct WaitingSender(Arc>>); -pub struct FulFillHandle(Weak>>); +pub struct Handle(Weak>>); impl WaitingSender { - pub fn new(msg: M) -> (FulFillHandle, WaitingSender) { + pub fn new(msg: M) -> (Handle, WaitingSender) { let inner = Arc::new(spin::Mutex::new(Inner::new(msg))); - (FulFillHandle(Arc::downgrade(&inner)), WaitingSender(inner)) + (Handle(Arc::downgrade(&inner)), WaitingSender(inner)) } } -impl FulFillHandle { +impl Handle { pub fn is_active(&self) -> bool { Weak::strong_count(&self.0) > 0 } @@ -44,7 +44,7 @@ impl FulFillHandle { } } -impl Drop for FulFillHandle { +impl Drop for Handle { fn drop(&mut self) { if let Some(inner) = self.0.upgrade() { let mut this = inner.lock(); @@ -63,7 +63,7 @@ impl Drop for FulFillHandle { } } -impl FulFillHandle +impl Handle where M: HasPriority, { From 5da689072d167bbdbf68504db6a6927b933c0174 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 14:40:26 +0200 Subject: [PATCH 21/25] Change `fulfill` to `take_message` wording for `waiting_sender` --- src/inbox.rs | 18 +++++++++--------- src/inbox/waiting_sender.rs | 6 +++--- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index 46f2711a..e4e499a9 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -334,7 +334,7 @@ impl ChanInner { .capacity .map_or(false, |cap| cap == self.priority_queue.len()) { - if let Some(msg) = self.try_fulfill_sender_priority() { + if let Some(msg) = self.try_take_waiting_priority_message() { self.priority_queue.push(ByPriority(msg)) } } @@ -348,7 +348,7 @@ impl ChanInner { .capacity .map_or(false, |cap| cap == self.ordered_queue.len()) { - if let Some(msg) = self.try_fulfill_sender_ordered() { + if let Some(msg) = self.try_take_waiting_ordered_message() { self.ordered_queue.push_back(msg) } } @@ -365,7 +365,7 @@ impl ChanInner { // If len < cap, try fulfill a waiting sender if self.capacity.map_or(false, |cap| self.broadcast_tail < cap) { - if let Some(m) = self.try_fulfill_sender_broadcast() { + if let Some(m) = self.try_take_waiting_broadcast_message() { self.send_broadcast(m) } } @@ -413,30 +413,30 @@ impl ChanInner { Err(msg) } - fn try_fulfill_sender_ordered(&mut self) -> Option> { + fn try_take_waiting_ordered_message(&mut self) -> Option> { loop { if let Some(msg) = - find_remove_next_default_priority(&mut self.waiting_send_to_one)?.fulfill() + find_remove_next_default_priority(&mut self.waiting_send_to_one)?.take_message() { return Some(msg); } } } - fn try_fulfill_sender_priority(&mut self) -> Option> { + fn try_take_waiting_priority_message(&mut self) -> Option> { loop { if let Some(msg) = - find_remove_highest_priority(&mut self.waiting_send_to_one)?.fulfill() + find_remove_highest_priority(&mut self.waiting_send_to_one)?.take_message() { return Some(msg); } } } - fn try_fulfill_sender_broadcast(&mut self) -> Option> { + fn try_take_waiting_broadcast_message(&mut self) -> Option> { loop { if let Some(msg) = - find_remove_highest_priority(&mut self.waiting_send_to_all)?.fulfill() + find_remove_highest_priority(&mut self.waiting_send_to_all)?.take_message() { return Some(msg); } diff --git a/src/inbox/waiting_sender.rs b/src/inbox/waiting_sender.rs index 6698a37a..d110166e 100644 --- a/src/inbox/waiting_sender.rs +++ b/src/inbox/waiting_sender.rs @@ -24,10 +24,10 @@ impl Handle { Weak::strong_count(&self.0) > 0 } - /// Fulfill the paired [`WaitingSender`] by taking out the waiting message. + /// Take the message out of the paired [`WaitingSender`]. /// - /// This may return `None` in case the [`WaitingSender`] is no longer active or has already been closed or fulfill. - pub fn fulfill(self) -> Option { + /// This may return `None` in case the [`WaitingSender`] is no longer active, has already been closed or the message was already taken. + pub fn take_message(self) -> Option { let inner = self.0.upgrade()?; let mut this = inner.lock(); From c2966db0e389a17d2c8f4122af3f8fc60356347b Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 8 Aug 2022 15:08:31 +0200 Subject: [PATCH 22/25] Fix docs --- src/inbox/waiting_receiver.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/inbox/waiting_receiver.rs b/src/inbox/waiting_receiver.rs index a802c8d9..b3cd579a 100644 --- a/src/inbox/waiting_receiver.rs +++ b/src/inbox/waiting_receiver.rs @@ -10,7 +10,7 @@ use crate::inbox::{ActorMessage, MessageToOne}; /// [`WaitingReceiver`] implements [`Future`](std::future::Future) which will resolve once a message lands in the mailbox. pub struct WaitingReceiver(catty::Receiver>); -/// A [`FulfillHandle`] is the counter-part to a [`WaitingReceiver`]. +/// A [`Handle`] is the counter-part to a [`WaitingReceiver`]. /// /// It is stored internally in the channel and used by the channel implementation to notify a /// [`WaitingReceiver`] once a new message hits the mailbox. From 95ffe8a3838548045919307583e7965c6ed16df2 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 10 Aug 2022 16:34:31 +0200 Subject: [PATCH 23/25] Remove empty line --- src/inbox.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/inbox.rs b/src/inbox.rs index f8ed76d4..2b8010b9 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -352,7 +352,6 @@ impl ChanInner { Some(msg) } - fn pop_broadcast( &mut self, broadcast_mailbox: &BroadcastQueue, From 320c210154d4773d6afa16f9298474f36211116c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 15 Aug 2022 21:44:08 +0200 Subject: [PATCH 24/25] Remove unnecessary `Unpin` bound --- src/inbox/waiting_sender.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/inbox/waiting_sender.rs b/src/inbox/waiting_sender.rs index d110166e..ba2b9a7c 100644 --- a/src/inbox/waiting_sender.rs +++ b/src/inbox/waiting_sender.rs @@ -78,10 +78,7 @@ where } } -impl Future for WaitingSender -where - A: Unpin, -{ +impl Future for WaitingSender { type Output = Result<(), Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { From e9eb9d3a32b06195ef0061997b77dd69fcd012ce Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 16 Aug 2022 11:53:43 +0200 Subject: [PATCH 25/25] Remove `TrySend` trait --- src/inbox.rs | 132 +++++++++++++++++++++------------------------ src/send_future.rs | 40 ++++++++++++-- 2 files changed, 97 insertions(+), 75 deletions(-) diff --git a/src/inbox.rs b/src/inbox.rs index 2b8010b9..cb4de975 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -83,6 +83,68 @@ impl Chan { mailbox } + pub fn try_send_to_one( + &self, + mut message: MessageToOne, + ) -> Result>>, Error> { + if !self.is_connected() { + return Err(Error::Disconnected); + } + + message.start_span(); + + let mut inner = self.chan.lock().unwrap(); + + let unfulfilled_msg = if let Err(msg) = inner.try_fulfill_receiver(message) { + msg + } else { + return Ok(Ok(())); + }; + + match unfulfilled_msg { + m if m.priority() == Priority::default() && !inner.is_ordered_full() => { + inner.ordered_queue.push_back(m); + } + m if m.priority() != Priority::default() && !inner.is_priority_full() => { + inner.priority_queue.push(ByPriority(m)); + } + _ => { + let (handle, waiting) = WaitingSender::new(unfulfilled_msg); + inner.waiting_send_to_one.push_back(handle); + + return Ok(Err(MailboxFull(waiting))); + } + }; + + Ok(Ok(())) + } + + pub fn try_send_to_all( + &self, + mut message: MessageToAll, + ) -> Result>>, Error> { + if !self.is_connected() { + return Err(Error::Disconnected); + } + + Arc::get_mut(&mut message) + .expect("calling after try_send not supported") + .start_span(); + + let mut inner = self.chan.lock().unwrap(); + + if inner.is_broadcast_full() { + let (handle, waiting) = WaitingSender::new(message); + inner.waiting_send_to_all.push_back(handle); + + return Ok(Err(MailboxFull(waiting))); + } + + inner.send_broadcast(message); + + Ok(Ok(())) + } + fn try_recv( &self, broadcast_mailbox: &BroadcastQueue, @@ -233,76 +295,6 @@ impl Chan { } } -pub trait TrySend { - fn try_send(&self, message: M) -> Result>, Error>; -} - -impl TrySend> for Chan { - fn try_send( - &self, - mut message: MessageToOne, - ) -> Result>>, Error> { - if !self.is_connected() { - return Err(Error::Disconnected); - } - - message.start_span(); - - let mut inner = self.chan.lock().unwrap(); - - let unfulfilled_msg = if let Err(msg) = inner.try_fulfill_receiver(message) { - msg - } else { - return Ok(Ok(())); - }; - - match unfulfilled_msg { - m if m.priority() == Priority::default() && !inner.is_ordered_full() => { - inner.ordered_queue.push_back(m); - } - m if m.priority() != Priority::default() && !inner.is_priority_full() => { - inner.priority_queue.push(ByPriority(m)); - } - _ => { - let (handle, waiting) = WaitingSender::new(unfulfilled_msg); - inner.waiting_send_to_one.push_back(handle); - - return Ok(Err(MailboxFull(waiting))); - } - }; - - Ok(Ok(())) - } -} - -impl TrySend> for Chan { - fn try_send( - &self, - mut message: MessageToAll, - ) -> Result>>, Error> { - if !self.is_connected() { - return Err(Error::Disconnected); - } - - Arc::get_mut(&mut message) - .expect("calling after try_send not supported") - .start_span(); - - let mut inner = self.chan.lock().unwrap(); - - if inner.is_broadcast_full() { - let (handle, waiting) = WaitingSender::new(message); - inner.waiting_send_to_all.push_back(handle); - - return Ok(Err(MailboxFull(waiting))); - } - - inner.send_broadcast(message); - - Ok(Ok(())) - } -} - struct ChanInner { capacity: Option, ordered_queue: VecDeque>, diff --git a/src/send_future.rs b/src/send_future.rs index dfae7616..b06c776c 100644 --- a/src/send_future.rs +++ b/src/send_future.rs @@ -8,7 +8,7 @@ use futures_core::FusedFuture; use futures_util::FutureExt; use crate::envelope::{BroadcastEnvelopeConcrete, ReturningEnvelope}; -use crate::inbox::{Chan, MailboxFull, MessageToAll, MessageToOne, TrySend, WaitingSender}; +use crate::inbox::{MailboxFull, MessageToAll, MessageToOne, WaitingSender}; use crate::refcount::RefCounter; use crate::{inbox, Error, Handler}; @@ -175,11 +175,9 @@ enum Sending { Done, } -impl Future for Sending +impl Future for Sending, Rc> where Rc: RefCounter, - M: Unpin, - Chan: TrySend, { type Output = Result<(), Error>; @@ -188,7 +186,39 @@ where loop { match mem::replace(this, Sending::Done) { - Sending::New { msg, sender } => match sender.try_send(msg)? { + Sending::New { msg, sender } => match sender.try_send_to_one(msg)? { + Ok(()) => return Poll::Ready(Ok(())), + Err(MailboxFull(waiting)) => { + *this = Sending::WaitingToSend(waiting); + } + }, + Sending::WaitingToSend(mut waiting) => { + return match waiting.poll_unpin(cx)? { + Poll::Ready(()) => Poll::Ready(Ok(())), + Poll::Pending => { + *this = Sending::WaitingToSend(waiting); + Poll::Pending + } + }; + } + Sending::Done => panic!("Polled after completion"), + } + } + } +} + +impl Future for Sending, Rc> +where + Rc: RefCounter, +{ + type Output = Result<(), Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + match mem::replace(this, Sending::Done) { + Sending::New { msg, sender } => match sender.try_send_to_all(msg)? { Ok(()) => return Poll::Ready(Ok(())), Err(MailboxFull(waiting)) => { *this = Sending::WaitingToSend(waiting);