Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prep for 0.10.0 release #238

Merged
merged 2 commits into from
May 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Install `ractor` by adding the following to your Cargo.toml dependencies.

```toml
[dependencies]
ractor = "0.9"
ractor = "0.10"
```

The minimum supported Rust version (MSRV) of `ractor` is `1.64`. However to utilize the native `async fn` support in traits and not rely on the `async-trait` crate's desugaring functionliaty, you need to be on Rust version `>= 1.75`. The stabilization of `async fn` in traits [was recently added](https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html).
Expand Down
4 changes: 2 additions & 2 deletions ractor/examples/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl Actor for MidLevelActor {
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorPanicked(dead_actor, panic_msg)
SupervisionEvent::ActorFailed(dead_actor, panic_msg)
if dead_actor.get_id() == state.leaf_actor.get_id() =>
{
tracing::info!("MidLevelActor: {dead_actor:?} panicked with '{panic_msg}'");
Expand Down Expand Up @@ -295,7 +295,7 @@ impl Actor for RootActor {
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorPanicked(dead_actor, panic_msg)
SupervisionEvent::ActorFailed(dead_actor, panic_msg)
if dead_actor.get_id() == state.mid_level_actor.get_id() =>
{
tracing::info!("RootActor: {dead_actor:?} panicked with '{panic_msg}'");
Expand Down
4 changes: 2 additions & 2 deletions ractor/src/actor/actor_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,14 @@ impl ActorCell {
TActor: Actor,
{
if id.is_local() {
return Err(SpawnErr::StartupPanic(From::from("Cannot create a new remote actor handler without the actor id being marked as a remote actor!")));
return Err(SpawnErr::StartupFailed(From::from("Cannot create a new remote actor handler without the actor id being marked as a remote actor!")));
}

let (props, rx1, rx2, rx3, rx4) = ActorProperties::new_remote::<TActor>(name, id);
let cell = Self {
inner: Arc::new(props),
};
// TODO: remote actors don't appear in the name registry
// NOTE: remote actors don't appear in the name registry
// if let Some(r_name) = name {
// crate::registry::register(r_name, cell.clone())?;
// }
Expand Down
12 changes: 6 additions & 6 deletions ractor/src/actor/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ pub enum SupervisionEvent {
Option<BoxedState>,
Option<String>,
),
/// An actor panicked
ActorPanicked(super::actor_cell::ActorCell, ActorProcessingErr),
/// An actor failed (due to panic or error case)
ActorFailed(super::actor_cell::ActorCell, ActorProcessingErr),

/// A subscribed process group changed
ProcessGroupChanged(crate::pg::GroupChangeMessage),
Expand All @@ -110,8 +110,8 @@ impl SupervisionEvent {
pub(crate) fn clone_no_data(&self) -> Self {
match self {
Self::ActorStarted(who) => Self::ActorStarted(who.clone()),
Self::ActorPanicked(who, what) => {
Self::ActorPanicked(who.clone(), From::from(format!("{what}")))
Self::ActorFailed(who, what) => {
Self::ActorFailed(who.clone(), From::from(format!("{what}")))
}
Self::ProcessGroupChanged(what) => Self::ProcessGroupChanged(what.clone()),
Self::ActorTerminated(who, _state, msg) => {
Expand All @@ -131,7 +131,7 @@ impl SupervisionEvent {
pub fn actor_cell(&self) -> Option<&super::actor_cell::ActorCell> {
match self {
Self::ActorStarted(who)
| Self::ActorPanicked(who, _)
| Self::ActorFailed(who, _)
| Self::ActorTerminated(who, _, _) => Some(who),
_ => None,
}
Expand Down Expand Up @@ -166,7 +166,7 @@ impl std::fmt::Display for SupervisionEvent {
write!(f, "Stopped actor {actor:?}")
}
}
SupervisionEvent::ActorPanicked(actor, panic_msg) => {
SupervisionEvent::ActorFailed(actor, panic_msg) => {
write!(f, "Actor panicked {actor:?} - {panic_msg}")
}
SupervisionEvent::ProcessGroupChanged(change) => {
Expand Down
24 changes: 12 additions & 12 deletions ractor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ pub trait Actor: Sized + Sync + Send + 'static {
async move {
match message {
SupervisionEvent::ActorTerminated(who, _, _)
| SupervisionEvent::ActorPanicked(who, _) => {
| SupervisionEvent::ActorFailed(who, _) => {
myself.stop(None);
}
_ => {}
Expand All @@ -340,7 +340,7 @@ pub trait Actor: Sized + Sync + Send + 'static {
) -> Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorTerminated(who, _, _)
| SupervisionEvent::ActorPanicked(who, _) => {
| SupervisionEvent::ActorFailed(who, _) => {
myself.stop(None);
}
_ => {}
Expand Down Expand Up @@ -620,7 +620,7 @@ where
supervisor: ActorCell,
) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
if id.is_local() {
Err(SpawnErr::StartupPanic(From::from(
Err(SpawnErr::StartupFailed(From::from(
"Cannot spawn a remote actor when the identifier is not remote!",
)))
} else {
Expand Down Expand Up @@ -697,7 +697,7 @@ where
// Perform the pre-start routine, crashing immediately if we fail to start
let mut state = Self::do_pre_start(actor_ref.clone(), &handler, startup_args)
.await?
.map_err(SpawnErr::StartupPanic)?;
.map_err(SpawnErr::StartupFailed)?;

// setup supervision
if let Some(sup) = &supervisor {
Expand All @@ -724,7 +724,7 @@ where
None,
Some("killed".to_string()),
),
ActorErr::Panic(msg) => SupervisionEvent::ActorPanicked(myself.get_cell(), msg),
ActorErr::Failed(msg) => SupervisionEvent::ActorFailed(myself.get_cell(), msg),
},
};

Expand Down Expand Up @@ -761,7 +761,7 @@ where
// perform the post-start, with supervision enabled
Self::do_post_start(myself.clone(), handler, state)
.await?
.map_err(ActorErr::Panic)?;
.map_err(ActorErr::Failed)?;

myself.set_status(ActorStatus::Running);
myself.notify_supervisor_and_monitors(SupervisionEvent::ActorStarted(myself.get_cell()));
Expand All @@ -778,7 +778,7 @@ where
was_killed,
} = Self::process_message(myself.clone(), state, handler, &mut ports)
.await
.map_err(ActorErr::Panic)?;
.map_err(ActorErr::Failed)?;
// processing loop exit
if should_exit {
return Ok((state, exit_reason, was_killed));
Expand All @@ -788,7 +788,7 @@ where

// capture any panics in this future and convert to an ActorErr
let loop_done = futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
.map_err(|err| ActorErr::Panic(get_panic_string(err)))
.map_err(|err| ActorErr::Failed(get_panic_string(err)))
.await;

// set status to stopping
Expand All @@ -800,7 +800,7 @@ where
if !was_killed {
Self::do_post_stop(myself_clone, handler, exit_state)
.await?
.map_err(ActorErr::Panic)?;
.map_err(ActorErr::Failed)?;
}

Ok(exit_reason)
Expand Down Expand Up @@ -955,7 +955,7 @@ where
let future = handler.pre_start(myself, arguments);
futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
.await
.map_err(|err| SpawnErr::StartupPanic(get_panic_string(err)))
.map_err(|err| SpawnErr::StartupFailed(get_panic_string(err)))
}

async fn do_post_start(
Expand All @@ -966,7 +966,7 @@ where
let future = handler.post_start(myself, state);
futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
.await
.map_err(|err| ActorErr::Panic(get_panic_string(err)))
.map_err(|err| ActorErr::Failed(get_panic_string(err)))
}

async fn do_post_stop(
Expand All @@ -977,6 +977,6 @@ where
let future = handler.post_stop(myself, state);
futures::FutureExt::catch_unwind(AssertUnwindSafe(future))
.await
.map_err(|err| ActorErr::Panic(get_panic_string(err)))
.map_err(|err| ActorErr::Failed(get_panic_string(err)))
}
}
6 changes: 3 additions & 3 deletions ractor/src/actor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn test_panic_on_start_captured() {
}

let actor_output = Actor::spawn(None, TestActor, ()).await;
assert!(matches!(actor_output, Err(SpawnErr::StartupPanic(_))));
assert!(matches!(actor_output, Err(SpawnErr::StartupFailed(_))));
}

#[crate::concurrency::test]
Expand All @@ -67,7 +67,7 @@ async fn test_error_on_start_captured() {
}

let actor_output = Actor::spawn(None, TestActor, ()).await;
assert!(matches!(actor_output, Err(SpawnErr::StartupPanic(_))));
assert!(matches!(actor_output, Err(SpawnErr::StartupFailed(_))));
}

#[crate::concurrency::test]
Expand Down Expand Up @@ -935,7 +935,7 @@ fn returns_actor_references() {
(true, SupervisionEvent::ActorStarted(dummy_actor_cell())),
(
true,
SupervisionEvent::ActorPanicked(dummy_actor_cell(), "Bang!".into()),
SupervisionEvent::ActorFailed(dummy_actor_cell(), "Bang!".into()),
),
(
true,
Expand Down
20 changes: 10 additions & 10 deletions ractor/src/actor/tests/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async fn test_supervision_panic_in_post_startup() {
println!("Supervisor event received {message:?}");

// check that the panic was captured
if let SupervisionEvent::ActorPanicked(dead_actor, _panic_msg) = message {
if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message {
self.flag
.store(dead_actor.get_id().pid(), Ordering::Relaxed);
this_actor.stop(None);
Expand Down Expand Up @@ -164,7 +164,7 @@ async fn test_supervision_error_in_post_startup() {
println!("Supervisor event received {message:?}");

// check that the panic was captured
if let SupervisionEvent::ActorPanicked(dead_actor, _panic_msg) = message {
if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message {
self.flag
.store(dead_actor.get_id().pid(), Ordering::Relaxed);
this_actor.stop(None);
Expand Down Expand Up @@ -253,7 +253,7 @@ async fn test_supervision_panic_in_handle() {
println!("Supervisor event received {message:?}");

// check that the panic was captured
if let SupervisionEvent::ActorPanicked(dead_actor, _panic_msg) = message {
if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message {
self.flag
.store(dead_actor.get_id().pid(), Ordering::Relaxed);
this_actor.stop(None);
Expand Down Expand Up @@ -350,7 +350,7 @@ async fn test_supervision_error_in_handle() {
println!("Supervisor event received {message:?}");

// check that the panic was captured
if let SupervisionEvent::ActorPanicked(dead_actor, _panic_msg) = message {
if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message {
self.flag
.store(dead_actor.get_id().pid(), Ordering::Relaxed);
this_actor.stop(None);
Expand Down Expand Up @@ -439,7 +439,7 @@ async fn test_supervision_panic_in_post_stop() {
println!("Supervisor event received {message:?}");

// check that the panic was captured
if let SupervisionEvent::ActorPanicked(dead_actor, _panic_msg) = message {
if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message {
self.flag
.store(dead_actor.get_id().pid(), Ordering::Relaxed);
this_actor.stop(None);
Expand Down Expand Up @@ -520,7 +520,7 @@ async fn test_supervision_error_in_post_stop() {
println!("Supervisor event received {message:?}");

// check that the panic was captured
if let SupervisionEvent::ActorPanicked(dead_actor, _panic_msg) = message {
if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message {
self.flag
.store(dead_actor.get_id().pid(), Ordering::Relaxed);
this_actor.stop(None);
Expand Down Expand Up @@ -600,7 +600,7 @@ async fn test_supervision_panic_in_supervisor_handle() {
_message: SupervisionEvent,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
if let SupervisionEvent::ActorPanicked(_child, _msg) = _message {
if let SupervisionEvent::ActorFailed(_child, _msg) = _message {
panic!("Boom again!");
}
Ok(())
Expand Down Expand Up @@ -637,7 +637,7 @@ async fn test_supervision_panic_in_supervisor_handle() {
println!("Supervisor event received {message:?}");

// check that the panic was captured
if let SupervisionEvent::ActorPanicked(dead_actor, _panic_msg) = message {
if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message {
self.flag
.store(dead_actor.get_id().pid(), Ordering::Relaxed);
this_actor.stop(None);
Expand Down Expand Up @@ -742,7 +742,7 @@ async fn test_supervision_error_in_supervisor_handle() {
_message: SupervisionEvent,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
if let SupervisionEvent::ActorPanicked(_child, _msg) = _message {
if let SupervisionEvent::ActorFailed(_child, _msg) = _message {
return Err(From::from("boom again!"));
}
Ok(())
Expand Down Expand Up @@ -779,7 +779,7 @@ async fn test_supervision_error_in_supervisor_handle() {
println!("Supervisor event received {message:?}");

// check that the panic was captured
if let SupervisionEvent::ActorPanicked(dead_actor, _panic_msg) = message {
if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message {
self.flag
.store(dead_actor.get_id().pid(), Ordering::Relaxed);
this_actor.stop(None);
Expand Down
24 changes: 8 additions & 16 deletions ractor/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ use crate::ActorName;
pub type ActorProcessingErr = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Spawn errors starting an actor
#[derive(Debug)] // TODO: why Eq, PartialEq?
#[derive(Debug)]
pub enum SpawnErr {
/// Actor panic'd during startup
StartupPanic(ActorProcessingErr),
/// Actor failed to startup because the startup task was cancelled
StartupCancelled,
/// Actor panic'd or returned an error during startup
StartupFailed(ActorProcessingErr),
/// An actor cannot be started > 1 time
ActorAlreadyStarted,
/// The named actor is already registered in the registry
Expand All @@ -28,7 +26,7 @@ pub enum SpawnErr {
impl std::error::Error for SpawnErr {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match &self {
Self::StartupPanic(inner) => Some(inner.as_ref()),
Self::StartupFailed(inner) => Some(inner.as_ref()),
_ => None,
}
}
Expand All @@ -37,19 +35,13 @@ impl std::error::Error for SpawnErr {
impl Display for SpawnErr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::StartupPanic(panic_msg) => {
Self::StartupFailed(panic_msg) => {
if f.alternate() {
write!(f, "Actor panicked during startup '{panic_msg:#}'")
} else {
write!(f, "Actor panicked during startup '{panic_msg}'")
}
}
Self::StartupCancelled => {
write!(
f,
"Actor failed to startup due to processing task being cancelled"
)
}
Self::ActorAlreadyStarted => {
write!(f, "Actor cannot be re-started more than once")
}
Expand Down Expand Up @@ -79,13 +71,13 @@ pub enum ActorErr {
/// Actor had a task cancelled internally during processing
Cancelled,
/// Actor had an internal panic
Panic(ActorProcessingErr),
Failed(ActorProcessingErr),
}

impl std::error::Error for ActorErr {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match &self {
Self::Panic(inner) => Some(inner.as_ref()),
Self::Failed(inner) => Some(inner.as_ref()),
_ => None,
}
}
Expand All @@ -94,7 +86,7 @@ impl std::error::Error for ActorErr {
impl Display for ActorErr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Panic(panic_msg) => {
Self::Failed(panic_msg) => {
if f.alternate() {
write!(f, "Actor panicked '{panic_msg:#}'")
} else {
Expand Down
2 changes: 1 addition & 1 deletion ractor/src/factory/factoryimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ where
state.try_route_next_active_job(wid)?;
}
}
SupervisionEvent::ActorPanicked(who, reason) => {
SupervisionEvent::ActorFailed(who, reason) => {
let wid = if let Some(worker) = state
.pool
.values_mut()
Expand Down
2 changes: 1 addition & 1 deletion ractor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//!
//! ```toml
//! [dependencies]
//! ractor = "0.9"
//! ractor = "0.10"
//! ```
//!
//! The minimum supported Rust version (MSRV) is 1.64. However if you disable the `async-trait` feature, then you need Rust >= 1.75 due to the native
Expand Down
Loading
Loading