Skip to content

Commit 96d06c8

Browse files
committed
feat(actors): add broker actor
1 parent 7587048 commit 96d06c8

File tree

6 files changed

+385
-8
lines changed

6 files changed

+385
-8
lines changed

actors/Cargo.toml

+6-1
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,13 @@ readme = "../README.md"
77
repository = "https://github.com/tqwewe/kameo"
88
license = "MIT OR Apache-2.0"
99
categories = ["asynchronous", "concurrency", "rust-patterns"]
10-
keywords = ["actor", "tokio"]
10+
keywords = ["actor", "tokio", "broker", "pool", "pubsub"]
1111

1212
[dependencies]
1313
futures.workspace = true
14+
glob = "0.3.2"
1415
kameo.workspace = true
16+
tokio.workspace = true
17+
18+
[dev-dependencies]
19+
tokio-test = "0.4.4"

actors/src/broker.rs

+309
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
//! Provides a topic-based message broker for the actor system.
2+
//!
3+
//! The `broker` module implements a flexible topic-based publish/subscribe mechanism that allows
4+
//! actors to communicate based on hierarchical topics rather than direct references. It supports
5+
//! glob pattern matching for topic subscriptions, allowing for powerful and flexible message routing.
6+
//!
7+
//! # Features
8+
//!
9+
//! - **Topic-Based Routing**: Messages are routed based on their topic rather than direct actor references.
10+
//! - **Pattern Matching**: Subscriptions use glob patterns, supporting wildcards and hierarchical topics.
11+
//! - **Multiple Delivery Strategies**: Configure how messages are delivered to handle different reliability needs.
12+
//! - **Automatic Cleanup**: Dead actor references are automatically removed from subscription lists.
13+
//!
14+
//! # When to use Broker vs PubSub
15+
//!
16+
//! The `broker` module provides a more sophisticated routing system compared to the simpler `pubsub` module:
17+
//!
18+
//! - Use **Broker** when you need hierarchical topics, pattern-based subscriptions, or different delivery strategies.
19+
//! - Use **PubSub** when you only need simple broadcast to all listeners with optional type-based filtering.
20+
//!
21+
//! # Example
22+
//!
23+
//! ```
24+
//! use kameo::Actor;
25+
//! use kameo_actors::broker::{Broker, Subscribe, Publish, DeliveryStrategy};
26+
//! use glob::Pattern;
27+
//! # use std::time::Duration;
28+
//! # use kameo::message::{Context, Message};
29+
//!
30+
//! #[derive(Actor, Clone)]
31+
//! struct TemperatureUpdate(f32);
32+
//!
33+
//! #[derive(Actor)]
34+
//! struct TemperatureSensor;
35+
//!
36+
//! #[derive(Actor)]
37+
//! struct DisplayActor;
38+
//!
39+
//! # impl Message<TemperatureUpdate> for DisplayActor {
40+
//! # type Reply = ();
41+
//! # async fn handle(&mut self, msg: TemperatureUpdate, ctx: &mut Context<Self, Self::Reply>) -> Self::Reply { }
42+
//! # }
43+
//!
44+
//! # tokio_test::block_on(async {
45+
//! // Create a broker with best effort delivery
46+
//! let broker = Broker::<TemperatureUpdate>::new(DeliveryStrategy::BestEffort);
47+
//! let broker_ref = kameo::spawn(broker);
48+
//!
49+
//! // Create a display actor and subscribe to kitchen temperature updates
50+
//! let display = kameo::spawn(DisplayActor);
51+
//! broker_ref.tell(Subscribe {
52+
//! topic: Pattern::new("sensors/kitchen/*").unwrap(),
53+
//! recipient: display.recipient(),
54+
//! }).await?;
55+
//!
56+
//! // Publish a temperature update
57+
//! broker_ref.tell(Publish {
58+
//! topic: "sensors/kitchen/temperature".to_string(),
59+
//! message: TemperatureUpdate(22.5),
60+
//! }).await?;
61+
//! # Ok::<(), Box<dyn std::error::Error>>(())
62+
//! # });
63+
//! ```
64+
65+
use std::{collections::HashMap, time::Duration};
66+
67+
use glob::{MatchOptions, Pattern};
68+
use kameo::prelude::*;
69+
70+
/// A generic topic-based message broker for the actor system.
71+
///
72+
/// The broker manages subscriptions to topics and delivers messages published
73+
/// to those topics according to the specified delivery strategy.
74+
///
75+
/// Topics use glob pattern matching syntax, allowing for flexible subscription patterns:
76+
/// - `sensors/*` - Any topic starting with "sensors/"
77+
/// - `*/temperature` - Any topic ending with "/temperature"
78+
/// - `sensors/*/humidity` - Match any topic with "sensors/" prefix and "/humidity" suffix
79+
#[derive(Actor, Clone, Debug, Default)]
80+
pub struct Broker<M: Send + 'static> {
81+
subscriptions: HashMap<Pattern, Vec<Recipient<M>>>,
82+
delivery_strategy: DeliveryStrategy,
83+
}
84+
85+
impl<M: Send + 'static> Broker<M> {
86+
/// Creates a new broker with the specified delivery strategy.
87+
///
88+
/// # Arguments
89+
///
90+
/// * `delivery_strategy` - Determines how messages are delivered to subscribers
91+
///
92+
/// # Returns
93+
///
94+
/// A new `Broker` instance with the specified delivery strategy
95+
pub fn new(delivery_strategy: DeliveryStrategy) -> Self {
96+
Broker {
97+
subscriptions: HashMap::new(),
98+
delivery_strategy,
99+
}
100+
}
101+
102+
fn unsubscribe(&mut self, pattern: &Pattern, actor_id: ActorID) {
103+
if let Some(recipients) = self.subscriptions.get_mut(pattern) {
104+
recipients.retain(|recipient| recipient.id() != actor_id);
105+
if recipients.is_empty() {
106+
self.subscriptions.remove(pattern);
107+
}
108+
}
109+
}
110+
}
111+
112+
/// Message for subscribing an actor to a topic pattern.
113+
///
114+
/// When an actor subscribes to a topic pattern, it will receive all messages
115+
/// published to topics that match that pattern.
116+
#[derive(Clone, Debug)]
117+
pub struct Subscribe<M: Send + 'static> {
118+
/// The pattern to subscribe to, using glob syntax
119+
pub topic: Pattern,
120+
/// The recipient that will receive messages published to matching topics
121+
pub recipient: Recipient<M>,
122+
}
123+
124+
impl<M: Send + 'static> Message<Subscribe<M>> for Broker<M> {
125+
type Reply = ();
126+
127+
async fn handle(
128+
&mut self,
129+
Subscribe { topic, recipient }: Subscribe<M>,
130+
_ctx: &mut Context<Self, Self::Reply>,
131+
) -> Self::Reply {
132+
self.subscriptions.entry(topic).or_default().push(recipient);
133+
}
134+
}
135+
136+
/// Message for unsubscribing an actor from topics.
137+
///
138+
/// Can unsubscribe from a specific topic pattern or all patterns.
139+
#[derive(Clone, Debug, PartialEq, Eq)]
140+
pub struct Unsubscribe {
141+
/// The specific topic pattern to unsubscribe from.
142+
/// If None, unsubscribe from all topic patterns.
143+
pub topic: Option<Pattern>,
144+
/// The ID of the actor to unsubscribe.
145+
pub actor_id: ActorID,
146+
}
147+
148+
impl<M: Send + 'static> Message<Unsubscribe> for Broker<M> {
149+
type Reply = ();
150+
151+
async fn handle(
152+
&mut self,
153+
Unsubscribe { topic, actor_id }: Unsubscribe,
154+
_ctx: &mut Context<Self, Self::Reply>,
155+
) -> Self::Reply {
156+
match topic {
157+
Some(topic) => {
158+
self.unsubscribe(&topic, actor_id);
159+
}
160+
None => {
161+
self.subscriptions.retain(|_, recipients| {
162+
recipients.retain(|recipient| recipient.id() != actor_id);
163+
!recipients.is_empty()
164+
});
165+
}
166+
}
167+
}
168+
}
169+
170+
/// Message for publishing content to a specific topic.
171+
///
172+
/// When a message is published to a topic, it will be delivered to all actors
173+
/// that have subscribed to matching topic patterns, according to the broker's
174+
/// delivery strategy.
175+
#[derive(Clone, Debug, PartialEq, Eq)]
176+
pub struct Publish<M: Send + 'static> {
177+
/// The exact topic to publish to (not a pattern)
178+
pub topic: String,
179+
/// The message payload to deliver to subscribers
180+
pub message: M,
181+
}
182+
183+
impl<M: Clone + Send + 'static> Message<Publish<M>> for Broker<M> {
184+
type Reply = ();
185+
186+
async fn handle(
187+
&mut self,
188+
Publish { topic, message }: Publish<M>,
189+
ctx: &mut Context<Self, Self::Reply>,
190+
) -> Self::Reply {
191+
let options = MatchOptions {
192+
case_sensitive: true,
193+
require_literal_separator: true,
194+
require_literal_leading_dot: false,
195+
};
196+
197+
let mut to_remove = Vec::new();
198+
for (pattern, recipients) in &self.subscriptions {
199+
if pattern.matches_with(&topic, options) {
200+
for recipient in recipients {
201+
match self.delivery_strategy {
202+
DeliveryStrategy::Guaranteed => {
203+
let res = recipient.tell(message.clone()).await;
204+
if let Err(SendError::ActorNotRunning(_)) = res {
205+
to_remove.push((pattern.clone(), recipient.id()));
206+
}
207+
}
208+
DeliveryStrategy::BestEffort => {
209+
let res = recipient.tell(message.clone()).try_send();
210+
if let Err(SendError::ActorNotRunning(_)) = res {
211+
to_remove.push((pattern.clone(), recipient.id()));
212+
}
213+
}
214+
DeliveryStrategy::TimedDelivery(duration) => {
215+
let res = recipient
216+
.tell(message.clone())
217+
.mailbox_timeout(duration)
218+
.await;
219+
if let Err(SendError::ActorNotRunning(_)) = res {
220+
to_remove.push((pattern.clone(), recipient.id()));
221+
}
222+
}
223+
DeliveryStrategy::Spawned => {
224+
let pattern = pattern.clone();
225+
let recipient = recipient.clone();
226+
let message = message.clone();
227+
let broker_ref = ctx.actor_ref();
228+
tokio::spawn(async move {
229+
let res = recipient.tell(message).send().await;
230+
if let Err(SendError::ActorNotRunning(_)) = res {
231+
let _ = broker_ref
232+
.tell(Unsubscribe {
233+
topic: Some(pattern),
234+
actor_id: recipient.id(),
235+
})
236+
.await;
237+
}
238+
});
239+
}
240+
DeliveryStrategy::SpawnedWithTimeout(duration) => {
241+
let pattern = pattern.clone();
242+
let recipient = recipient.clone();
243+
let message = message.clone();
244+
let broker_ref = ctx.actor_ref();
245+
tokio::spawn(async move {
246+
let res = recipient
247+
.tell(message)
248+
.mailbox_timeout(duration)
249+
.send()
250+
.await;
251+
if let Err(SendError::ActorNotRunning(_)) = res {
252+
let _ = broker_ref
253+
.tell(Unsubscribe {
254+
topic: Some(pattern),
255+
actor_id: recipient.id(),
256+
})
257+
.await;
258+
}
259+
});
260+
}
261+
}
262+
}
263+
}
264+
}
265+
266+
for (pattern, actor_id) in to_remove {
267+
self.unsubscribe(&pattern, actor_id);
268+
}
269+
}
270+
}
271+
272+
/// Strategies for delivering messages to subscribers.
273+
///
274+
/// Different strategies provide different trade-offs between reliability,
275+
/// performance, and resource usage.
276+
#[derive(Clone, Copy, Debug, Default, Hash, PartialEq, Eq)]
277+
pub enum DeliveryStrategy {
278+
/// Block until all messages are delivered.
279+
///
280+
/// This strategy ensures reliable delivery but may cause the broker
281+
/// to block if any recipient's mailbox is full.
282+
Guaranteed,
283+
284+
/// Skip actors with full mailboxes.
285+
///
286+
/// This strategy attempts to deliver messages immediately without blocking,
287+
/// but will skip recipients whose mailboxes are full.
288+
#[default]
289+
BestEffort,
290+
291+
/// Try to deliver with timeout (blocks the publisher).
292+
///
293+
/// This strategy waits for each recipient to accept the message, but only
294+
/// up to the specified timeout duration. The broker will block during delivery.
295+
TimedDelivery(Duration),
296+
297+
/// Spawn a task for each delivery (non-blocking).
298+
///
299+
/// This strategy spawns a separate task for each message delivery,
300+
/// allowing the broker to continue processing other messages immediately.
301+
/// Tasks will retry indefinitely if mailboxes are full.
302+
Spawned,
303+
304+
/// Spawn a task with timeout for each delivery.
305+
///
306+
/// This strategy combines the benefits of spawned delivery with a timeout,
307+
/// ensuring that delivery attempts don't consume resources indefinitely.
308+
SpawnedWithTimeout(Duration),
309+
}

