Skip to content

Commit

Permalink
Merge pull request #28 from slawlor/macros
Browse files Browse the repository at this point in the history
Adding RPC macros for handy, dandy calling automation
  • Loading branch information
slawlor authored Jan 18, 2023
2 parents 67ac50c + 8fd3c8c commit 51e52d8
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 14 deletions.
8 changes: 2 additions & 6 deletions ractor/src/actor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,19 +233,15 @@ async fn test_sending_message_to_invalid_actor_type() {
impl Actor for TestActor1 {
type Msg = TestMessage1;
type State = ();
async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
()
}
async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {}
}
struct TestActor2;
struct TestMessage2;
#[async_trait::async_trait]
impl Actor for TestActor2 {
type Msg = TestMessage2;
type State = ();
async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
()
}
async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {}
}

let (actor1, handle1) = Actor::spawn(None, TestActor1)
Expand Down
64 changes: 64 additions & 0 deletions ractor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,16 @@ pub type GroupName = &'static str;

pub mod actor;
pub mod actor_id;
pub mod macros;
pub mod pg;
pub mod port;
pub mod registry;
pub mod rpc;
pub mod time;

#[cfg(test)]
mod tests;

#[cfg(test)]
use criterion as _;
#[cfg(test)]
Expand Down Expand Up @@ -189,3 +193,63 @@ impl<T: Any + Send + 'static> Message for T {}
/// to send between threads (same bounds as a [Message])
pub trait State: Message {}
impl<T: Message> State for T {}

/// Error types which can result from Ractor processes
#[derive(Debug)]
pub enum RactorErr {
/// An error occurred spawning
Spawn(SpawnErr),
/// An error occurred in messaging (sending/receiving)
Messaging(MessagingErr),
/// An actor encountered an error while processing (canceled or panicked)
Actor(ActorErr),
/// A timeout occurred
Timeout,
}

impl From<SpawnErr> for RactorErr {
fn from(value: SpawnErr) -> Self {
RactorErr::Spawn(value)
}
}

impl From<MessagingErr> for RactorErr {
fn from(value: MessagingErr) -> Self {
RactorErr::Messaging(value)
}
}

impl From<ActorErr> for RactorErr {
fn from(value: ActorErr) -> Self {
RactorErr::Actor(value)
}
}

impl<TResult> From<rpc::CallResult<TResult>> for RactorErr {
fn from(value: rpc::CallResult<TResult>) -> Self {
match value {
rpc::CallResult::SenderError => RactorErr::Messaging(MessagingErr::ChannelClosed),
rpc::CallResult::Timeout => RactorErr::Timeout,
_ => panic!("A successful `CallResult` cannot be mapped to a `RactorErr`"),
}
}
}

impl std::fmt::Display for RactorErr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Actor(actor_err) => {
write!(f, "{}", actor_err)
}
Self::Messaging(messaging_err) => {
write!(f, "{}", messaging_err)
}
Self::Spawn(spawn_err) => {
write!(f, "{}", spawn_err)
}
Self::Timeout => {
write!(f, "timeout")
}
}
}
}
129 changes: 129 additions & 0 deletions ractor/src/macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Macro helpers for remote procedure calls
/// `cast!` takes an actor and a message and emits a [crate::RactorErr] error
/// which can be pattern matched on in order to derive the output
#[macro_export]
macro_rules! cast {
($actor:ident, $msg:expr) => {
$actor.cast($msg).map_err($crate::RactorErr::from)
};
}

/// `call!`: Perform an inifinite-time remote procedure call to an [crate::Actor]
///
/// * `$actor` - The actor to call
/// * `$msg` - The message builder which takes in a [crate::port::RpcReplyPort] and emits a message which
/// the actor supports
///
/// Returns [Ok(_)] with the result on successful RPC or [Err(crate::RactorErr)] on failure
#[macro_export]
macro_rules! call {
($actor:ident, $msg:expr) => {{
let err = $actor
.call(|tx| $msg(tx), None)
.await
.map_err($crate::RactorErr::from);
match err {
Ok($crate::rpc::CallResult::Success(ok_value)) => Ok(ok_value),
Ok(cr) => Err($crate::RactorErr::from(cr)),
Err(e) => Err(e),
}
}};
}

