Skip to content

Commit

Permalink
Expose async-trait to as a feature which can be enabled or disabled s…
Browse files Browse the repository at this point in the history
…electively
  • Loading branch information
slawlor committed Jan 6, 2025
1 parent f14e712 commit bccdedd
Show file tree
Hide file tree
Showing 19 changed files with 45 additions and 137 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ jobs:
- name: Test ractor with the `blanket_serde` feature
package: ractor
flags: -F blanket_serde
- name: Test ractor_cluster
- name: Test ractor_cluster with native async traits
package: ractor_cluster
# flags:
- name: Test ractor_cluster with async_trait
package: ractor_cluster
flags: -F async-trait

steps:
- uses: actions/checkout@main
Expand Down
11 changes: 5 additions & 6 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ on:

jobs:
test:
name: Test ractor_cluster with Docker based networked images
runs-on: ${{matrix.os}}-latest
name: Test networked cluster
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
toolchain: [stable]
os: [ubuntu]
features: [blanket_serde, cluster]

features:
- "blanket_serde, cluster"
- "blanket_serde, cluster, async-trait"
steps:
- uses: actions/checkout@main

Expand Down
3 changes: 2 additions & 1 deletion ractor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ cluster = []
message_span_propogation = []
tokio_runtime = ["tokio/time", "tokio/rt", "tokio/macros", "tokio/tracing"]
blanket_serde = ["serde", "pot", "cluster"]
async-trait = ["dep:async-trait"]

default = ["tokio_runtime", "async-trait", "message_span_propogation"]
default = ["tokio_runtime", "message_span_propogation"]

[dependencies]
## Required dependencies
Expand Down
8 changes: 4 additions & 4 deletions ractor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub trait Actor: Sized + Sync + Send + 'static {
///
/// * `myself` - A handle to the [ActorCell] representing this actor
/// * `args` - Arguments that are passed in the spawning of the actor which might
/// be necessary to construct the initial state
/// be necessary to construct the initial state
///
/// Returns an initial [Actor::State] to bootstrap the actor
#[cfg(not(feature = "async-trait"))]
Expand Down Expand Up @@ -357,7 +357,7 @@ pub trait Actor: Sized + Sync + Send + 'static {
/// * `name`: A name to give the actor. Useful for global referencing or debug printing
/// * `handler` The implementation of Self
/// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
/// initial state creation
/// initial state creation
///
/// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
/// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
Expand All @@ -375,7 +375,7 @@ pub trait Actor: Sized + Sync + Send + 'static {
/// * `name`: A name to give the actor. Useful for global referencing or debug printing
/// * `handler` The implementation of Self
/// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
/// initial state creation
/// initial state creation
///
/// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
/// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
Expand All @@ -394,7 +394,7 @@ pub trait Actor: Sized + Sync + Send + 'static {
/// * `name`: A name to give the actor. Useful for global referencing or debug printing
/// * `handler` The implementation of Self
/// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
/// initial state creation
/// initial state creation
/// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
///
/// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
Expand Down
2 changes: 1 addition & 1 deletion ractor/src/actor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ async fn actor_post_stop_executed_before_stop_and_wait_returns() {
_: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
sleep(Duration::from_millis(1000)).await;
let _ = self.signal.store(1, Ordering::SeqCst);
self.signal.store(1, Ordering::SeqCst);
Ok(())
}
}
Expand Down
109 changes: 1 addition & 108 deletions ractor/src/factory/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub trait Worker: Send + Sync + 'static {
/// * `wid` - The id of this worker in the factory
/// * `factory` - The handle to the factory that owns and manages this worker
/// * `args` - Arguments that are passed in the spawning of the worker which are
/// necessary to construct the initial state
/// necessary to construct the initial state
///
/// Returns an initial [Worker::State] to bootstrap the actor
#[cfg(not(feature = "async-trait"))]
Expand Down Expand Up @@ -310,7 +310,6 @@ where
>;
type State = WorkerState<Self>;

#[cfg(feature = "async-trait")]
async fn pre_start(
&self,
_: ActorRef<Self::Msg>,
Expand All @@ -328,39 +327,6 @@ where
})
}