actors/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
//!
33
//! This crate provides reusable actor components:
44
//!
5+
//! - `broker`: Topic-based message broker
56
//! - `pool`: Actor pool for managing concurrent task execution
67
//! - `pubsub`: Publish-subscribe pattern implementation for actor communication
78
9+
pub mod broker;
810
pub mod pool;
911
pub mod pubsub;

actors/src/pool.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//!
1919
//! ```
2020
//! use kameo::Actor;
21-
//! use kameo::actor::pool::{ActorPool, WorkerMsg, BroadcastMsg};
21+
//! use kameo_actors::pool::{ActorPool, Broadcast, Dispatch};
2222
//! # use kameo::message::{Context, Message};
2323
//!
2424
//! #[derive(Actor)]
@@ -34,8 +34,8 @@
3434
//! let pool_actor = kameo::spawn(ActorPool::new(4, || kameo::spawn(MyWorker)));
3535
//!
3636
//! // Send tasks to the pool
37-
//! pool_actor.tell(WorkerMsg("Hello worker!")).await?;
38-
//! pool_actor.tell(BroadcastMsg("Hello all workers!")).await?;
37+
//! pool_actor.tell(Dispatch("Hello worker!")).await?;
38+
//! pool_actor.tell(Broadcast("Hello all workers!")).await?;
3939
//! # Ok::<(), Box<dyn std::error::Error>>(())
4040
//! # });
4141
//! ```

