We should only notify the stop listener AFTER post_stop has executed, which is when the state gets set to `Stopped`

Closes #254 (Add a pre_stop fn or some way to wait until post_stop has finished)
slawlor committed Sep 13, 2024
1 parent 67d657e commit 9c6aa82
Showing 17 changed files with 111 additions and 50 deletions.
3 changes: 1 addition & 2 deletions ractor/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ractor"
-version = "0.10.3"
+version = "0.10.4"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
@@ -19,7 +19,6 @@ cluster = []
tokio_runtime = ["tokio/time", "tokio/rt", "tokio/macros", "tokio/tracing"]
blanket_serde = ["serde", "pot", "cluster"]

-# default = ["async-std"]
default = ["tokio_runtime", "async-trait"]

[dependencies]
4 changes: 4 additions & 0 deletions ractor/src/actor/actor_cell.rs
@@ -323,7 +323,11 @@ impl ActorCell {
            // Leave all + stop monitoring pg groups (if any)
            crate::pg::demonitor_all(self.get_id());
            crate::pg::leave_all(self.get_id());
        }

        // Fix for #254. We should only notify the stop listener AFTER post_stop
        // has executed, which is when the state gets set to `Stopped`.
        if status == ActorStatus::Stopped {
            // notify whoever might be waiting on the stop signal
            self.inner.notify_stop_listener();
        }
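
The ordering this hunk enforces can be sketched outside of ractor. The following is a minimal, std-only model, not the library's implementation: `Cell`, `Status`, `set_status`, and `wait_for_stop` are hypothetical names, and a `Condvar` stands in for the stop listener. It only illustrates that a waiter woken on `Stopped` (rather than `Stopping`) observes the cleanup work as finished.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Simplified lifecycle states (an assumption, loosely modelled on `ActorStatus`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Status {
    Running,
    Stopping,
    Stopped,
}

/// Hypothetical stand-in for the actor cell: a status plus a stop listener.
struct Cell {
    status: Mutex<Status>,
    stop_listener: Condvar,
}

impl Cell {
    fn set_status(&self, status: Status) {
        *self.status.lock().unwrap() = status;
        // Mirror the fix: wake waiters only once the status is `Stopped`.
        if status == Status::Stopped {
            self.stop_listener.notify_all();
        }
    }

    /// Blocks until the status is `Stopped`.
    fn wait_for_stop(&self) {
        let mut guard = self.status.lock().unwrap();
        while *guard != Status::Stopped {
            guard = self.stop_listener.wait(guard).unwrap();
        }
    }
}

/// Returns whether the cleanup had finished by the time the waiter woke up.
fn run_demo() -> bool {
    let cell = Arc::new(Cell {
        status: Mutex::new(Status::Running),
        stop_listener: Condvar::new(),
    });
    let cleanup_done = Arc::new(Mutex::new(false));

    let worker = {
        let (cell, cleanup_done) = (Arc::clone(&cell), Arc::clone(&cleanup_done));
        thread::spawn(move || {
            cell.set_status(Status::Stopping);
            // Stand-in for slow `post_stop` work.
            thread::sleep(Duration::from_millis(50));
            *cleanup_done.lock().unwrap() = true;
            cell.set_status(Status::Stopped);
        })
    };

    cell.wait_for_stop();
    let observed = *cleanup_done.lock().unwrap();
    worker.join().unwrap();
    observed
}

fn main() {
    println!("cleanup finished before waiter woke: {}", run_demo());
}
```

Had the notification been tied to `Stopping` instead, the waiter could wake while the simulated cleanup was still sleeping, which is the behaviour reported in #254.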
20 changes: 10 additions & 10 deletions ractor/src/actor/mod.rs
@@ -36,12 +36,12 @@
//! To link actors together in the supervision tree, there are 2 choices.
//!
//! 1. [Actor::spawn_linked] which requires supplying the supervisor to the actor upon spawning a child.
//! This call will link the supervision tree as early as possible in the lifecycle of the actors,
//! such that a failure or panic in `post_start` will be captured and notify the supervisor
//! This call will link the supervision tree as early as possible in the lifecycle of the actors,
//! such that a failure or panic in `post_start` will be captured and notify the supervisor
//! 2. `ActorCell::link` will link actors together after-the-fact, once already spawned. This is helpful
//! for actors which are originally created independently but have some latent relationship to each
//! other. However due to startup routines and asynchronous processing, it's unlikely that failures
//! in `post_start` and any other asynchronous handling will be captured in the supervision tree.
//! for actors which are originally created independently but have some latent relationship to each
//! other. However due to startup routines and asynchronous processing, it's unlikely that failures
//! in `post_start` and any other asynchronous handling will be captured in the supervision tree.
//!
//! ## Handling panics
//!
@@ -485,7 +485,7 @@ where
/// * `name`: A name to give the actor. Useful for global referencing or debug printing
/// * `handler` The [Actor] defining the logic for this actor
/// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
/// initial state creation
/// initial state creation
///
/// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
/// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
@@ -509,7 +509,7 @@ where
/// * `name`: A name to give the actor. Useful for global referencing or debug printing
/// * `handler` The [Actor] defining the logic for this actor
/// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
/// initial state creation
/// initial state creation
/// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
///
/// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
@@ -540,7 +540,7 @@ where
/// * `name`: A name to give the actor. Useful for global referencing or debug printing
/// * `handler` The [Actor] defining the logic for this actor
/// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
/// initial state creation
/// initial state creation
///
/// Returns a [Ok((ActorRef, JoinHandle<Result<JoinHandle<()>, SpawnErr>>))] upon successful creation of the
/// message queues, so you can begin sending messages. However the associated [JoinHandle] contains the inner
@@ -585,7 +585,7 @@ where
/// * `name`: A name to give the actor. Useful for global referencing or debug printing
/// * `handler` The [Actor] defining the logic for this actor
/// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
/// initial state creation
/// initial state creation
/// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
///
/// Returns a [Ok((ActorRef, JoinHandle<Result<JoinHandle<()>, SpawnErr>>))] upon successful creation of the
@@ -625,7 +625,7 @@ where
/// * `name`: A name to give the actor. Useful for global referencing or debug printing
/// * `handler`: The [Actor] defining the logic for this actor
/// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
/// initial state creation
/// initial state creation
/// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
///
/// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
60 changes: 59 additions & 1 deletion ractor/src/actor/tests/mod.rs
@@ -10,7 +10,11 @@ use std::sync::{
Arc,
};

-use crate::{common_test::periodic_check, concurrency::Duration, MessagingErr, RactorErr};
+use crate::{
+    common_test::periodic_check,
+    concurrency::{sleep, Duration},
+    MessagingErr, RactorErr,
+};

use crate::{
Actor, ActorCell, ActorProcessingErr, ActorRef, ActorStatus, SpawnErr, SupervisionEvent,
@@ -1000,3 +1004,57 @@ async fn actor_failing_in_spawn_err_doesnt_poison_registries() {
    a.stop(None);
    h.await.unwrap();
}

/// https://github.com/slawlor/ractor/issues/254
#[crate::concurrency::test]
#[tracing_test::traced_test]
async fn actor_post_stop_executed_before_stop_and_wait_returns() {
    struct TestActor {
        signal: Arc<AtomicU8>,
    }

    #[cfg_attr(feature = "async-trait", crate::async_trait)]
    impl Actor for TestActor {
        type Msg = EmptyMessage;
        type Arguments = ();
        type State = ();

        async fn pre_start(
            &self,
            _this_actor: crate::ActorRef<Self::Msg>,
            _: (),
        ) -> Result<Self::State, ActorProcessingErr> {
            Ok(())
        }

        async fn post_stop(
            &self,
            _: ActorRef<Self::Msg>,
            _: &mut Self::State,
        ) -> Result<(), ActorProcessingErr> {
            sleep(Duration::from_millis(1000)).await;
            let _ = self.signal.store(1, Ordering::SeqCst);
            Ok(())
        }
    }

    let signal = Arc::new(AtomicU8::new(0));
    let (actor, handle) = Actor::spawn(
        None,
        TestActor {
            signal: signal.clone(),
        },
        (),
    )
    .await
    .expect("Failed to spawn test actor");

    actor
        .stop_and_wait(None, None)
        .await
        .expect("Failed to stop and wait");

    assert_eq!(1, signal.load(Ordering::SeqCst));

    handle.await.unwrap();
}
10 changes: 5 additions & 5 deletions ractor/src/factory/factoryimpl.rs
@@ -127,7 +127,7 @@ where
/// controlled by `discard_mode`).
///
/// * For factories using [routing::QueuerRouting], [routing::StickyQueuerRouting] routing, these
/// are applied to the factory's internal queue.
/// are applied to the factory's internal queue.
/// * For all other routing protocols, this applies to the worker's message queue
///
/// Default is [DiscardSettings::None]
@@ -192,11 +192,11 @@ where
/// Construct a new [FactoryArguments] with the required arguments
///
/// * `worker_builder`: The implementation of the [WorkerBuilder] trait which is
/// used to construct worker instances as needed
/// used to construct worker instances as needed
/// * `router`: The message routing implementation the factory should use. Implements
/// the [Router] trait.
/// the [Router] trait.
/// * `queue`: The message queueing implementation the factory should use. Implements
/// the [Queue] trait.
/// the [Queue] trait.
pub fn new<TBuilder: WorkerBuilder<TWorker, TWorkerStart> + 'static>(
worker_builder: TBuilder,
router: TRouter,
@@ -276,7 +276,7 @@ where
/// (which is controlled by `discard_mode`).
///
/// * For factories using [routing::QueuerRouting], [routing::StickyQueuerRouting] routing, these
/// are applied to the factory's internal queue.
/// are applied to the factory's internal queue.
/// * For all other routing protocols, this applies to the worker's message queue
///
/// Default is [DiscardSettings::None]
14 changes: 7 additions & 7 deletions ractor/src/factory/mod.rs
@@ -18,17 +18,17 @@
//! on the intended workload. Some general guidance:
//!
//! 1. If you need to process a sequence of operations on a given key (i.e. the Job is a user, and
//! there's a sequential list of updates to that user). You then want the job to land on the same
//! worker and should select [routing::KeyPersistentRouting] or [routing::StickyQueuerRouting].
//! there's a sequential list of updates to that user). You then want the job to land on the same
//! worker and should select [routing::KeyPersistentRouting] or [routing::StickyQueuerRouting].
//! 2. If you don't need a sequence of operations then [routing::QueuerRouting] is likely a good choice.
//! 3. If your workers are making remote calls to other services/actors you probably want [routing::QueuerRouting]
//! or [routing::StickyQueuerRouting] to prevent head-of-the-line contention. Otherwise [routing::KeyPersistentRouting]
//! is sufficient.
//! or [routing::StickyQueuerRouting] to prevent head-of-the-line contention. Otherwise [routing::KeyPersistentRouting]
//! is sufficient.
//! 4. For some custom defined routing, you can define your own [routing::CustomHashFunction] which will be
//! used in conjunction with [routing::CustomRouting] to take the incoming job key and
//! the space which should be hashed to (i.e. the number of workers).
//! used in conjunction with [routing::CustomRouting] to take the incoming job key and
//! the space which should be hashed to (i.e. the number of workers).
//! 5. If you just want load balancing there's also [routing::RoundRobinRouting] for general 1-off
//! dispatching of jobs
//! dispatching of jobs
//!
//! ## Factory queueing
//!
2 changes: 1 addition & 1 deletion ractor/src/factory/queues.rs
@@ -46,7 +46,7 @@ where
/// Remove expired items from the queue
///
/// * `discard_handler` - The handler to call for each discarded job. Will be called
/// with [DiscardReason::Loadshed].
/// with [DiscardReason::Loadshed].
///
/// Returns the number of elements removed from the queue
fn remove_expired_items(
10 changes: 5 additions & 5 deletions ractor/src/lib.rs
@@ -135,14 +135,14 @@
//! will be supported by `ractor`. There are 4 concurrent message types, which are listened to in priority. They are
//!
//! 1. Signals: Signals are the highest-priority of all and will interrupt the actor wherever processing currently is (this includes terminating async work). There
//! is only 1 signal today, which is `Signal::Kill`, and it immediately terminates all work. This includes message processing or supervision event processing.
//! is only 1 signal today, which is `Signal::Kill`, and it immediately terminates all work. This includes message processing or supervision event processing.
//! 2. Stop: There is also a pre-defined stop signal. You can give a "stop reason" if you want, but it's optional. Stop is a graceful exit, meaning currently executing async
//! work will complete, and on the next message processing iteration Stop will take priority over future supervision events or regular messages. It will **not** terminate
//! currently executing work, regardless of the provided reason.
//! work will complete, and on the next message processing iteration Stop will take priority over future supervision events or regular messages. It will **not** terminate
//! currently executing work, regardless of the provided reason.
//! 3. SupervisionEvent: Supervision events are messages from child actors to their supervisors in the event of their startup, death, and/or unhandled panic. Supervision events
//! are how an actor's supervisor(s) are notified of events of their children and can handle lifetime events for them.
//! are how an actor's supervisor(s) are notified of events of their children and can handle lifetime events for them.
//! 4. Messages: Regular, user-defined, messages are the last channel of communication to actors. They are the lowest priority of the 4 message types and denote general actor work. The first
//! 3 messages types (signals, stop, supervision) are generally quiet unless it's a lifecycle event for the actor, but this channel is the "work" channel doing what your actor wants to do!
//! 3 messages types (signals, stop, supervision) are generally quiet unless it's a lifecycle event for the actor, but this channel is the "work" channel doing what your actor wants to do!
#![warn(unused_imports)]
// #![warn(unsafe_code)]
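
The priority order described in the doc comment above (signals, then stop, then supervision events, then regular messages) can be sketched as a small std-only model. This is an illustration under stated assumptions, not ractor's implementation: `Mailbox`, `Event`, and `next` are hypothetical names, and the sketch covers only which queue is polled first. It does not model a signal interrupting in-flight work, nor the actor exiting after a stop.

```rust
use std::collections::VecDeque;

/// Hypothetical event type covering the four channels named in the docs.
#[derive(Debug, PartialEq, Eq)]
enum Event {
    Signal(&'static str),
    Stop(Option<&'static str>),
    Supervision(&'static str),
    Message(&'static str),
}

/// Hypothetical mailbox with one FIFO queue per channel.
#[derive(Default)]
struct Mailbox {
    signals: VecDeque<Event>,
    stops: VecDeque<Event>,
    supervision: VecDeque<Event>,
    messages: VecDeque<Event>,
}

impl Mailbox {
    /// Pops the next event, always preferring the higher-priority queue.
    fn next(&mut self) -> Option<Event> {
        self.signals
            .pop_front()
            .or_else(|| self.stops.pop_front())
            .or_else(|| self.supervision.pop_front())
            .or_else(|| self.messages.pop_front())
    }
}

/// Enqueues events in "arrival" order and returns them in processing order.
fn drain_in_priority_order() -> Vec<Event> {
    let mut mailbox = Mailbox::default();
    mailbox.messages.push_back(Event::Message("work-1"));
    mailbox.supervision.push_back(Event::Supervision("child terminated"));
    mailbox.stops.push_back(Event::Stop(Some("shutting down")));
    mailbox.messages.push_back(Event::Message("work-2"));
    mailbox.signals.push_back(Event::Signal("kill"));

    let mut order = Vec::new();
    while let Some(event) = mailbox.next() {
        order.push(event);
    }
    order
}

fn main() {
    for event in drain_in_priority_order() {
        println!("{:?}", event);
    }
}
```

In a real actor the loop would end once a signal or stop is handled; the model keeps draining only to make the relative ordering of all four channels visible.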
8 changes: 4 additions & 4 deletions ractor/src/macros.rs
@@ -18,9 +18,9 @@ macro_rules! cast {
///
/// * `$actor` - The actor to call
/// * `$msg` - The message builder which takes in a [crate::port::RpcReplyPort] and emits a message which
/// the actor supports
/// the actor supports
/// * `$args` - (optional) Variable length arguments which will PRECEDE the reply channel when
/// constructing the message payload
/// constructing the message payload
///
/// Returns [Ok(_)] with the result on successful RPC or [Err(crate::RactorErr)] on failure
/// Example usage (without the `cluster` feature)
@@ -103,7 +103,7 @@ macro_rules! call {
/// * `$msg` - The message builder variant
/// * `$timeout_ms` - the timeout in milliseconds for the remote procedure call
/// * `$args` - (optional) Variable length arguments which will PRECEDE the reply channel when
/// constructing the message payload
/// constructing the message payload
///
/// Returns [Ok(_)] with the result on successful RPC or [Err(crate::RactorErr)] on failure
///
@@ -186,7 +186,7 @@ macro_rules! call_t {
///
/// * `$actor` - The actors to call
/// * `$msg` - The message builder, which takes in a [crate::port::RpcReplyPort] and emits a message which
/// the actor supports.
/// the actor supports.
/// * `$forward` - The [crate::ActorRef] to forward the call to
/// * `$forward_mapping` - The message transformer from the RPC result to the forwarding actor's message format
/// * `$timeout` - The [crate::concurrency::Duration] to allow the call to complete before timing out.
4 changes: 2 additions & 2 deletions ractor/src/port/output/mod.rs
@@ -64,8 +64,8 @@ where
///
/// * `receiver` - The reference to the actor which will receive forwarded messages
/// * `converter` - The converter which will convert the output message type to the
/// receiver's input type and return [Some(_)] if the message should be forwarded, [None]
/// if the message should be skipped.
/// receiver's input type and return [Some(_)] if the message should be forwarded, [None]
/// if the message should be skipped.
pub fn subscribe<TReceiverMsg, F>(&self, receiver: ActorRef<TReceiverMsg>, converter: F)
where
F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
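
The `converter` contract in the doc comment above (return `Some(_)` to forward a message, `None` to skip it) can be illustrated without ractor. This is a hedged, std-only sketch: `forward_converted` and `AlertMsg` are hypothetical names that only mirror the `Fn(TMsg) -> Option<TReceiverMsg>` bound shown in the signature, and a `Vec` stands in for the receiving actor's mailbox.

```rust
/// Hypothetical stand-in for an output-port subscription: applies `converter`
/// to every emitted message and keeps only those it maps to `Some(_)`.
fn forward_converted<TMsg, TReceiverMsg, F>(emitted: Vec<TMsg>, converter: F) -> Vec<TReceiverMsg>
where
    F: Fn(TMsg) -> Option<TReceiverMsg>,
{
    emitted.into_iter().filter_map(converter).collect()
}

/// Illustrative receiver message type (not part of ractor).
#[derive(Debug, PartialEq)]
enum AlertMsg {
    TooHot(i32),
}

/// Converts raw temperature readings into alerts, skipping normal readings.
fn demo() -> Vec<AlertMsg> {
    let readings = vec![72, 105, 98, 130];
    forward_converted(readings, |temp| {
        if temp > 100 {
            Some(AlertMsg::TooHot(temp))
        } else {
            None // skipped: never reaches the receiver
        }
    })
}

fn main() {
    println!("{:?}", demo());
}
```

The converter therefore does two jobs at once: it translates the output type into the receiver's input type, and it acts as a filter.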
8 changes: 4 additions & 4 deletions ractor/src/rpc/mod.rs
@@ -116,7 +116,7 @@ where
/// * `actor` - A reference to the [ActorCell] to communicate with
/// * `msg_builder` - The [FnOnce] to construct the message
/// * `timeout_option` - An optional [Duration] which represents the amount of
/// time until the operation times out
/// time until the operation times out
///
/// Returns [Ok(CallResult)] upon successful initial sending with the reply from
/// the [crate::Actor], [Err(MessagingErr)] if the initial send operation failed
@@ -158,7 +158,7 @@ where
/// * `actors` - A reference to the group of [ActorCell]s to communicate with
/// * `msg_builder` - The [FnOnce] to construct the message
/// * `timeout_option` - An optional [Duration] which represents the amount of
/// time until the operation times out
/// time until the operation times out
///
/// Returns [Ok(`Vec<CallResult<TReply>>>`)] upon successful initial sending with the reply from
/// the [crate::Actor]s, [Err(MessagingErr)] if the initial send operation failed
@@ -233,9 +233,9 @@ where
/// * `msg_builder` - The [FnOnce] to construct the message
/// * `response_forward` - The [ActorCell] to forward the message to
/// * `forward_mapping` - The [FnOnce] which maps the response from the `actor` [ActorCell]'s reply message
/// type to the `response_forward` [ActorCell]'s message type
/// type to the `response_forward` [ActorCell]'s message type
/// * `timeout_option` - An optional [Duration] which represents the amount of
/// time until the operation times out
/// time until the operation times out
///
/// Returns: A [JoinHandle<CallResult<()>>] which can be awaited to see if the
/// forward was successful or ignored
4 changes: 2 additions & 2 deletions ractor/src/time/mod.rs
@@ -98,7 +98,7 @@ mod tests;
/// * `period` - The [Duration] representing the period for the send interval
/// * `actor` - The [ActorCell] representing the [crate::Actor] to communicate with
/// * `msg` - The [Fn] message builder which is called to generate a message for each send
/// operation.
/// operation.
///
/// Returns: The [JoinHandle] which represents the backgrounded work (can be ignored to
/// "fire and forget")
@@ -132,7 +132,7 @@ where
/// * `period` - The [Duration] representing the time to delay before sending
/// * `actor` - The [ActorCell] representing the [crate::Actor] to communicate with
/// * `msg` - The [FnOnce] message builder which is called to generate a message for the send
/// operation
/// operation
///
/// Returns: The [JoinHandle<Result<(), MessagingErr>>] which represents the backgrounded work.
/// Awaiting the handle will yield the result of the send operation. Can be safely ignored to
2 changes: 1 addition & 1 deletion ractor_cluster/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster"
-version = "0.10.3"
+version = "0.10.4"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "Distributed cluster environment of Ractor actors"
documentation = "https://docs.rs/ractor"
6 changes: 3 additions & 3 deletions ractor_cluster/src/node/mod.rs
@@ -259,19 +259,19 @@ pub trait NodeEventSubscription: Send + 'static {
/// A node session has started up
///
/// * `ses`: The [NodeServerSessionInformation] representing the current state
/// of the node session
/// of the node session
fn node_session_opened(&self, ses: NodeServerSessionInformation);

/// A node session has shutdown
///
/// * `ses`: The [NodeServerSessionInformation] representing the current state
/// of the node session
/// of the node session
fn node_session_disconnected(&self, ses: NodeServerSessionInformation);

/// A node session authenticated
///
/// * `ses`: The [NodeServerSessionInformation] representing the current state
/// of the node session
/// of the node session
fn node_session_authenicated(&self, ses: NodeServerSessionInformation);
}

2 changes: 1 addition & 1 deletion ractor_cluster/src/node/node_session/mod.rs
@@ -62,7 +62,7 @@ impl AuthenticationState {
/// 1. The state of the authentication handshake
/// 2. Control messages for actor synchronization + group membership changes
/// 3. `RemoteActor`s wishing to send messages to their remote counterparts on the
/// remote system (and receive replies)
/// remote system (and receive replies)
///
/// A [NodeSession] can either be a client or server session, depending on the connection sequence.
/// If it was an incoming request to the [crate::NodeServer] then it's a "server" session, as
2 changes: 1 addition & 1 deletion ractor_cluster/src/remote_actor/mod.rs
@@ -36,7 +36,7 @@ impl RemoteActor {
/// * `pid`: The actor's local id on the remote system
/// * `node_id` The id of the [super::node::NodeSession]. Alongside `pid` this makes for a unique actor identifier
/// * `supervisor`: The [super::node::NodeSession]'s [ActorCell] handle which will be linked in
/// the supervision tree
/// the supervision tree
///
/// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
/// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if