/// `call_t!`: Perform an finite-time remote procedure call to an [crate::Actor]
///
/// * `$actor` - The actor to call
/// * `$msg` - The message builder which takes in a [crate::port::RpcReplyPort] and emits a message which
/// the actor supports
/// * `$timeout` - The [tokio::time::Duration] timeout for how long the call can take before timing out
///
/// Returns [Ok(_)] with the result on successful RPC or [Err(crate::RactorErr)] on failure
#[macro_export]
macro_rules! call_t {
($actor:ident, $msg:expr, $timeout:expr) => {{
let err = $actor
.call(|tx| $msg(tx), Some($timeout))
.await
.map_err($crate::RactorErr::from);
match err {
Ok($crate::rpc::CallResult::Success(ok_value)) => Ok(ok_value),
Ok(cr) => Err($crate::RactorErr::from(cr)),
Err(e) => Err(e),
}
}};
}

/// `forward!`: Perform an infinite-time remote procedure call to a [crate::Actor]
/// and forwards the result to another actor if successful
///
/// * `$actor` - The actors to call
/// * `$msg` - The message builder, which takes in a [crate::port::RpcReplyPort] and emits a message which
/// the actor supports.
/// * `$forward` - The [crate::ActorRef] to forward the call to
/// * `$forward_mapping` - The message transformer from the RPC result to the forwarding actor's message format
///
/// Returns [Ok(())] on successful call forwarding, [Err(crate::RactorErr)] otherwies
#[macro_export]
macro_rules! forward {
($actor:ident, $msg:expr, $forward:ident, $forward_mapping:expr) => {{
let future_or_err = $actor
.call_and_forward(|tx| $msg(tx), &$forward, $forward_mapping, None)
.map_err($crate::RactorErr::from);
match future_or_err {
Ok(future) => {
let err = future.await;
match err {
Ok($crate::rpc::CallResult::Success(Ok(()))) => Ok(()),
Ok($crate::rpc::CallResult::Success(Err(e))) => Err($crate::RactorErr::from(e)),
Ok(cr) => Err($crate::RactorErr::from(cr)),
Err(_join_err) => Err($crate::RactorErr::Messaging(
$crate::MessagingErr::ChannelClosed,
)),
}
}
Err(e) => Err(e),
}
}};
}

/// `forward_t!`: Perform an finite-time remote procedure call to a [crate::Actor]
/// and forwards the result to another actor if successful
///
/// * `$actor` - The actors to call
/// * `$msg` - The message builder, which takes in a [crate::port::RpcReplyPort] and emits a message which
/// the actor supports.
/// * `$forward` - The [crate::ActorRef] to forward the call to
/// * `$forward_mapping` - The message transformer from the RPC result to the forwarding actor's message format
/// * `$timeout` - The [tokio::time::Duration] to allow the call to complete before timing out.
///
/// Returns [Ok(())] on successful call forwarding, [Err(crate::RactorErr)] otherwies
#[macro_export]
macro_rules! forward_t {
($actor:ident, $msg:expr, $forward:ident, $forward_mapping:expr, $timeout:expr) => {{
let future_or_err = $actor
.call_and_forward(|tx| $msg(tx), &$forward, $forward_mapping, Some($timeout))
.map_err($crate::RactorErr::from);
match future_or_err {
Ok(future) => {
let err = future.await;
match err {
Ok($crate::rpc::CallResult::Success(Ok(()))) => Ok(()),
Ok($crate::rpc::CallResult::Success(Err(e))) => Err($crate::RactorErr::from(e)),
Ok(cr) => Err($crate::RactorErr::from(cr)),
Err(_join_err) => Err($crate::RactorErr::Messaging(
$crate::MessagingErr::ChannelClosed,
)),
}
}
Err(e) => Err(e),
}
}};
}

// TODO: subscribe to output port?
56 changes: 48 additions & 8 deletions ractor/src/rpc/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::sync::Arc;
use tokio::time::Duration;

use crate::rpc;
use crate::{Actor, ActorRef};
use crate::{call, call_t, cast, forward, forward_t, Actor, ActorRef};

