Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ReceiveFuture::cancel in favor of cancellation-safety docs #138

Merged
merged 6 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 13 additions & 16 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,10 @@ impl<A: Actor> Context<A> {
let mut next_msg = self.next_message();
match future::select(fut, &mut next_msg).await {
Either::Left((future_res, _)) => {
if let Some(msg) = next_msg.cancel() {
self.tick(msg, actor).await;
// Check if `ReceiveFuture` was in the process of handling a message.
// If yes, dispatch it, otherwise continue.
if let Some(message) = next_msg.now_or_never() {
self.tick(message, actor).await;
}

return Either::Left(future_res);
Expand Down Expand Up @@ -315,23 +317,18 @@ impl<A> Clone for Context<A> {
pub struct Message<A>(pub(crate) ActorMessage<A>);

/// A future which will resolve to the next message to be handled by the actor.
///
/// # Cancellation safety
///
/// This future is cancellation-safe in that no messages will ever be lost, even if this future is
/// dropped half-way through. However, reinserting the message into the mailbox may mess with the
/// ordering of messages and they may be handled by the actor out of order.
///
/// If the order in which your actors process messages is not important to you, you can consider this
/// future to be fully cancellation-safe.
#[must_use = "Futures do nothing unless polled"]
pub struct ReceiveFuture<A>(InboxReceiveFuture<A, RxStrong>);

impl<A> ReceiveFuture<A> {
/// Cancel the receiving future, returning a message if it had been fulfilled with one, but had
/// not yet been polled after wakeup. Future calls to `Future::poll` will return `Poll::Pending`,
/// and `FusedFuture::is_terminated` will return `true`.
///
/// This is important to do over `Drop`, as with `Drop` messages may be sent back into the
/// channel and could be observed as received out of order, if multiple receives are concurrent
/// on one thread.
#[must_use = "If dropped, messages could be lost"]
pub fn cancel(&mut self) -> Option<Message<A>> {
self.0.cancel().map(Message)
}
}

impl<A> Future for ReceiveFuture<A> {
type Output = Message<A>;

Expand Down
18 changes: 1 addition & 17 deletions src/inbox/rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,22 +228,6 @@ impl<A, Rc: RxRefCounter> Future for ReceiveFuture<A, Rc> {
}
}

impl<A, Rc: RxRefCounter> ReceiveFuture<A, Rc> {
/// See docs on [`crate::context::ReceiveFuture::cancel`] for more
#[must_use = "If dropped, messages could be lost"]
pub fn cancel(&mut self) -> Option<ActorMessage<A>> {
if let ReceiveFutureInner::Waiting { waiting, .. } =
mem::replace(&mut self.0, ReceiveFutureInner::Complete)
{
if let Some(WakeReason::MessageToOneActor(msg)) = waiting.lock().cancel() {
return Some(msg.into());
}
}

None
}
}

impl<A, Rc: RxRefCounter> Drop for ReceiveFuture<A, Rc> {
fn drop(&mut self) {
if let ReceiveFutureInner::Waiting { waiting, rx } =
Expand Down Expand Up @@ -313,7 +297,7 @@ impl<A> WaitingReceiver<A> {
Ok(())
}

/// Signify that this waiting receiver was cancelled through [`ReceiveFuture::cancel`]
/// Cancel this [`WaitingReceiver`] returning its current, internal state.
fn cancel(&mut self) -> Option<WakeReason<A>> {
mem::replace(&mut self.wake_reason, Some(WakeReason::Cancelled))
}
Expand Down
10 changes: 10 additions & 0 deletions tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1082,3 +1082,13 @@ fn no_sender_returns_disconnected() {
drop(addr);
assert!(!ctx.weak_address().is_connected());
}

#[tokio::test]
async fn receive_future_can_dispatch_in_one_poll() {
let (addr, ctx) = Context::<Greeter>::new(None);

let _ = addr.send(Hello("world")).split_receiver().await;
let receive_future = ctx.next_message();

assert!(receive_future.now_or_never().is_some())
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the race here would be:

  • now or never returns None
  • message is sent
  • drop is called
  • message is found and sent, being reordered

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Therefore, this is better in most but not all cases than just dropping

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't fully understand.

I think this test proves that, once you have a ReceiveFuture, it can complete in a single poll, even if it has never been polled yet. Isn't this enough?

I guess we are not testing the case in which it is Waiting.

Copy link
Collaborator Author

@thomaseizinger thomaseizinger Jul 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of other test I just added in 753ad12?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think this test proves that, once you have a ReceiveFuture, it can complete in a single poll, even if it has never been polled yet. Isn't this enough?

I agree, but this is not the race which I was describing. I might be able to write a test tomorrow showing exactly what I mean, though not with now_or_never, but with manual polling emulating it.

I think the other test is also valuable, but doesn't quite test what I meant.

Copy link
Owner

@Restioson Restioson Jul 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it comes down to what kind of ordering we would like to guarantee when multiple actors are involved. now_or_never has equivalent ordering with Drop and even cancel if there is only one receiving actor.

// I can post the rest of the test if needed
let (addr, mut ctx) = Context::new(None);

let mut a = Cow { name: "Alfred" }; // arb actor type

let mut rx_a = ctx.next_message();
let mut fut_ctx = std::task::Context::from_waker(noop_waker_ref());

// Pretend this is the poll in now_or_never...
assert!(rx_a.poll_unpin(&mut fut_ctx).is_pending());

// This will fulfill the waiting rx in rx_a
assert!(addr.send(Hello("first message")).split_receiver().now_or_never().is_some());

// This will go to the queue
assert!(addr.send(Hello("second message")).split_receiver().now_or_never().is_some());

// Pretend this receive is from another actor...
ctx.tick(ctx.next_message().await, &mut a).await;

// Pretend this is the drop in now_or_never
drop(rx_a);

// This receive is also from the other actor...
ctx.tick(ctx.next_message().await, &mut a).await;

In this case, the second actor has now handled the message out of order.

I don't actually think this is much of a problem, on second thought, but we should mention how ordering interacts with multiple actors, just in case, although I do not expect multiple actors on the same address to be relying on external message ordering.

Copy link
Collaborator Author

@thomaseizinger thomaseizinger Jul 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. What is missing for me in the above case is to do something with rx_a after we have initially polled it. I think it is okay for things to behave weirdly if you have half-polled futures around that are not interacted with.

  1. If you Drop the rx_a future before the first send, it doesn't have a message and thus no re-ordering takes place
  2. If you Drop it afterwards, the message gets re-inserted and re-ordering might take place. This is documented with this PR.
  3. If you poll it again after the first send (with now_or_never), the message will be delivered to you and you can pass it to an actor.

(3) is what we are doing in the select function now which I think is functionally equivalent to what we had with cancel.

I don't actually think this is much of a problem, on second thought, but we should mention how ordering interacts with multiple actors, just in case, although I do not expect multiple actors on the same address to be relying on external message ordering.

I don't think we want to make any guarantees in regards to message ordering with our work-stealing scheduling. That would be kind of bizarre.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is okay for things to behave weirdly if you have half-polled futures around that are not interacted with.

Fair enough. There was a similar discussion in flume at some point, if I remember correctly, and we came to a similar decision.

If you poll it again after the first send (with now_or_never), the message will be delivered to you and you can pass it to an actor.

Technically, the message will be delivered to you whether or not you poll it more than once. As soon as you poll it once it will be delivered to you if the sender is given an opportunity.

I don't think we want to make any guarantees in regards to message ordering with our work-stealing scheduling. That would be kind of bizarre.

Agreed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. I think this is ready to merge then, right?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, yes. Maybe as part of #121 we can mention the ordering

}