diff --git a/src/context.rs b/src/context.rs index be6d858b..df80d0de 100644 --- a/src/context.rs +++ b/src/context.rs @@ -121,13 +121,7 @@ impl Context { /// Get for the next message from the actor's mailbox. pub fn next_message(&self) -> ReceiveFuture { - // It is important to clone the `Arc` here otherwise the future will read from a new broadcast mailbox. - let receiver = inbox::Receiver::with_broadcast_mailbox( - self.mailbox.inner.clone(), - self.mailbox.broadcast_mailbox.clone(), - ); - - ReceiveFuture::new(receiver) + self.mailbox.receive() } /// Handle one message and return whether to exit from the manage loop or not. diff --git a/src/inbox/rx.rs b/src/inbox/rx.rs index 6d4aa656..32765c77 100644 --- a/src/inbox/rx.rs +++ b/src/inbox/rx.rs @@ -1,34 +1,44 @@ +use std::ops::Deref; use std::sync::Arc; +use crate::envelope::BroadcastEnvelope; use crate::inbox::tx::{TxStrong, TxWeak}; -use crate::inbox::{BroadcastQueue, Chan, Sender}; +use crate::inbox::{ActorMessage, BroadcastQueue, Chan, Sender, WaitingReceiver}; +use crate::recv_future::ReceiveFuture; pub struct Receiver { - pub inner: Arc>, - pub broadcast_mailbox: Arc>, + inner: Arc>, + broadcast_mailbox: Arc>, } impl Receiver { pub fn new(inner: Arc>) -> Self { - let new_broadcast_mailbox = inner.new_broadcast_mailbox(); - - Receiver::with_broadcast_mailbox(inner, new_broadcast_mailbox) - } - - pub fn with_broadcast_mailbox( - inner: Arc>, - broadcast_mailbox: Arc>, - ) -> Self { inner.increment_receiver_count(); + let broadcast_mailbox = inner.new_broadcast_mailbox(); Receiver { inner, broadcast_mailbox, } } -} -impl Receiver { + pub fn receive(&self) -> ReceiveFuture { + self.inner.increment_receiver_count(); + + ReceiveFuture::new(Receiver { + inner: self.inner.clone(), + broadcast_mailbox: self.broadcast_mailbox.clone(), // It is important to clone the `Arc` here otherwise the future will read from a new broadcast mailbox. + }) + } + + pub fn pop_broadcast_message(&self) -> Option>> { + self.inner + .chan + .lock() + .unwrap() + .pop_broadcast(self.broadcast_mailbox.as_ref()) + } + pub fn sender(&self) -> Option> { Sender::try_new_strong(self.inner.clone()) } @@ -36,6 +46,18 @@ impl Receiver { pub fn weak_sender(&self) -> Sender { Sender::new_weak(self.inner.clone()) } + + pub(crate) fn try_recv(&self) -> Result, WaitingReceiver> { + self.inner.try_recv(self.broadcast_mailbox.as_ref()) + } +} + +impl Deref for Receiver { + type Target = Chan; + + fn deref(&self) -> &Self::Target { + self.inner.as_ref() + } } impl Clone for Receiver { diff --git a/src/inbox/waiting_receiver.rs b/src/inbox/waiting_receiver.rs index 38643a95..0010f502 100644 --- a/src/inbox/waiting_receiver.rs +++ b/src/inbox/waiting_receiver.rs @@ -90,13 +90,7 @@ impl WaitingReceiver { let actor_message = match ctrl_msg { CtrlMsg::NewMessage(msg) => ActorMessage::ToOneActor(msg), CtrlMsg::Shutdown => ActorMessage::Shutdown, - CtrlMsg::NewBroadcast => match receiver - .inner - .chan - .lock() - .unwrap() - .pop_broadcast(&receiver.broadcast_mailbox) - { + CtrlMsg::NewBroadcast => match receiver.pop_broadcast_message() { Some(msg) => ActorMessage::ToAllActors(msg), None => return Poll::Ready(None), }, diff --git a/src/recv_future.rs b/src/recv_future.rs index 5b54328b..cc761eb1 100644 --- a/src/recv_future.rs +++ b/src/recv_future.rs @@ -83,7 +83,7 @@ impl Future for Waiting { impl Drop for Waiting { fn drop(&mut self) { if let Some(msg) = self.waiting_receiver.cancel() { - self.channel_receiver.inner.requeue_message(msg); + self.channel_receiver.requeue_message(msg); } } } @@ -96,7 +96,7 @@ impl Future for Receiving { loop { match mem::replace(this, Receiving::Done) { - Receiving::New(rx) => match rx.inner.try_recv(rx.broadcast_mailbox.as_ref()) { + Receiving::New(rx) => match rx.try_recv() { Ok(message) => return Poll::Ready(message), Err(waiting) => { *this = Receiving::WaitingToReceive(Waiting {