#[tokio::test]
async fn test_rpc_cast() {
Expand Down Expand Up @@ -50,12 +50,13 @@ async fn test_rpc_cast() {

actor_ref.cast(()).expect("Failed to send message");
actor_ref.cast(()).expect("Failed to send message");
cast!(actor_ref, ()).unwrap();

// make sure they have time to process
tokio::time::sleep(Duration::from_millis(100)).await;

// assert the actor received 2 cast requests
assert_eq!(2, counter.load(Ordering::Relaxed));
assert_eq!(3, counter.load(Ordering::Relaxed));

// cleanup
actor_ref.stop(None);
Expand All @@ -68,6 +69,7 @@ async fn test_rpc_call() {

enum MessageFormat {
TestRpc(rpc::RpcReplyPort<String>),
TestTimeout(rpc::RpcReplyPort<String>),
}

#[async_trait::async_trait]
Expand All @@ -92,6 +94,10 @@ async fn test_rpc_call() {
let _ = reply.send("howdy".to_string());
}
}
Self::Msg::TestTimeout(reply) => {
tokio::time::sleep(Duration::from_millis(100)).await;
let _ = reply.send("howdy".to_string());
}
}
}
}
Expand All @@ -100,11 +106,15 @@ async fn test_rpc_call() {
.await
.expect("Failed to start test actor");

let rpc_result = actor_ref
.call(MessageFormat::TestRpc, Some(Duration::from_millis(100)))
.await
.expect("Failed to send message to actor")
.expect("RPC didn't succeed");
let rpc_result = call_t!(
actor_ref,
MessageFormat::TestRpc,
Duration::from_millis(100)
)
.unwrap();
assert_eq!("howdy".to_string(), rpc_result);

let rpc_result = call!(actor_ref, MessageFormat::TestRpc).unwrap();
assert_eq!("howdy".to_string(), rpc_result);

let rpc_result = actor_ref
Expand All @@ -114,8 +124,21 @@ async fn test_rpc_call() {
.expect("RPC didn't succeed");
assert_eq!("howdy".to_string(), rpc_result);

let rpc_timeout = call_t!(
actor_ref,
MessageFormat::TestTimeout,
Duration::from_millis(10)
);
assert!(rpc_timeout.is_err());
println!("RPC Error {:?}", rpc_timeout);

// cleanup
actor_ref.stop(None);

tokio::time::sleep(Duration::from_millis(200)).await;

let rpc_send_fail = call!(actor_ref, MessageFormat::TestRpc);
assert!(rpc_send_fail.is_err());
handle.await.expect("Actor stopped with err");
}

Expand Down Expand Up @@ -212,6 +235,23 @@ async fn test_rpc_call_forwarding() {
.expect("Call result didn't return success")
.expect("Failed to forward message");

forward!(
worker_ref,
WorkerMessage::TestRpc,
forwarder_ref,
ForwarderMessage::ForwardResult
)
.expect("Failed to foward message");

forward_t!(
worker_ref,
WorkerMessage::TestRpc,
forwarder_ref,
ForwarderMessage::ForwardResult,
Duration::from_millis(100)
)
.expect("Failed to forward message");

let forward_handle = worker_ref.call_and_forward(
WorkerMessage::TestRpc,
&forwarder_ref,
Expand All @@ -227,7 +267,7 @@ async fn test_rpc_call_forwarding() {
.expect("Failed to forward message");

// make sure the counter was bumped to say the message was forwarded
assert_eq!(2, counter.load(Ordering::Relaxed));
assert_eq!(4, counter.load(Ordering::Relaxed));

// cleanup
forwarder_ref.stop(None);
Expand Down
34 changes: 34 additions & 0 deletions ractor/src/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Basic tests of errors, error conversions, etc
use crate::RactorErr;

#[test]
fn test_error_conversions() {
let spawn = crate::SpawnErr::StartupCancelled;
let ractor_err = RactorErr::from(crate::SpawnErr::StartupCancelled);
assert_eq!(spawn.to_string(), ractor_err.to_string());

let messaging = crate::MessagingErr::InvalidActorType;
let ractor_err = RactorErr::from(crate::MessagingErr::InvalidActorType);
assert_eq!(messaging.to_string(), ractor_err.to_string());

let actor = crate::ActorErr::Cancelled;
let ractor_err = RactorErr::from(crate::ActorErr::Cancelled);
assert_eq!(actor.to_string(), ractor_err.to_string());

let call_result = crate::rpc::CallResult::<()>::Timeout;
let other = format!("{:?}", RactorErr::from(call_result));
assert_eq!("Timeout".to_string(), other);

let call_result = crate::rpc::CallResult::<()>::SenderError;
let other = format!("{}", RactorErr::from(call_result));
assert_eq!(
RactorErr::from(crate::MessagingErr::ChannelClosed).to_string(),
other
);
}

0 comments on commit 51e52d8

Please sign in to comment.