Skip to content

Commit

Permalink
Have Receiver have a public API again
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Aug 2, 2022
1 parent aad40ff commit 3dd7075
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 30 deletions.
8 changes: 1 addition & 7 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,7 @@ impl<A: Actor> Context<A> {

/// Get for the next message from the actor's mailbox.
pub fn next_message(&self) -> ReceiveFuture<A> {
// It is important to clone the `Arc` here otherwise the future will read from a new broadcast mailbox.
let receiver = inbox::Receiver::with_broadcast_mailbox(
self.mailbox.inner.clone(),
self.mailbox.broadcast_mailbox.clone(),
);

ReceiveFuture::new(receiver)
self.mailbox.receive()
}

/// Handle one message and return whether to exit from the manage loop or not.
Expand Down
50 changes: 36 additions & 14 deletions src/inbox/rx.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,63 @@
use std::ops::Deref;
use std::sync::Arc;

use crate::envelope::BroadcastEnvelope;
use crate::inbox::tx::{TxStrong, TxWeak};
use crate::inbox::{BroadcastQueue, Chan, Sender};
use crate::inbox::{ActorMessage, BroadcastQueue, Chan, Sender, WaitingReceiver};
use crate::recv_future::ReceiveFuture;

pub struct Receiver<A> {
pub inner: Arc<Chan<A>>,
pub broadcast_mailbox: Arc<BroadcastQueue<A>>,
inner: Arc<Chan<A>>,
broadcast_mailbox: Arc<BroadcastQueue<A>>,
}

impl<A> Receiver<A> {
pub fn new(inner: Arc<Chan<A>>) -> Self {
let new_broadcast_mailbox = inner.new_broadcast_mailbox();

Receiver::with_broadcast_mailbox(inner, new_broadcast_mailbox)
}

pub fn with_broadcast_mailbox(
inner: Arc<Chan<A>>,
broadcast_mailbox: Arc<BroadcastQueue<A>>,
) -> Self {
inner.increment_receiver_count();
let broadcast_mailbox = inner.new_broadcast_mailbox();

Receiver {
inner,
broadcast_mailbox,
}
}
}

impl<A> Receiver<A> {
pub fn receive(&self) -> ReceiveFuture<A> {
self.inner.increment_receiver_count();

ReceiveFuture::new(Receiver {
inner: self.inner.clone(),
broadcast_mailbox: self.broadcast_mailbox.clone(), // It is important to clone the `Arc` here otherwise the future will read from a new broadcast mailbox.
})
}

pub fn pop_broadcast_message(&self) -> Option<Arc<dyn BroadcastEnvelope<Actor = A>>> {
self.inner
.chan
.lock()
.unwrap()
.pop_broadcast(self.broadcast_mailbox.as_ref())
}

pub fn sender(&self) -> Option<Sender<A, TxStrong>> {
Sender::try_new_strong(self.inner.clone())
}

pub fn weak_sender(&self) -> Sender<A, TxWeak> {
Sender::new_weak(self.inner.clone())
}

pub(crate) fn try_recv(&self) -> Result<ActorMessage<A>, WaitingReceiver<A>> {
self.inner.try_recv(self.broadcast_mailbox.as_ref())
}
}

impl<A> Deref for Receiver<A> {
type Target = Chan<A>;

fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}

impl<A> Clone for Receiver<A> {
Expand Down
8 changes: 1 addition & 7 deletions src/inbox/waiting_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,7 @@ impl<A> WaitingReceiver<A> {
let actor_message = match ctrl_msg {
CtrlMsg::NewMessage(msg) => ActorMessage::ToOneActor(msg),
CtrlMsg::Shutdown => ActorMessage::Shutdown,
CtrlMsg::NewBroadcast => match receiver
.inner
.chan
.lock()
.unwrap()
.pop_broadcast(&receiver.broadcast_mailbox)
{
CtrlMsg::NewBroadcast => match receiver.pop_broadcast_message() {
Some(msg) => ActorMessage::ToAllActors(msg),
None => return Poll::Ready(None),
},
Expand Down
4 changes: 2 additions & 2 deletions src/recv_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl<A> Future for Waiting<A> {
impl<A> Drop for Waiting<A> {
fn drop(&mut self) {
if let Some(msg) = self.waiting_receiver.cancel() {
self.channel_receiver.inner.requeue_message(msg);
self.channel_receiver.requeue_message(msg);
}
}
}
Expand All @@ -96,7 +96,7 @@ impl<A> Future for Receiving<A> {

loop {
match mem::replace(this, Receiving::Done) {
Receiving::New(rx) => match rx.inner.try_recv(rx.broadcast_mailbox.as_ref()) {
Receiving::New(rx) => match rx.try_recv() {
Ok(message) => return Poll::Ready(message),
Err(waiting) => {
*this = Receiving::WaitingToReceive(Waiting {
Expand Down

0 comments on commit 3dd7075

Please sign in to comment.