#[cfg(not(feature = "async-trait"))]
fn pre_start(
&self,
_: ActorRef<Self::Msg>,
WorkerStartContext {
wid,
factory,
custom_start,
}: Self::Arguments,
) -> impl Future<Output = Result<Self::State, ActorProcessingErr>> + Send {
async move {
let inner_state =
<Self as Worker>::pre_start(&self, wid, &factory, custom_start).await?;
Ok(Self::State {
wid,
factory,
state: inner_state,
})
}
}

#[cfg(not(feature = "async-trait"))]
fn post_start(
&self,
_: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
async move {
<Self as Worker>::post_start(&self, state.wid, &state.factory, &mut state.state).await
}
}

#[cfg(feature = "async-trait")]
async fn post_start(
&self,
_: ActorRef<Self::Msg>,
Expand All @@ -369,18 +335,6 @@ where
<Self as Worker>::post_start(self, state.wid, &state.factory, &mut state.state).await
}

#[cfg(not(feature = "async-trait"))]
fn post_stop(
&self,
_: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
async move {
<Self as Worker>::post_stop(&self, state.wid, &state.factory, &mut state.state).await
}
}

#[cfg(feature = "async-trait")]
async fn post_stop(
&self,
_: ActorRef<Self::Msg>,
Expand All @@ -389,53 +343,6 @@ where
<Self as Worker>::post_stop(self, state.wid, &state.factory, &mut state.state).await
}

#[cfg(not(feature = "async-trait"))]
fn handle(
&self,
_: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
async move {
match message {
WorkerMessage::FactoryPing(time) => {
tracing::trace!("Worker {} - ping", state.wid);

state
.factory
.cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
}
WorkerMessage::Dispatch(mut job) => {
let key = if let Some(span) = job.options.take_span() {
<Self as Worker>::handle(
&self,
state.wid,
&state.factory,
job,
&mut state.state,
)
.instrument(span)
.await
} else {
<Self as Worker>::handle(
&self,
state.wid,
&state.factory,
job,
&mut state.state,
)
.await
}?;
state
.factory
.cast(FactoryMessage::Finished(state.wid, key))?;
}
}
Ok(())
}
}

#[cfg(feature = "async-trait")]
async fn handle(
&self,
_: ActorRef<Self::Msg>,
Expand Down Expand Up @@ -468,20 +375,6 @@ where
}
}

#[cfg(not(feature = "async-trait"))]
fn handle_supervisor_evt(
&self,
myself: ActorRef<Self::Msg>,
message: SupervisionEvent,
state: &mut Self::State,
) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
async move {
<Self as Worker>::handle_supervisor_evt(&self, myself.into(), message, &mut state.state)
.await
}
}