actors/src/pubsub.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//!
1919
//! ```
2020
//! use kameo::Actor;
21-
//! use kameo::actor::pubsub::{PubSub, Publish, Subscribe};
21+
//! use kameo_actors::pubsub::{PubSub, Publish, Subscribe};
2222
//! # use kameo::message::{Context, Message};
2323
//!
2424
//! #[derive(Actor)]
@@ -82,7 +82,7 @@ impl<M> PubSub<M> {
8282
/// # Example
8383
///
8484
/// ```
85-
/// use kameo::actor::pubsub::PubSub;
85+
/// use kameo_actors::pubsub::PubSub;
8686
///
8787
/// #[derive(Clone)]
8888
/// struct Msg(String);
@@ -130,7 +130,7 @@ impl<M> PubSub<M> {
130130
///
131131
/// ```
132132
/// # use kameo::Actor;
133-
/// use kameo::actor::pubsub::PubSub;
133+
/// use kameo_actors::pubsub::PubSub;
134134
/// # use kameo::message::{Context, Message};
135135
///
136136
/// # #[derive(Actor)]
@@ -172,7 +172,7 @@ impl<M> PubSub<M> {
172172
///
173173
/// ```
174174
/// # use kameo::Actor;
175-
/// use kameo::actor::pubsub::PubSub;
175+
/// use kameo_actors::pubsub::PubSub;
176176
/// # use kameo::message::{Context, Message};
177177
///
178178
/// # #[derive(Actor)]

0 commit comments

Comments
 (0)