Skip to content

Commit

Permalink
Some of this is unavoidable due to each actor having 3 bounded and 1 …
Browse files Browse the repository at this point in the history
…unbounded messaging channels, but the supervision logic alone had 3 dashmaps inside it. The monitoring functionality is pretty limited at-best, and we have no active use-cases for it, so dropping it and moving to a simple locked HashMap has allowed us to drop the heap space per actor to a little over 9KB

Additionally in running benchmarks against `main` we see large perf wins on actor creation time

```
     Running benches/actor.rs (target/release/deps/actor-464b0ad86c94b02f)
Gnuplot not found, using plotters backend
Creation of 100 actors  time:   [269.21 µs 282.84 µs 298.29 µs]
                        change: [-66.867% -64.514% -61.953%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 15 outliers among 100 measurements (15.00%)
  9 (9.00%) high mild
  6 (6.00%) high severe

Creation of 10000 actors
                        time:   [25.273 ms 25.862 ms 26.484 ms]
                        change: [-55.663% -53.735% -51.795%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
```
  • Loading branch information
slawlor committed Sep 21, 2024
1 parent 4d45d5b commit b7f1126
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 233 deletions.
2 changes: 1 addition & 1 deletion ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.10.4"
version = "0.11.0"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand Down
86 changes: 86 additions & 0 deletions ractor/examples/a_whole_lotta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Just creates a LOT of actors. Useful for measuring max memory util
//!
//! Execute with
//!
//! ```text
//! cargo run --example a_whole_lotta
//! ```
#![allow(clippy::incompatible_msrv)]

extern crate ractor;

use ractor::{Actor, ActorProcessingErr, ActorRef};

struct Counter;

#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for Counter {
type Msg = ();
type State = ();
type Arguments = ();

async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
tracing::info!("Starting the actor");
// create the initial state
Ok(())
}
}

fn init_logging() {
let dir = tracing_subscriber::filter::Directive::from(tracing::Level::DEBUG);

use std::io::stderr;
use std::io::IsTerminal;
use tracing_glog::Glog;
use tracing_glog::GlogFields;
use tracing_subscriber::filter::EnvFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Registry;

let fmt = tracing_subscriber::fmt::Layer::default()
.with_ansi(stderr().is_terminal())
.with_writer(std::io::stderr)
.event_format(Glog::default().with_timer(tracing_glog::LocalTime::default()))
.fmt_fields(GlogFields::default().compact());

let filter = vec![dir]
.into_iter()
.fold(EnvFilter::from_default_env(), |filter, directive| {
filter.add_directive(directive)
});

let subscriber = Registry::default().with(filter).with(fmt);
tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber");
}