#[cfg(feature = "async-trait")]
async fn handle_supervisor_evt(
&self,
myself: ActorRef<Self::Msg>,
Expand Down
9 changes: 8 additions & 1 deletion ractor_cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ categories = ["asynchronous"]
build = "src/build.rs"
rust-version = "1.64"

[features]
async-trait = ["dep:async-trait", "ractor/async-trait"]

default = []

[build-dependencies]
# protobuf-src = "2"
protoc-bin-vendored = "3"
Expand All @@ -24,13 +29,15 @@ prost-build = { version = "0.13" }
bytes = { version = "1" }
prost = { version = "0.13" }
prost-types = { version = "0.13" }
ractor = { version = "0.14.0", features = ["cluster"], path = "../ractor" }
ractor = { version = "0.14.0", default-features = false, features = ["tokio_runtime", "message_span_propogation", "cluster"], path = "../ractor" }
ractor_cluster_derive = { version = "0.14.0", path = "../ractor_cluster_derive" }
rand = "0.8"
sha2 = "0.10"
tokio = { version = "1.30", features = ["rt", "time", "sync", "macros", "net", "io-util", "tracing"]}
tokio-rustls = { version = "0.26" }
tracing = "0.1"
## Optional dependencies
async-trait = { version = "0.1", optional = true }

[dev-dependencies]
tokio = { version = "1.30", features = ["rt", "time", "sync", "macros", "net", "io-util", "rt-multi-thread"] }
4 changes: 4 additions & 0 deletions ractor_cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ pub mod node;
/// Node's are representing by an integer id
pub type NodeId = u64;

// Satisfy dependencies transitively imported
#[cfg(feature = "async-trait")]
use async_trait as _;

// ============== Re-exports ============== //
pub use net::{IncomingEncryptionMode, NetworkStream};
pub use node::client::connect as client_connect;
Expand Down
2 changes: 1 addition & 1 deletion ractor_cluster/src/net/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct ListenerState {
#[derive(crate::RactorMessage)]
pub struct ListenerMessage;

#[ractor::async_trait]
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for Listener {
type Msg = ListenerMessage;
type Arguments = ();
Expand Down
6 changes: 3 additions & 3 deletions ractor_cluster/src/net/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub struct SessionState {
reader: ActorRef<SessionReaderMessage>,
}

#[ractor::async_trait]
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for Session {
type Msg = SessionMessage;
type Arguments = super::NetworkStream;
Expand Down Expand Up @@ -296,7 +296,7 @@ enum SessionWriterMessage {
WriteObject(crate::protocol::NetworkMessage),
}

#[ractor::async_trait]
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for SessionWriter {
type Msg = SessionWriterMessage;
type Arguments = ActorWriteHalf;
Expand Down Expand Up @@ -385,7 +385,7 @@ struct SessionReaderState {
reader: Option<ActorReadHalf>,
}

#[ractor::async_trait]
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for SessionReader {
type Msg = SessionReaderMessage;
type Arguments = ActorReadHalf;
Expand Down
2 changes: 1 addition & 1 deletion ractor_cluster/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ impl NodeServerState {
}
}

#[ractor::async_trait]
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for NodeServer {
type Msg = NodeServerMessage;
type State = NodeServerState;
Expand Down
2 changes: 1 addition & 1 deletion ractor_cluster/src/node/node_session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ impl NodeSessionState {
}
}

#[ractor::async_trait]
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for NodeSession {
type Msg = super::NodeSessionMessage;
type Arguments = crate::net::NetworkStream;
Expand Down
6 changes: 3 additions & 3 deletions ractor_cluster/src/node/node_session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::NodeSessionMessage;
use super::*;

struct DummyNodeServer;
#[ractor::async_trait]
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for DummyNodeServer {
type Msg = crate::node::NodeServerMessage;
type State = ();
Expand Down Expand Up @@ -57,7 +57,7 @@ impl Actor for DummyNodeServer {
}

struct DummyNodeSession;
#[ractor::async_trait]
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for DummyNodeSession {
type Msg = crate::node::NodeSessionMessage;
type State = ();
Expand Down Expand Up @@ -540,7 +540,7 @@ async fn node_session_handle_node_msg() {
call_replies: Arc<AtomicU8>,
}

#[ractor::async_trait]
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for DummyRemoteActor {
type Msg = crate::remote_actor::RemoteActorMessage;
type State = ();
Expand Down
2 changes: 1 addition & 1 deletion ractor_cluster/src/remote_actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl RemoteActorState {
#[derive(RactorMessage)]
pub(crate) struct RemoteActorMessage;

#[ractor::async_trait]
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for RemoteActor {
type Msg = RemoteActorMessage;
type State = RemoteActorState;
Expand Down
2 changes: 1 addition & 1 deletion ractor_cluster/src/remote_actor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl FakeNodeSession {
}
}

#[ractor::async_trait]
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for FakeNodeSession {
type Msg = crate::node::NodeSessionMessage;
type State = ();
Expand Down
5 changes: 4 additions & 1 deletion ractor_cluster_integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ publish = false
[features]
blanket_serde = ["ractor/blanket_serde"]
cluster = []
async-trait = ["ractor_cluster/async-trait"]

default = []

[dependencies]
anyhow = "1"
async-trait = "0.1"
clap = { version = "4", features = ["derive"] }
ractor = { path = "../ractor" }
ractor = { default-features = false, features = ["tokio_runtime", "message_span_propogation", "cluster"], path = "../ractor" }
ractor_cluster = { path = "../ractor_cluster" }
rand = "0.8"
tokio-rustls = { version = "0.26" }
Expand Down
Loading

0 comments on commit bccdedd

Please sign in to comment.