Skip to content

Commit

Permalink
Move channel logic into Chan (#122)
Browse files Browse the repository at this point in the history
Move the channel logic (e.g try_send and try_recv) into Chan itself.

The goal was is follows:
    - Have Chan house the core channel implementation with descriptive function names for key operations
    - Have Sender and Receiver be thin layers on top that combine Chan with reference counting and provide a Future-based APIs for sending and receiving
  • Loading branch information
thomaseizinger authored Jul 18, 2022
1 parent e54b413 commit bc7808c
Show file tree
Hide file tree
Showing 6 changed files with 474 additions and 368 deletions.
7 changes: 1 addition & 6 deletions src/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use event_listener::EventListener;
use futures_sink::Sink;
use futures_util::FutureExt;

use crate::envelope::ReturningEnvelope;
use crate::inbox::{PriorityMessageToOne, SentMessage};
use crate::refcount::{Either, RefCounter, Strong, Weak};
use crate::send_future::ResolveToHandlerReturn;
use crate::{inbox, BroadcastFuture, Error, Handler, NameableSending, SendFuture};
Expand Down Expand Up @@ -171,10 +169,7 @@ impl<A, Rc: RefCounter> Address<A, Rc> {
M: Send + 'static,
A: Handler<M>,
{
let (envelope, rx) = ReturningEnvelope::<A, M, <A as Handler<M>>::Return>::new(message);
let msg = SentMessage::ToOneActor(PriorityMessageToOne::new(0, Box::new(envelope)));
let tx = self.0.send(msg);
SendFuture::sending_named(tx, rx)
SendFuture::sending_named(message, self.0.clone())
}

/// Send a message to all actors on this address.
Expand Down
284 changes: 263 additions & 21 deletions src/inbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ pub mod tx;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex, Weak};
use std::sync::{atomic, Arc, Mutex, Weak};
use std::{cmp, mem};

use event_listener::Event;
use event_listener::{Event, EventListener};
pub use rx::Receiver;
pub use tx::{SendFuture, Sender};

use crate::envelope::{BroadcastEnvelope, MessageEnvelope};
use crate::envelope::{BroadcastEnvelope, MessageEnvelope, Shutdown};
use crate::inbox::rx::{RxStrong, WaitingReceiver};
use crate::inbox::tx::{TxStrong, WaitingSender};
use crate::Actor;

type Spinlock<T> = spin::Mutex<T>;
pub type MessageToOneActor<A> = Box<dyn MessageEnvelope<Actor = A>>;
Expand All @@ -25,24 +26,10 @@ type BroadcastQueue<A> = Spinlock<BinaryHeap<MessageToAllActors<A>>>;
/// Create an actor mailbox, returning a sender and receiver for it. The given capacity is applied
/// severally to each send type - priority, ordered, and broadcast.
pub fn new<A>(capacity: Option<usize>) -> (Sender<A, TxStrong>, Receiver<A, RxStrong>) {
let broadcast_mailbox = Arc::new(Spinlock::new(BinaryHeap::new()));
let inner = Arc::new(Chan {
chan: Mutex::new(ChanInner {
ordered_queue: VecDeque::new(),
waiting_senders: VecDeque::new(),
waiting_receivers: VecDeque::new(),
priority_queue: BinaryHeap::new(),
broadcast_queues: vec![Arc::downgrade(&broadcast_mailbox)],
broadcast_tail: 0,
}),
capacity,
on_shutdown: Event::new(),
sender_count: AtomicUsize::new(0),
receiver_count: AtomicUsize::new(0),
});
let inner = Arc::new(Chan::new(capacity));

let tx = Sender::new(inner.clone());
let rx = Receiver::new(inner, broadcast_mailbox);
let rx = Receiver::new(inner);

(tx, rx)
}
Expand All @@ -57,9 +44,250 @@ pub struct Chan<A> {
}

impl<A> Chan<A> {
fn new(capacity: Option<usize>) -> Self {
Self {
capacity,
chan: Mutex::new(ChanInner::default()),
on_shutdown: Event::new(),
sender_count: AtomicUsize::new(0),
receiver_count: AtomicUsize::new(0),
}
}

/// Creates a new broadcast mailbox on this channel.
fn new_broadcast_mailbox(&self) -> Arc<BroadcastQueue<A>> {
let mailbox = Arc::new(Spinlock::new(BinaryHeap::new()));
self.chan
.lock()
.unwrap()
.broadcast_queues
.push(Arc::downgrade(&mailbox));

mailbox
}

fn try_send(&self, message: SentMessage<A>) -> Result<(), TrySendFail<A>> {
if !self.is_connected() {
return Err(TrySendFail::Disconnected);
}

let mut inner = self.chan.lock().unwrap();

match message {
SentMessage::ToAllActors(m) if !self.is_full(inner.broadcast_tail) => {
inner.send_broadcast(MessageToAllActors(m));
Ok(())
}
SentMessage::ToAllActors(m) => {
// on_shutdown is only notified with inner locked, and it's locked here, so no race
let waiting = WaitingSender::new(SentMessage::ToAllActors(m));
inner.waiting_senders.push_back(Arc::downgrade(&waiting));
Err(TrySendFail::Full(waiting))
}
msg => {
let res = inner.try_fulfill_receiver(msg.into());
match res {
Ok(()) => Ok(()),
Err(WakeReason::MessageToOneActor(m))
if m.priority == 0 && !self.is_full(inner.ordered_queue.len()) =>
{
inner.ordered_queue.push_back(m.val);
Ok(())
}
Err(WakeReason::MessageToOneActor(m))
if m.priority != 0 && !self.is_full(inner.priority_queue.len()) =>
{
inner.priority_queue.push(m);
Ok(())
}
Err(WakeReason::MessageToOneActor(m)) => {
let waiting = WaitingSender::new(m.into());
inner.waiting_senders.push_back(Arc::downgrade(&waiting));
Err(TrySendFail::Full(waiting))
}
_ => unreachable!(),
}
}
}
}

fn try_recv(
&self,
broadcast_mailbox: &BroadcastQueue<A>,
) -> Result<ActorMessage<A>, Arc<Spinlock<WaitingReceiver<A>>>> {
let mut broadcast = broadcast_mailbox.lock();

// Peek priorities in order to figure out which channel should be taken from
let broadcast_priority = broadcast.peek().map(|it| it.priority());

let mut inner = self.chan.lock().unwrap();

let shared_priority: Option<Priority> = inner.priority_queue.peek().map(|it| it.priority());

// Try take from ordered channel
if cmp::max(shared_priority, broadcast_priority) <= Some(Priority::Valued(0)) {
if let Some(msg) = inner.pop_ordered(self.capacity) {
return Ok(msg.into());
}
}

// Choose which priority channel to take from
match shared_priority.cmp(&broadcast_priority) {
// Shared priority is greater or equal (and it is not empty)
Ordering::Greater | Ordering::Equal if shared_priority.is_some() => {
Ok(inner.pop_priority(self.capacity).unwrap().into())
}
// Shared priority is less - take from broadcast
Ordering::Less => {
let msg = broadcast.pop().unwrap().0;
drop(broadcast);
inner.try_advance_broadcast_tail(self.capacity);

Ok(msg.into())
}
// Equal, but both are empty, so wait or exit if shutdown
_ => {
// on_shutdown is only notified with inner locked, and it's locked here, so no race
if self.sender_count.load(atomic::Ordering::SeqCst) == 0 {
return Ok(ActorMessage::Shutdown);
}

let waiting = Arc::new(Spinlock::new(WaitingReceiver::default()));
inner.waiting_receivers.push_back(Arc::downgrade(&waiting));
Err(waiting)
}
}
}

fn is_connected(&self) -> bool {
self.receiver_count.load(atomic::Ordering::SeqCst) > 0
&& self.sender_count.load(atomic::Ordering::SeqCst) > 0
}

fn len(&self) -> usize {
let inner = self.chan.lock().unwrap();
inner.broadcast_tail + inner.ordered_queue.len() + inner.priority_queue.len()
}

fn is_full(&self, len: usize) -> bool {
self.capacity.map_or(false, |cap| len >= cap)
}

/// Shutdown all [`WaitingReceiver`](crate::inbox::rx::WaitingReceiver)s in this channel.
fn shutdown_waiting_receivers(&self) {
let waiting_rx = {
let mut inner = match self.chan.lock() {
Ok(lock) => lock,
Err(_) => return, // Poisoned, ignore
};

// We don't need to notify on_shutdown here, as that is only used by senders
// Receivers will be woken with the fulfills below, or they will realise there are
// no senders when they check the tx refcount

mem::take(&mut inner.waiting_receivers)
};

for rx in waiting_rx.into_iter().flat_map(|w| w.upgrade()) {
let _ = rx.lock().fulfill(WakeReason::Shutdown);
}
}

fn shutdown_all_receivers(&self)
where
A: Actor,
{
self.chan
.lock()
.unwrap()
.send_broadcast(MessageToAllActors(Arc::new(Shutdown::new())));
}

/// Shutdown all [`WaitingSender`](crate::inbox::tx::WaitingSender)s in this channel.
fn shutdown_waiting_senders(&self) {
let waiting_tx = {
let mut inner = match self.chan.lock() {
Ok(lock) => lock,
Err(_) => return, // Poisoned, ignore
};

self.on_shutdown.notify(usize::MAX);

// Let any outstanding messages drop
inner.ordered_queue.clear();
inner.priority_queue.clear();
inner.broadcast_queues.clear();

mem::take(&mut inner.waiting_senders)
};

for tx in waiting_tx.into_iter().flat_map(|w| w.upgrade()) {
tx.lock().set_closed();
}
}

fn disconnect_listener(&self) -> Option<EventListener> {
// Listener is created before checking connectivity to avoid the following race scenario:
//
// 1. is_connected returns true
// 2. on_shutdown is notified
// 3. listener is registered
//
// The listener would never be woken in this scenario, as the notification preceded its
// creation.
let listener = self.on_shutdown.listen();

if self.is_connected() {
Some(listener)
} else {
None
}
}

fn pop_broadcast_message(
&self,
broadcast_mailbox: &BroadcastQueue<A>,
) -> Option<MessageToAllActors<A>> {
let message = broadcast_mailbox.lock().pop();

// Advance the broadcast tail if we successfully took a message.
if message.is_some() {
self.chan
.lock()
.unwrap()
.try_advance_broadcast_tail(self.capacity);
}

message
}

/// Re-queue the given message.
///
/// Normally, messages are delivered from the inbox straight to the actor. It can however happen
/// that this process gets cancelled. In that case, this function can be used to re-queue the
/// given message so it does not get lost.
///
/// Note that the ordering of messages in the queues may be slightly off with this function.
fn requeue_message(&self, msg: PriorityMessageToOne<A>) {
let mut inner = match self.chan.lock() {
Ok(lock) => lock,
Err(_) => return, // If we can't lock the inner channel, there is nothing we can do.
};

let res = inner.try_fulfill_receiver(WakeReason::MessageToOneActor(msg));
match res {
Ok(()) => (),
Err(WakeReason::MessageToOneActor(msg)) => {
if msg.priority == 0 {
// Preserve ordering as much as possible by pushing to the front
inner.ordered_queue.push_front(msg.val)
} else {
inner.priority_queue.push(msg);
}
}
Err(_) => unreachable!("Got wrong wake reason back from try_fulfill_receiver"),
}
}
}

struct ChanInner<A> {
Expand All @@ -71,6 +299,20 @@ struct ChanInner<A> {
broadcast_tail: usize,
}

// Manual impl to avoid `A: Default` bound.
impl<A> Default for ChanInner<A> {
fn default() -> Self {
Self {
ordered_queue: VecDeque::default(),
waiting_senders: VecDeque::default(),
waiting_receivers: VecDeque::default(),
priority_queue: BinaryHeap::default(),
broadcast_queues: Vec::default(),
broadcast_tail: 0,
}
}
}

impl<A> ChanInner<A> {
fn pop_priority(&mut self, capacity: Option<usize>) -> Option<MessageToOneActor<A>> {
// If len < cap after popping this message, try fulfill at most one waiting sender
Expand Down Expand Up @@ -182,7 +424,7 @@ impl<A> ChanInner<A> {
};

if let Some(tx) = self.waiting_senders.remove(pos).unwrap().upgrade() {
return Some(tx.lock().fulfill(true));
return Some(tx.lock().fulfill());
}
}
}
Expand Down Expand Up @@ -244,7 +486,7 @@ impl<A> From<Arc<dyn BroadcastEnvelope<Actor = A>>> for ActorMessage<A> {
}
}

enum WakeReason<A> {
pub enum WakeReason<A> {
MessageToOneActor(PriorityMessageToOne<A>),
// should be fetched from own receiver
MessageToAllActors,
Expand Down
Loading

0 comments on commit bc7808c

Please sign in to comment.