#[tokio::main]
async fn main() {
init_logging();

let mut actors = Vec::new();

for _ in 0..100000 {
actors.push(
Actor::spawn(None, Counter, ())
.await
.expect("Failed to start actor!"),
);
}

for act in actors.iter() {
act.0.stop(None);
}
for (_, h) in actors.into_iter() {
h.await.expect("Failed to wait for actor shutdown");
}
}
1 change: 0 additions & 1 deletion ractor/examples/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ fn init_logging() {
#[tokio::main]
async fn main() {
init_logging();

let (actor, handle) = Actor::spawn(Some("test_name".to_string()), Counter, ())
.await
.expect("Failed to start actor!");
Expand Down
30 changes: 2 additions & 28 deletions ractor/src/actor/actor_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,32 +373,6 @@ impl ActorCell {
self.inner.tree.clear_supervisor();
}

/// Monitor the provided [super::Actor] for supervision events. An actor in `ractor` can
/// only have a single supervisor, denoted by the `link` function, however they
/// may have multiple `monitors`. Monitor's receive copies of the [SupervisionEvent]s,
/// with non-cloneable information removed.
///
/// * `who`: The actor to monitor
pub fn monitor(&self, who: ActorCell) {
who.inner.tree.set_monitor(self.clone());
self.inner.tree.mark_monitored(who);
}

/// Stop monitoring the provided [super::Actor] for supervision events.
///
/// * `who`: The actor to stop monitoring
pub fn unmonitor(&self, who: ActorCell) {
self.inner.tree.unmark_monitored(who.get_id());
who.inner.tree.remove_monitor(self.get_id());
}

/// Clear all the [self::Actor]s which are monitored by this [self::Actor]
pub fn clear_monitors(&self) {
for id in self.inner.tree.monitored_actors() {
self.unmonitor(id);
}
}

/// Kill this [super::Actor] forcefully (terminates async work)
pub fn kill(&self) {
let _ = self.inner.send_signal(Signal::Kill);
Expand Down Expand Up @@ -509,8 +483,8 @@ impl ActorCell {
/// exception to a [String]
///
/// * `evt` - The event to send to this [super::Actor]'s supervisors
pub fn notify_supervisor_and_monitors(&self, evt: SupervisionEvent) {
self.inner.tree.notify_supervisor_and_monitors(evt)
pub fn notify_supervisor(&self, evt: SupervisionEvent) {
self.inner.tree.notify_supervisor(evt)
}

pub(crate) fn get_type_id(&self) -> TypeId {
Expand Down
2 changes: 1 addition & 1 deletion ractor/src/actor/actor_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
use std::{fmt::Display, sync::atomic::AtomicU64};

/// An actor's globally unique identifier
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub enum ActorId {
/// A local pid
Local(u64),
Expand Down
2 changes: 1 addition & 1 deletion ractor/src/actor/actor_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl<TMessage> ActorRef<TMessage> {
///
/// * `evt` - The event to send to this [crate::Actor]'s supervisors
pub fn notify_supervisor_and_monitors(&self, evt: SupervisionEvent) {
self.inner.notify_supervisor_and_monitors(evt)
self.inner.notify_supervisor(evt)
}
}

Expand Down
20 changes: 0 additions & 20 deletions ractor/src/actor/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,26 +102,6 @@ pub enum SupervisionEvent {
}

impl SupervisionEvent {
/// Clone the supervision event, without requiring inner data
/// be cloneable. This means that the actor error (if present) is converted
/// to a string and copied as well as the state upon termination being not
/// propogated. If the state were cloneable, we could propogate it, however
/// that restriction is overly restrictive, so we've avoided it.
pub(crate) fn clone_no_data(&self) -> Self {
match self {
Self::ActorStarted(who) => Self::ActorStarted(who.clone()),
Self::ActorFailed(who, what) => {
Self::ActorFailed(who.clone(), From::from(format!("{what}")))
}
Self::ProcessGroupChanged(what) => Self::ProcessGroupChanged(what.clone()),
Self::ActorTerminated(who, _state, msg) => {
Self::ActorTerminated(who.clone(), None, msg.as_ref().cloned())
}
#[cfg(feature = "cluster")]
Self::PidLifecycleEvent(evt) => Self::PidLifecycleEvent(evt.clone()),
}
}

/// If this supervision event refers to an [Actor] lifecycle event, return
/// the [ActorCell] for that [actor][Actor].
///
Expand Down
3 changes: 0 additions & 3 deletions ractor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,9 +759,6 @@ where
// notify supervisors of the actor's death
myself.notify_supervisor_and_monitors(evt);

// clear any monitor actors
myself.clear_monitors();

// unlink superisors
if let Some(sup) = supervisor {
myself.unlink(sup);
Expand Down
88 changes: 20 additions & 68 deletions ractor/src/actor/supervision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,93 +12,51 @@
//! notifying the supervisor's supervisor? That's up to the implementation of the [super::Actor]
//!
//! This is currently an initial implementation of [Erlang supervisors](https://www.erlang.org/doc/man/supervisor.html)
//! which will be expanded upon as the library develops. Next in line is likely supervision strategies
//! for automatic restart routines.
//! which will be expanded upon as the library develops.
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
};

use dashmap::DashMap;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use super::{actor_cell::ActorCell, messages::SupervisionEvent};
use crate::ActorId;

/// A supervision tree
#[derive(Default)]
pub struct SupervisionTree {
children: DashMap<ActorId, (u64, ActorCell)>,
supervisor: Arc<RwLock<Option<ActorCell>>>,
monitors: DashMap<ActorId, ActorCell>,
monitored: DashMap<ActorId, ActorCell>,
start_order: AtomicU64,
children: Arc<Mutex<HashMap<ActorId, ActorCell>>>,
supervisor: Arc<Mutex<Option<ActorCell>>>,
}

impl SupervisionTree {
/// Push a child into the tere
pub fn insert_child(&self, child: ActorCell) {
let start_order = self.start_order.fetch_add(1, Ordering::Relaxed);
self.children.insert(child.get_id(), (start_order, child));
self.children.lock().unwrap().insert(child.get_id(), child);
}

/// Remove a specific actor from the supervision tree (e.g. actor died)
pub fn remove_child(&self, child: ActorId) {
match self.children.entry(child) {
dashmap::mapref::entry::Entry::Occupied(item) => {
item.remove();
}
dashmap::mapref::entry::Entry::Vacant(_) => {}
}
self.children.lock().unwrap().remove(&child);
}

/// Push a parent into the tere
pub fn set_supervisor(&self, parent: ActorCell) {
*(self.supervisor.write().unwrap()) = Some(parent);
*(self.supervisor.lock().unwrap()) = Some(parent);
}

/// Remove a specific actor from the supervision tree (e.g. actor died)
pub fn clear_supervisor(&self) {
*(self.supervisor.write().unwrap()) = None;
}

/// Set a monitor of this supervision tree
pub fn set_monitor(&self, monitor: ActorCell) {
self.monitors.insert(monitor.get_id(), monitor);
}

/// Mark that this actor is monitoring some other actors
pub fn mark_monitored(&self, who: ActorCell) {
self.monitored.insert(who.get_id(), who);
}

/// Mark that this actor is no longer monitoring some other actors
pub fn unmark_monitored(&self, who: ActorId) {
self.monitored.remove(&who);
}

/// Remove a specific monitor from the supervision tree
pub fn remove_monitor(&self, monitor: ActorId) {
self.monitors.remove(&monitor);
}

/// Get the [ActorCell]s of the monitored actors this actor monitors
pub fn monitored_actors(&self) -> Vec<ActorCell> {
self.monitored.iter().map(|a| a.value().clone()).collect()
*(self.supervisor.lock().unwrap()) = None;
}

/// Terminate all your supervised children and unlink them
/// from the supervision tree since the supervisor is shutting down
/// and can't deal with superivison events anyways
pub fn terminate_all_children(&self) {
let cells = self
.children
.iter()
.map(|r| r.1.clone())
.collect::<Vec<_>>();
// wipe local children to prevent double-link problems
self.children.clear();

let mut guard = self.children.lock().unwrap();
let cells = guard.iter().map(|(_, a)| a.clone()).collect::<Vec<_>>();
guard.clear();
// drop the guard to not deadlock on double-link
drop(guard);
for cell in cells {
cell.terminate();
cell.clear_supervisor();
Expand All @@ -107,35 +65,29 @@ impl SupervisionTree {

/// Determine if the specified actor is a parent of this actor
pub fn is_child_of(&self, id: ActorId) -> bool {
if let Some(parent) = &*(self.supervisor.read().unwrap()) {
if let Some(parent) = &*(self.supervisor.lock().unwrap()) {
parent.get_id() == id
} else {
false
}
}

/// Send a notification to the supervisor and monitors.
///
/// CAVEAT: Monitors get notified first, in order to save an unnecessary
/// clone if there are no monitors.
pub fn notify_supervisor_and_monitors(&self, evt: SupervisionEvent) {
for monitor in self.monitors.iter() {
let _ = monitor.value().send_supervisor_evt(evt.clone_no_data());
}
if let Some(parent) = &*(self.supervisor.read().unwrap()) {
/// Send a notification to the supervisor.
pub fn notify_supervisor(&self, evt: SupervisionEvent) {
if let Some(parent) = &*(self.supervisor.lock().unwrap()) {
let _ = parent.send_supervisor_evt(evt);
}
}

/// Retrieve the number of supervised children
#[cfg(test)]
pub fn get_num_children(&self) -> usize {
self.children.len()
self.children.lock().unwrap().len()
}

/// Retrieve the number of supervised children
#[cfg(test)]
pub fn get_num_parents(&self) -> usize {
usize::from(self.supervisor.read().unwrap().is_some())
usize::from(self.supervisor.lock().unwrap().is_some())
}
}
Loading

0 comments on commit b7f1126

Please sign in to comment.