- Introduction
- Prerequisites
- Installation
- Usage
- Message Processing and Performance Tuning
- Configurable Thread Pools
- Mailbox Configuration
- Request-Response with Ask Pattern
- Error Handling and Supervision Strategy
- Stateful Actors and Persistence
- Backpressure Support in Actors
- Cluster Mode
- Feature Roadmap
Cajun is a lightweight, high-performance actor system for Java applications that leverages modern Java features to provide a simple yet powerful concurrency model. It's designed to make concurrent programming easier and more reliable by using the actor model.
An actor is a concurrent unit of computation that guarantees serial processing of messages, with no need for state synchronization or coordination. This guarantee comes mainly from the way actors communicate: each actor sends asynchronous messages to other actors, and each actor reads messages only from its own mailbox.
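To make the serial-processing guarantee concrete, here is a minimal JDK-only sketch of a mailbox drained by a single worker thread. The class `MailboxSketch` and its methods are hypothetical illustrations and are not part of Cajun's API; the point is only that state touched by one consumer thread needs no locks, even with many concurrent senders.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical, JDK-only illustration of the mailbox idea; not Cajun's Actor class. */
final class MailboxSketch {
    private static final Object STOP = new Object(); // sentinel that ends the loop

    private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
    private int count = 0; // touched only by the worker thread, so no lock is needed
    private final Thread worker = new Thread(this::run);

    MailboxSketch() {
        worker.start();
    }

    /** Asynchronous send: callers only enqueue and never touch the actor's state. */
    void tell(Object message) {
        mailbox.add(message);
    }

    private void run() {
        try {
            while (true) {
                Object message = mailbox.take(); // exactly one message at a time
                if (message == STOP) {
                    return;
                }
                count++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Enqueues the stop sentinel, waits for the worker, and returns the final count. */
    int stopAndGetCount() {
        tell(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return count;
    }

    /** Sends perSender messages from each of several threads and returns the count. */
    static int demo(int senders, int perSender) {
        MailboxSketch actor = new MailboxSketch();
        Thread[] threads = new Thread[senders];
        for (int i = 0; i < senders; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perSender; j++) {
                    actor.tell("inc");
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return actor.stopAndGetCount();
    }
}
```

Calling `MailboxSketch.demo(4, 1000)` counts every message exactly once, although `count` is a plain `int` with no synchronization around it.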
Key benefits of using Cajun:
- Simplified Concurrency: No locks, no synchronized blocks, no race conditions
- Scalability: Easily scale from single-threaded to multi-threaded to distributed systems
- Fault Tolerance: Built-in supervision strategies for handling failures
- Flexibility: Multiple programming styles (OO, functional, stateful)
- Performance: High-throughput message processing with batching support
- Configurable Threading: Per-actor thread pool configuration with workload optimization presets
Dedication: Cajun is inspired by Erlang OTP and the actor model, and is dedicated to the late Joe Armstrong from Ericsson, whose pioneering work on Erlang and the actor model has influenced a generation of concurrent programming systems. Additional inspiration comes from Akka/Pekko.
- Java 21+ (with --enable-preview flag)
Add Cajun to your project using Gradle:
dependencies {
implementation 'systems.cajun:cajun-core:latest.release'
}
Or with Maven:
<dependency>
<groupId>systems.cajun</groupId>
<artifactId>cajun-core</artifactId>
<version>latest.release</version>
</dependency>
Cajun provides a clean, interface-based approach for creating actors. This approach separates the message handling logic from the actor lifecycle management, making your code more maintainable and testable.
For stateless actors, implement the Handler<Message> interface:
public sealed interface GreetingMessage permits HelloMessage, ByeMessage, GetHelloCount, Shutdown {
}
public record HelloMessage() implements GreetingMessage {
}
public record ByeMessage() implements GreetingMessage {
}
public record Shutdown() implements GreetingMessage {
}
public record GetHelloCount(Pid replyTo) implements GreetingMessage {
}
public record HelloCount(int count) {
}
public class GreetingHandler implements Handler<GreetingMessage> {
private int helloCount = 0;
@Override
public void receive(GreetingMessage message, ActorContext context) {
switch (message) {
case HelloMessage ignored -> {
// Updating state of the handler
helloCount++;
}
case GetHelloCount ghc -> {
// Replying back to calling actor
context.tell(ghc.replyTo(), new HelloCount(helloCount));
}
case ByeMessage ignored -> {
// Sending a message to self
context.tellSelf(new Shutdown());
}
case Shutdown ignored -> {
// Stopping actor
context.stop();
}
}
}
// Optional lifecycle methods
@Override
public void preStart(ActorContext context) {
// Initialization logic
}
@Override
public void postStop(ActorContext context) {
// Cleanup logic
}
@Override
public boolean onError(GreetingMessage message, Throwable exception, ActorContext context) {
// Custom error handling
return false; // Return true to reprocess the message
}
}
Create and start the actor:
// Create an actor with a handler class (instantiated automatically)
Pid actorPid = system.actorOf(GreetingHandler.class)
.withId("greeter-1") // Optional: specify ID (otherwise auto-generated)
.spawn();
// Or create with a handler instance
GreetingHandler handler = new GreetingHandler();
Pid actorPid = system.actorOf(handler).spawn();
// Send messages
actorPid.tell(new HelloMessage());
For actors that need to maintain and persist state, implement the StatefulHandler<State, Message> interface:
public class CounterHandler implements StatefulHandler<Integer, CounterMessage> {
@Override
public Integer receive(CounterMessage message, Integer state, ActorContext context) {
return switch (message) {
case Increment ignored -> state + 1;
case Decrement ignored -> state - 1;
case Reset ignored -> 0;
case GetCount gc -> {
context.tell(gc.replyTo(), new CountResult(state));
yield state; // Return unchanged state
}
};
}
@Override
public Integer preStart(Integer state, ActorContext context) {
// Optional initialization logic
return state;
}
@Override
public void postStop(Integer state, ActorContext context) {
// Optional cleanup logic
}
@Override
public boolean onError(CounterMessage message, Integer state, Throwable exception, ActorContext context) {
// Custom error handling
return false; // Return true to reprocess the message
}
}
Create and start the stateful actor:
// Create a stateful actor with a handler class and initial state
Pid counterPid = system.statefulActorOf(CounterHandler.class, 0)
.withId("counter-1") // Optional: specify ID (otherwise auto-generated)
.spawn();
// Or create with a handler instance
CounterHandler handler = new CounterHandler();
Pid counterPid = system.statefulActorOf(handler, 0).spawn();
// Send messages
counterPid.tell(new Increment());
Both actor builders support additional configuration options:
// Configure backpressure and mailbox
Pid actorPid = system.actorOf(GreetingHandler.class)
.withBackpressureConfig(new BackpressureConfig())
.withMailboxConfig(new ResizableMailboxConfig())
.spawn();
// Configure stateful actor with persistence
Pid counterPid = system.statefulActorOf(CounterHandler.class, 0)
.withPersistence(
PersistenceFactory.createBatchedFileMessageJournal(),
PersistenceFactory.createFileSnapshotStore()
)
.spawn();
// Configure actor with custom thread pool for CPU-intensive work
ThreadPoolFactory cpuFactory = new ThreadPoolFactory()
.optimizeFor(ThreadPoolFactory.WorkloadType.CPU_BOUND);
Pid computeActor = system.actorOf(ComputationHandler.class)
.withThreadPoolFactory(cpuFactory)
.spawn();
You can create parent-child relationships between actors:
// Create a parent actor
Pid parentPid = system.actorOf(ParentHandler.class).spawn();
// Create a child actor through the parent
Pid childPid = system.actorOf(ChildHandler.class)
.withParent(system.getActor(parentPid))
.spawn();
// Or create a child directly from another handler
public class ParentHandler implements Handler<ParentMessage> {
@Override
public void receive(ParentMessage message, ActorContext context) {
if (message instanceof CreateChild) {
// Create a child actor
Pid childPid = context.createChild(ChildHandler.class, "child-1");
// Send message to the child
context.tell(childPid, new ChildMessage());
}
}
}
- Persistent State: State is automatically persisted using configurable storage backends
- State Recovery: Automatically recovers state when an actor restarts
- Type Safety: Generic type parameters for both state and message types
- Pluggable Storage: Supports different state storage implementations:
  - In-memory storage (default)
  - File-based storage
  - Custom storage implementations
- Configuring State Persistence
// Create a stateful actor with file-based persistence
Pid counterPid = system.statefulActorOf(CounterHandler.class, 0)
.withPersistence(
PersistenceFactory.createBatchedFileMessageJournal("/path/to/journal"),
PersistenceFactory.createFileSnapshotStore("/path/to/snapshots")
)
.spawn();
- State Recovery Options
// Configure recovery options
Pid counterPid = system.statefulActorOf(CounterHandler.class, 0)
.withRecoveryConfig(RecoveryConfig.builder()
.withRecoveryStrategy(RecoveryStrategy.SNAPSHOT_THEN_JOURNAL)
.withMaxMessagesToRecover(1000)
.build())
.spawn();
- Snapshot Strategies
// Configure snapshot strategy
Pid counterPid = system.statefulActorOf(CounterHandler.class, 0)
.withSnapshotStrategy(SnapshotStrategy.builder()
.withInterval(Duration.ofMinutes(5))
.withThreshold(100) // Take snapshot every 100 state changes
.build())
.spawn();
The StatefulHandler interface provides lifecycle methods:
- preStart(State state, ActorContext context): Called when the actor starts; returns the initial state
- postStop(State state, ActorContext context): Called when the actor stops
- onError(Message message, State state, Throwable exception, ActorContext context): Called when message processing fails
After creating your handlers, use the actor system to spawn actors and send messages:
public class CountResultHandler implements Handler<HelloCount> {
@Override
public void receive(HelloCount message, ActorContext context) {
System.out.println("Count: " + message.count());
}
}
public static void main(String[] args) {
// Create the actor system
var actorSystem = new ActorSystem();
// Create a greeting actor
var greetingPid = actorSystem.actorOf(GreetingHandler.class)
.withId("greeting-actor-1")
.spawn();
// Create a receiver actor
var receiverPid = actorSystem.actorOf(CountResultHandler.class)
.withId("count-receiver")
.spawn();
// Send messages
greetingPid.tell(new HelloMessage());
greetingPid.tell(new GetHelloCount(receiverPid)); // Will print "Count: 1"
}
To run the examples in the project, use the Gradle task runner (the --enable-preview flag is already enabled for Gradle tasks):
./gradlew -PmainClass=examples.TimedCounter run
Cajun supports batched processing of messages to improve throughput:
- By default, each actor processes messages in batches of 10 messages at a time
- Batch processing can significantly improve throughput by reducing context switching overhead
- You can configure the batch size for any actor using the withBatchSize() method
// Create an actor with custom batch size
var myActor = actorSystem.register(MyActor.class, "my-actor");
((MyActor)actorSystem.getActor(myActor)).withBatchSize(50); // Process 50 messages at a time
- Larger batch sizes: Improve throughput but may increase latency for individual messages
- Smaller batch sizes: Provide more responsive processing but with lower overall throughput
- Workload characteristics: CPU-bound tasks benefit from larger batches, while I/O-bound tasks may work better with smaller batches
- Memory usage: Larger batches consume more memory as messages are held in memory during processing
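The trade-offs above follow from how a batch is taken off the mailbox. The sketch below uses only the JDK's `BlockingQueue.drainTo` to pull up to `batchSize` messages per pass; `BatchDrainSketch` is a hypothetical name, and Cajun's internal batching may be implemented differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical illustration of batched mailbox draining; not Cajun's implementation. */
final class BatchDrainSketch {

    /** Drains the mailbox in batches of at most batchSize and returns the size of each batch. */
    static List<Integer> drainInBatches(BlockingQueue<String> mailbox, int batchSize) {
        List<Integer> batchSizes = new ArrayList<>();
        List<String> batch = new ArrayList<>(batchSize);
        // One queue operation per batch instead of one per message.
        while (mailbox.drainTo(batch, batchSize) > 0) {
            // A real actor would invoke its handler for each message in the batch here.
            batchSizes.add(batch.size());
            batch.clear(); // messages are held in memory only while the batch is processed
        }
        return batchSizes;
    }

    /** Fills a mailbox with the given number of messages and reports the batch sizes used. */
    static String demo(int messages, int batchSize) {
        BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            mailbox.add("msg-" + i);
        }
        return drainInBatches(mailbox, batchSize).toString();
    }
}
```

With 25 queued messages and a batch size of 10, `BatchDrainSketch.demo(25, 10)` returns `[10, 10, 5]`: the last message in each full batch waits for the nine before it, which is the latency cost of larger batches.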
The project includes performance tests that can help you evaluate different configurations:
# Run all performance tests
./gradlew test -PincludeTags="performance"
# Run a specific performance test
./gradlew test --tests "systems.cajun.performance.ActorPerformanceTest.testActorChainThroughput"
The performance tests measure:
- Actor Chain Throughput: Tests message passing through a chain of actors
- Many-to-One Throughput: Tests many sender actors sending to a single receiver
- Actor Lifecycle Performance: Tests creation and stopping of large numbers of actors
Cajun provides flexible thread pool configuration for actors, allowing you to optimize performance based on your workload characteristics. Each actor can be configured with its own ThreadPoolFactory, or use the system default (virtual threads).
Cajun supports multiple thread pool strategies:
- VIRTUAL: Uses Java 21 virtual threads for high concurrency with low overhead (default)
- FIXED: Uses a fixed-size platform thread pool for predictable resource usage
- WORK_STEALING: Uses a work-stealing thread pool for balanced workloads
// Optimize for I/O-bound operations (high concurrency, virtual threads)
ThreadPoolFactory ioOptimized = new ThreadPoolFactory()
.optimizeFor(ThreadPoolFactory.WorkloadType.IO_BOUND);
// Optimize for CPU-bound operations (fixed thread pool, platform threads)
ThreadPoolFactory cpuOptimized = new ThreadPoolFactory()
.optimizeFor(ThreadPoolFactory.WorkloadType.CPU_BOUND);
// Optimize for mixed workloads (work-stealing pool)
ThreadPoolFactory mixedOptimized = new ThreadPoolFactory()
.optimizeFor(ThreadPoolFactory.WorkloadType.MIXED);
// Create actors with optimized thread pools
Pid networkActor = system.actorOf(NetworkHandler.class)
.withId("network-processor")
.withThreadPoolFactory(ioOptimized)
.spawn();
Pid computeActor = system.actorOf(ComputationHandler.class)
.withId("compute-processor")
.withThreadPoolFactory(cpuOptimized)
.spawn();
// Stateful actors also support custom thread pools
Pid statefulActor = system.statefulActorOf(StateHandler.class, initialState)
.withId("stateful-processor")
.withThreadPoolFactory(mixedOptimized)
.spawn();
For fine-grained control, you can create custom ThreadPoolFactory configurations:
// Custom configuration for specific requirements
ThreadPoolFactory customFactory = new ThreadPoolFactory()
.setExecutorType(ThreadPoolFactory.ThreadPoolType.FIXED)
.setFixedPoolSize(8) // 8 platform threads
.setPreferVirtualThreads(false)
.setUseNamedThreads(true);
Pid customActor = system.actorOf(MyHandler.class)
.withThreadPoolFactory(customFactory)
.spawn();
Virtual threads (VIRTUAL):
- Best for: Network I/O, file operations, database calls
- Characteristics: Extremely lightweight, high concurrency (millions of threads)
- Use when: You have many actors doing I/O operations
Fixed thread pool (FIXED):
- Best for: CPU-intensive computations, mathematical operations
- Characteristics: Predictable resource usage, optimal for CPU-bound work
- Use when: You have fewer actors doing intensive computation
Work-stealing pool (WORK_STEALING):
- Best for: Mixed I/O and CPU workloads
- Characteristics: Dynamic load balancing, good for varied workloads
- Use when: Your actors have unpredictable or mixed workload patterns
- Default behavior: If no ThreadPoolFactory is specified, actors use virtual threads
- Per-actor configuration: Different actors can use different thread pool strategies
- Resource isolation: Custom thread pools provide isolation between different types of work
- Monitoring: Thread pools can be monitored and tuned based on application metrics
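For orientation, the three strategies correspond naturally to standard JDK executors. The sketch below shows one plausible mapping on Java 21; the `Workload` enum is a hypothetical stand-in for `ThreadPoolFactory.WorkloadType`, and which executors `ThreadPoolFactory` actually creates is an assumption rather than something this document confirms.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical mapping from workload type to a JDK executor (requires Java 21). */
final class WorkloadExecutorSketch {

    /** Stand-in for ThreadPoolFactory.WorkloadType; not Cajun's enum. */
    enum Workload { IO_BOUND, CPU_BOUND, MIXED }

    static ExecutorService executorFor(Workload workload) {
        int cores = Runtime.getRuntime().availableProcessors();
        return switch (workload) {
            // One cheap virtual thread per task: blocking I/O does not pin a platform thread.
            case IO_BOUND -> Executors.newVirtualThreadPerTaskExecutor();
            // One platform thread per core: predictable resource usage for computation.
            case CPU_BOUND -> Executors.newFixedThreadPool(cores);
            // ForkJoinPool-based pool: idle workers steal queued tasks from busy ones.
            case MIXED -> Executors.newWorkStealingPool();
        };
    }

    /** Runs one task on the chosen executor and reports whether it ran on a virtual thread. */
    static boolean runsOnVirtualThread(Workload workload) {
        ExecutorService executor = executorFor(workload);
        try {
            Callable<Boolean> probe = () -> Thread.currentThread().isVirtual();
            return executor.submit(probe).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Only the `IO_BOUND` executor runs its tasks on virtual threads; the other two use platform threads, which is why they suit work that keeps a core busy.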
Actors in Cajun process messages from their mailboxes. The system provides flexibility in how these mailboxes are configured, affecting performance, resource usage, and backpressure behavior.
By default, if no specific mailbox configuration is provided when an actor is created, the ActorSystem will use its default MailboxProvider and default MailboxConfig. Typically, this results in:
- A LinkedBlockingQueue with a default capacity (e.g., 10,000 messages). This is suitable for general-purpose actors, especially those that might perform I/O operations or benefit from the unbounded nature (up to system memory) of LinkedBlockingQueue when paired with virtual threads.
The exact default behavior can be influenced by the system-wide MailboxProvider configured in the ActorSystem.
You can customize the mailbox for an actor in several ways:
- Using MailboxConfig during Actor Creation: When creating an actor using the ActorSystem.actorOf(...) builder pattern, you can provide a specific MailboxConfig or ResizableMailboxConfig:
// Example: Using a ResizableMailboxConfig for an actor
ResizableMailboxConfig customMailboxConfig = new ResizableMailboxConfig(
    100,  // Initial capacity
    1000, // Max capacity
    50,   // Min capacity (for shrinking)
    0.8,  // Resize threshold (e.g., grow at 80% full)
    2.0,  // Resize factor (e.g., double the size)
    0.2,  // Shrink threshold (e.g., shrink at 20% full)
    0.5   // Shrink factor (e.g., halve the size)
);
Pid myActor = system.actorOf(MyHandler.class)
    .withMailboxConfig(customMailboxConfig)
    .spawn();
If you provide a ResizableMailboxConfig, the DefaultMailboxProvider will typically create a ResizableBlockingQueue for that actor, allowing its mailbox to dynamically adjust its size based on load. Other MailboxConfig types might result in different queue implementations based on the provider's logic.
- Providing a Custom MailboxProvider to the ActorSystem: For system-wide changes or more complex mailbox selection logic, you can implement the MailboxProvider interface and configure your ActorSystem instance to use it.
// 1. Implement your custom MailboxProvider
public class MyCustomMailboxProvider<M> implements MailboxProvider<M> {
    @Override
    public BlockingQueue<M> createMailbox(MailboxConfig config, ThreadPoolFactory.WorkloadType workloadTypeHint) {
        if (config instanceof MySpecialConfig) {
            // return new MySpecialQueue<>();
        }
        // Fallback to default logic or other custom queues
        // Assuming DefaultMailboxProvider has a no-arg constructor or a way to get a default instance
        return new DefaultMailboxProvider<M>().createMailbox(config, workloadTypeHint);
    }
}
// 2. Configure ActorSystem to use it
ActorSystem system = ActorSystem.create("my-system")
    .withMailboxProvider(new MyCustomMailboxProvider<>()) // Provide an instance of your custom provider
    .build();
When actors are created within this system, your MyCustomMailboxProvider will be called to create their mailboxes, unless an actor explicitly overrides it via its own builder methods (which might also accept a MailboxProvider instance for per-actor override).
By understanding and utilizing these configuration options, you can fine-tune mailbox behavior to match the specific needs of your actors and the overall performance characteristics of your application.
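To show how the resize thresholds and factors of a resizable mailbox interact, here is a small sketch of a capacity decision. The class `ResizePolicySketch`, the rounding, and the rule that a mailbox never shrinks below its current contents are all assumptions made for illustration; the actual logic in ResizableBlockingQueue may differ. The demo uses the same numbers as the ResizableMailboxConfig example (min 50, max 1000, grow at 80% by 2.0, shrink at 20% by 0.5).

```java
/** Hypothetical capacity policy for a resizable mailbox; not Cajun's implementation. */
final class ResizePolicySketch {

    /** Returns the capacity to use next for a mailbox currently holding size messages. */
    static int nextCapacity(int size, int capacity, int min, int max,
                            double growAt, double growFactor,
                            double shrinkAt, double shrinkFactor) {
        double fill = (double) size / capacity;
        if (fill >= growAt) {
            // Grow, but never beyond the configured maximum.
            return (int) Math.min(max, Math.round(capacity * growFactor));
        }
        if (fill <= shrinkAt) {
            // Shrink, but never below the configured minimum or the current contents.
            int shrunk = (int) Math.max(min, Math.round(capacity * shrinkFactor));
            return Math.max(shrunk, size);
        }
        return capacity; // between the two thresholds: leave the capacity alone
    }

    /** Applies the thresholds from the configuration example above. */
    static int demo(int size, int capacity) {
        return nextCapacity(size, capacity, 50, 1000, 0.8, 2.0, 0.2, 0.5);
    }
}
```

Under these assumptions a mailbox of capacity 100 holding 80 messages grows to 200, one of capacity 1000 stays capped at 1000, and one of capacity 100 holding 10 messages shrinks to 50. Keeping the grow and shrink thresholds far apart avoids resizing back and forth when the load hovers around a single value.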
While actors typically communicate through one-way asynchronous messages, Cajun provides an "ask pattern" for request-response interactions where you need to wait for a reply.
The ask pattern allows you to send a message to an actor and receive a response as a CompletableFuture:
// Send a request to an actor and get a future response
CompletableFuture<String> future = actorSystem.ask(
targetActorPid, // The actor to ask
"ping", // The message to send
Duration.ofSeconds(3) // Timeout for the response
);
// Process the response when it arrives
future.thenAccept(response -> {
System.out.println("Received response: " + response);
});
// Or wait for the response (blocking)
try {
String response = future.get(5, TimeUnit.SECONDS);
System.out.println("Received response: " + response);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
System.err.println("Error getting response: " + e.getMessage());
}
Actors that respond to ask requests must handle the special AskPayload wrapper message:
public class ResponderActor extends Actor<ActorSystem.AskPayload<String>> {
public ResponderActor(ActorSystem system, String actorId) {
super(system, actorId);
}
@Override
protected void receive(ActorSystem.AskPayload<String> payload) {
// Extract the original message
String message = payload.message();
// Process the message
String response = processMessage(message);
// Send the response back to the temporary reply actor
payload.replyTo().tell(response);
}
private String processMessage(String message) {
if ("ping".equals(message)) {
return "pong";
}
return "unknown command";
}
}
The ask pattern includes robust error handling to manage various failure scenarios:
- Timeout Handling: If no response is received within the specified timeout, the future completes exceptionally with a TimeoutException.
- Type Mismatch Handling: If the response type doesn't match the expected type, the future completes exceptionally with a wrapped ClassCastException.
- Actor Failure Handling: If the target actor fails while processing the message, the error is propagated to the future based on the actor's supervision strategy.
try {
String response = actorSystem.ask(actorPid, message, Duration.ofSeconds(2)).get();
// Process successful response
} catch (ExecutionException ex) {
Throwable cause = ex.getCause();
if (cause instanceof TimeoutException) {
// Handle timeout
} else if (cause instanceof RuntimeException && cause.getCause() instanceof ClassCastException) {
// Handle type mismatch
} else {
// Handle other errors
}
} catch (InterruptedException e) {
// Handle interruption
}
Internally, the ask pattern works by:
- Creating a temporary actor to receive the response
- Wrapping the original message in an AskPayload that includes the temporary actor's PID
- Sending the wrapped message to the target actor
- Setting up a timeout to complete the future exceptionally if no response arrives in time
- Completing the future when the temporary actor receives a response
This implementation ensures that resources are properly cleaned up, even in failure scenarios, by automatically stopping the temporary actor after processing the response or timeout.
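The steps above can be approximated with the JDK alone. In this sketch a `Consumer` callback stands in for the temporary reply actor and `CompletableFuture.orTimeout` provides the timeout; `AskSketch` and its signatures are hypothetical and do not reproduce `ActorSystem.ask`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical JDK-only sketch of request-response over one-way messaging. */
final class AskSketch {

    /**
     * The target plays the role of an actor: it receives the request together with
     * a reply callback, which stands in for the temporary actor's PID.
     */
    static <Q, R> CompletableFuture<R> ask(BiConsumer<Q, Consumer<R>> target,
                                           Q request, long timeoutMillis) {
        CompletableFuture<R> reply = new CompletableFuture<>();
        // Complete exceptionally with a TimeoutException if no reply arrives in time.
        reply.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        // Deliver the request and the reply address asynchronously, like a tell.
        CompletableFuture.runAsync(() -> target.accept(request, reply::complete))
                .exceptionally(failure -> {
                    reply.completeExceptionally(failure); // surface target failures
                    return null;
                });
        return reply;
    }

    /** Returns the reply, or "timeout" if the target never answers. */
    static String demo(boolean targetReplies) {
        BiConsumer<String, Consumer<String>> target = (message, replyTo) -> {
            if (targetReplies) {
                replyTo.accept("ping".equals(message) ? "pong" : "unknown command");
            }
        };
        try {
            return ask(target, "ping", 500).get();
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException ? "timeout" : "error";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

`AskSketch.demo(true)` returns `pong`, while `AskSketch.demo(false)` returns `timeout` after roughly half a second, mirroring the `ExecutionException` with a `TimeoutException` cause described earlier. A future completes only once, so a reply that arrives after the timeout is ignored.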
Cajun provides a robust error handling system with supervision strategies inspired by Erlang OTP. This allows actors to recover from failures gracefully without crashing the entire system.
The SupervisionStrategy enum defines how an actor should respond to failures:
- RESUME: Continue processing messages, ignoring the error (best for non-critical errors)
- RESTART: Restart the actor, resetting its state (good for recoverable errors)
- STOP: Stop the actor completely (for unrecoverable errors)
- ESCALATE: Escalate the error to the parent actor (for system-wide issues)
// Configure an actor with a specific supervision strategy
MyActor actor = new MyActor(system, "my-actor");
actor.withSupervisionStrategy(SupervisionStrategy.RESTART);
// Method chaining for configuration
MyActor actor = new MyActor(system, "my-actor")
.withSupervisionStrategy(SupervisionStrategy.RESTART)
.withErrorHook(ex -> logger.error("Actor error", ex));
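The practical difference between the strategies is what happens to the actor's state and to the messages that follow a failure. The sketch below simulates that with a plain loop; `SupervisionSketch` and its `Strategy` enum are hypothetical, and ESCALATE is left out because the sketch has no parent actor to escalate to.

```java
import java.util.List;

/** Hypothetical simulation of supervision outcomes; not Cajun's supervision code. */
final class SupervisionSketch {

    /** Stand-in for SupervisionStrategy, without ESCALATE. */
    enum Strategy { RESUME, RESTART, STOP }

    /**
     * Sums integer messages starting from 0. A message that is not a number
     * simulates a processing failure, which is handled according to the strategy.
     */
    static int run(List<String> messages, Strategy strategy) {
        int state = 0;
        for (String message : messages) {
            try {
                state += Integer.parseInt(message);
            } catch (NumberFormatException failure) {
                switch (strategy) {
                    case RESUME -> { /* drop the failed message, keep the state */ }
                    case RESTART -> state = 0;     // reset to the initial state, keep going
                    case STOP -> { return state; } // process no further messages
                }
            }
        }
        return state;
    }

    /** Runs the messages 1, 2, boom, 4 under the named strategy. */
    static int demo(String strategyName) {
        return run(List.of("1", "2", "boom", "4"), Strategy.valueOf(strategyName));
    }
}
```

For the messages `1, 2, boom, 4`, RESUME ends with 7, RESTART with 4 (the state accumulated before the failure is lost), and STOP with 3 (the last message is never processed).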
Actors provide lifecycle hooks that are called during error handling and recovery:
- preStart(): Called before the actor starts processing messages
- postStop(): Called when the actor is stopped
- onError(Throwable): Called when an error occurs during message processing
public class ResilientActor extends Actor<String> {
public ResilientActor(ActorSystem system, String actorId) {
super(system, actorId);
}
@Override
protected void preStart() {
// Initialize resources
logger.info("Actor starting: {}", self().id());
}
@Override
protected void postStop() {
// Clean up resources
logger.info("Actor stopping: {}", self().id());
}
@Override
protected void onError(Throwable error) {
// Custom error handling
logger.error("Error in actor: {}", self().id(), error);
}
@Override
protected void receive(String message) {
// Message processing logic
}
}
The handleException method provides centralized error management:
@Override
protected SupervisionStrategy handleException(Throwable exception) {
if (exception instanceof TemporaryException) {
// Log and continue for temporary issues
logger.warn("Temporary error, resuming", exception);
return SupervisionStrategy.RESUME;
} else if (exception instanceof RecoverableException) {
// Restart for recoverable errors
logger.error("Recoverable error, restarting", exception);
return SupervisionStrategy.RESTART;
} else {
// Stop for critical errors
logger.error("Critical error, stopping", exception);
return SupervisionStrategy.STOP;
}
}
The error handling system integrates seamlessly with the ask pattern, propagating exceptions to the future:
try {
// If the actor throws an exception while processing this message,
// it will be propagated to the future based on the supervision strategy
String result = actorSystem.ask(actorPid, "risky-operation", Duration.ofSeconds(5)).get();
System.out.println("Success: " + result);
} catch (ExecutionException ex) {
// The original exception is wrapped in an ExecutionException
Throwable cause = ex.getCause();
System.err.println("Actor error: " + cause.getMessage());
}
Cajun integrates with SLF4J and Logback for comprehensive logging:
// Configure logging in your application
private static final Logger logger = LoggerFactory.getLogger(MyActor.class);
// Errors are automatically logged with appropriate levels
@Override
protected void receive(Message msg) {
try {
// Process message
} catch (Exception e) {
// This will be logged and handled according to the supervision strategy
throw new ActorException("Failed to process message", e);
}
}
Cajun provides a StatefulActor class that maintains and persists its state. This is useful for actors that need to maintain state across restarts or system failures.
Stateful actors can persist their state to disk or other storage backends. This allows actors to recover their state after a restart or crash.
// Define a stateful handler for the counter
public class CounterHandler implements StatefulHandler<Integer, CounterMessage> {
@Override
public Integer processMessage(Integer count, CounterMessage message) {
if (message instanceof IncrementMessage) {
return count + 1;
} else if (message instanceof GetCountMessage getCountMsg) {
// Send the current count back to the sender
getCountMsg.getSender().tell(count);
return count;
}
return count;
}
}
// Create a stateful actor with an initial state using the handler pattern
Pid counterPid = system.statefulActor("counter", 0, new CounterHandler());
// Send messages to the actor
counterPid.tell(new IncrementMessage());
counterPid.tell(new GetCountMessage(myPid));
Cajun supports message persistence and replay for stateful actors using a Write-Ahead Log (WAL) style approach. This enables actors to recover their state by replaying messages after a restart or crash.
- Message Journaling: All messages are logged to a journal before processing
- State Snapshots: Periodic snapshots of actor state are taken to speed up recovery
- Hybrid Recovery: Uses latest snapshot plus replay of subsequent messages
- Pluggable Persistence: Swap out persistence implementations without changing actor code
- Provider Pattern: Configure system-wide persistence strategy with ease
// Define a stateful handler with custom persistence (legacy approach)
public class MyStatefulHandler implements StatefulHandler<MyState, MyMessage> {
@Override
public MyState processMessage(MyState state, MyMessage message) {
// Process the message and return the new state
return newState;
}
}
// Create the actor using the stateful handler
Pid actorPid = system.statefulActor(
"my-actor",
initialState,
new MyStatefulHandler(),
PersistenceFactory.createBatchedFileMessageJournal(),
PersistenceFactory.createFileSnapshotStore()
);
Cajun now supports a provider pattern for persistence, allowing you to swap out persistence implementations at runtime without changing your actor code:
// Register a custom persistence provider for the entire actor system
PersistenceProvider customProvider = new CustomPersistenceProvider();
ActorSystemPersistenceHelper.setPersistenceProvider(actorSystem, customProvider);
// Or use the fluent API
ActorSystemPersistenceHelper.persistence(actorSystem)
.withPersistenceProvider(customProvider);
// Create stateful actors using the handler pattern with the configured provider
// No need to specify persistence components explicitly
public class MyStatefulHandler implements StatefulHandler<MyState, MyMessage> {
@Override
public MyState processMessage(MyState state, MyMessage message) {
// Process the message and return the new state
return newState;
}
}
// The system will use the configured persistence provider automatically
Pid actorPid = system.statefulActor("my-actor", initialState, new MyStatefulHandler());
Implement the PersistenceProvider interface to create custom persistence backends:
public class CustomPersistenceProvider implements PersistenceProvider {
@Override
public <M> BatchedMessageJournal<M> createBatchedMessageJournal(String actorId) {
// Implement custom message journaling
return new CustomBatchedMessageJournal<>(actorId);
}
@Override
public <S> SnapshotStore<S> createSnapshotStore(String actorId) {
// Implement custom state snapshot storage
return new CustomSnapshotStore<>(actorId);
}
// Implement other required methods
}
The actor system uses FileSystemPersistenceProvider by default if no custom provider is specified.
The StatefulActor implements a robust recovery mechanism that ensures state consistency after system restarts or failures:
- Initialization Process:
  - On startup, the actor attempts to load the most recent snapshot
  - If a snapshot exists, it restores the state from that snapshot
  - It then replays any messages received after the snapshot was taken
  - If no snapshot exists, it initializes with the provided initial state
- Explicit State Initialization:
  // Force initialization and wait for it to complete (useful in tests)
  statefulActor.forceInitializeState().join();
  // Or with timeout
  boolean initialized = statefulActor.waitForStateInitialization(1000);
- Handling Null States:
  - StatefulActor properly handles null initial states for recovery cases
  - State can be null during initialization and will be properly recovered if snapshots exist
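The initialization process described above amounts to "start from the latest snapshot, then replay newer journal entries". The sketch below shows that logic for an integer state; the `Entry` and `Snapshot` records and the use of sequence numbers are illustrative assumptions, not Cajun's persistence types.

```java
import java.util.List;

/** Hypothetical sketch of snapshot-plus-journal recovery; not Cajun's StatefulActor. */
final class RecoverySketch {

    /** A journaled message (here: a delta to add) tagged with its sequence number. */
    record Entry(long seq, int delta) {}

    /** The state as it was after applying the entry with sequence number seq. */
    record Snapshot(long seq, int state) {}

    /** Rebuilds the state from an optional snapshot and the message journal. */
    static int recover(Snapshot snapshot, List<Entry> journal, int initialState) {
        // Without a snapshot, start from the provided initial state and replay everything.
        int state = snapshot != null ? snapshot.state() : initialState;
        long lastApplied = snapshot != null ? snapshot.seq() : -1;
        for (Entry entry : journal) {
            if (entry.seq() > lastApplied) {
                state += entry.delta(); // replay only messages newer than the snapshot
            }
        }
        return state;
    }

    /** Recovers the same journal with or without a snapshot taken after entry 1. */
    static int demo(boolean withSnapshot) {
        List<Entry> journal = List.of(
                new Entry(0, 5), new Entry(1, 5), new Entry(2, 1), new Entry(3, 1));
        Snapshot snapshot = withSnapshot ? new Snapshot(1, 10) : null;
        return recover(snapshot, journal, 0);
    }
}
```

Both `RecoverySketch.demo(true)` and `RecoverySketch.demo(false)` return 12. The snapshot does not change the result; it only reduces how many journal entries must be replayed.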
StatefulActor implements an adaptive snapshot strategy to balance performance and recovery speed:
// Configure snapshot strategy (time-based and change-count-based)
statefulActor.configureSnapshotStrategy(
30000, // Take snapshot every 30 seconds
500 // Or after 500 state changes, whichever comes first
);
// Force an immediate snapshot
statefulActor.forceSnapshot().join();
Key snapshot features:
- Time-based snapshots: Automatically taken after a configurable time interval
- Change-based snapshots: Taken after a certain number of state changes
- Dedicated thread pool: Snapshots are taken asynchronously to avoid blocking the actor
- Final snapshots: A snapshot is automatically taken when the actor stops
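The combination of time-based and change-based triggers can be sketched as a small policy object that is consulted after every state change. `SnapshotPolicySketch` is a hypothetical name, and checking the elapsed time only when a state change occurs is a simplifying assumption; the demo uses the 30-second and 500-change values from the configuration example above.

```java
/** Hypothetical "whichever comes first" snapshot trigger; not Cajun's implementation. */
final class SnapshotPolicySketch {
    private final long intervalMillis;
    private final int changeThreshold;
    private long lastSnapshotAt;
    private int changesSinceSnapshot;

    SnapshotPolicySketch(long intervalMillis, int changeThreshold, long nowMillis) {
        this.intervalMillis = intervalMillis;
        this.changeThreshold = changeThreshold;
        this.lastSnapshotAt = nowMillis;
    }

    /** Records one state change at the given time and returns true if a snapshot is due. */
    boolean onStateChange(long nowMillis) {
        changesSinceSnapshot++;
        boolean due = changesSinceSnapshot >= changeThreshold
                || nowMillis - lastSnapshotAt >= intervalMillis;
        if (due) {
            // Taking a snapshot resets both triggers.
            lastSnapshotAt = nowMillis;
            changesSinceSnapshot = 0;
        }
        return due;
    }

    /** Feeds state changes spaced stepMillis apart and counts the snapshots taken. */
    static int demo(int changes, long stepMillis) {
        SnapshotPolicySketch policy = new SnapshotPolicySketch(30_000, 500, 0);
        int snapshots = 0;
        for (int i = 1; i <= changes; i++) {
            if (policy.onStateChange(i * stepMillis)) {
                snapshots++;
            }
        }
        return snapshots;
    }
}
```

A burst of 1,200 changes one millisecond apart yields two snapshots (the change threshold fires), while ten changes ten seconds apart yield three (the time interval fires).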
Cajun features a robust backpressure system to help actors manage high load scenarios effectively. Backpressure is an opt-in feature, configured using BackpressureConfig objects.
Backpressure can be configured at the ActorSystem level, which then applies to actors by default, or dynamically for individual actors if specific settings are needed.
To enable and configure backpressure for all actors by default within an ActorSystem, provide a BackpressureConfig object during its creation. Actors created within this system will inherit this configuration. If no BackpressureConfig is supplied to the ActorSystem, backpressure is disabled by default for the system.
Example:
// Define backpressure settings using BackpressureConfig
BackpressureConfig systemBpConfig = new BackpressureConfig()
.setStrategy(BackpressureStrategy.BLOCK) // Default strategy
.setWarningThreshold(0.7f) // 70% mailbox capacity
.setCriticalThreshold(0.9f) // 90% mailbox capacity
.setRecoveryThreshold(0.5f); // 50% mailbox capacity
// Create ActorSystem with this configuration
// This also requires a ThreadPoolFactory
ActorSystem system = new ActorSystem(new ThreadPoolFactory(), systemBpConfig);
// Actors created in this system will now use these backpressure settings by default.
Actors primarily inherit their backpressure configuration from the ActorSystem they belong to. If you need to customize backpressure settings for a specific actor (e.g., use a different strategy or thresholds than the system default, or enable it if the system has it disabled), you can do so dynamically after the actor has been created using the BackpressureBuilder. See the "Dynamically Managing Backpressure" section for details.
If an actor is part of an ActorSystem that has backpressure disabled (no BackpressureConfig provided to the system), backpressure will also be disabled for that actor by default. It can then be enabled and configured specifically for that actor using the BackpressureBuilder.
The backpressure system operates with four distinct states:
- NORMAL: The actor is operating with sufficient capacity
- WARNING: The actor is approaching capacity limits but not yet applying backpressure
- CRITICAL: The actor is at or above its high watermark and actively applying backpressure
- RECOVERY: The actor was recently in a CRITICAL state but is now recovering (below high watermark but still above low watermark)
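One way to read the four states is as a small state machine driven by the mailbox fill ratio, with hysteresis between the critical and recovery thresholds. The transition rules below are an interpretation of the descriptions above, not Cajun's actual code; `BackpressureStateSketch` is a hypothetical name, and the trace uses the thresholds from the earlier configuration example (warning 0.7, critical 0.9, recovery 0.5).

```java
/** Hypothetical state machine for backpressure states; the transition rules are assumed. */
final class BackpressureStateSketch {

    enum State { NORMAL, WARNING, CRITICAL, RECOVERY }

    /** Computes the next state from the current state and the mailbox fill ratio. */
    static State next(State current, float fill, float warning, float critical, float recovery) {
        if (fill >= critical) {
            return State.CRITICAL;
        }
        boolean wasUnderPressure = current == State.CRITICAL || current == State.RECOVERY;
        if (wasUnderPressure && fill > recovery) {
            // Below the high watermark but not yet below the low one: keep recovering.
            return State.RECOVERY;
        }
        return fill >= warning ? State.WARNING : State.NORMAL;
    }

    /** Replays a sequence of fill ratios and returns the visited states. */
    static String trace(float... fills) {
        State state = State.NORMAL;
        StringBuilder visited = new StringBuilder();
        for (float fill : fills) {
            state = next(state, fill, 0.7f, 0.9f, 0.5f);
            if (visited.length() > 0) {
                visited.append(' ');
            }
            visited.append(state);
        }
        return visited.toString();
    }
}
```

For the fill ratios 0.3, 0.75, 0.95, 0.8, 0.6, 0.4, 0.75 the trace is `NORMAL WARNING CRITICAL RECOVERY RECOVERY NORMAL WARNING`. The same ratio of 0.75 maps to WARNING on the way up but would map to RECOVERY on the way down from CRITICAL, which keeps the actor from flapping around a single threshold.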
Cajun supports multiple strategies for handling backpressure:
- BLOCK: Block the sender until space is available in the mailbox (default behavior)
- DROP_NEW: Drop new messages when the mailbox is full, prioritizing older messages
- DROP_OLDEST: Remove oldest messages from the mailbox using the direct Actor.dropOldestMessage method
- CUSTOM: Use a custom strategy by implementing a CustomBackpressureHandler
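The two dropping strategies differ only in which message is sacrificed when the mailbox is full. The sketch below shows both on a bounded JDK queue; `OverflowStrategySketch` is hypothetical and single-threaded for clarity, and it does not use Cajun's Actor.dropOldestMessage. BLOCK would correspond to calling `put` instead of `offer`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of overflow handling on a bounded mailbox; not Cajun's code. */
final class OverflowStrategySketch {

    /** Stand-in for the two dropping variants of BackpressureStrategy. */
    enum Strategy { DROP_NEW, DROP_OLDEST }

    /** Tries to enqueue the message, applying the strategy if the mailbox is full. */
    static <M> boolean send(BlockingQueue<M> mailbox, M message, Strategy strategy) {
        if (mailbox.offer(message)) {
            return true; // there was room
        }
        if (strategy == Strategy.DROP_NEW) {
            return false; // reject the incoming message, keep what is queued
        }
        mailbox.poll(); // DROP_OLDEST: evict the head to make room
        return mailbox.offer(message);
    }

    /** Sends the messages 1..5 to a mailbox of capacity 3 and returns what remains. */
    static String demo(String strategyName) {
        BlockingQueue<Integer> mailbox = new ArrayBlockingQueue<>(3);
        for (int i = 1; i <= 5; i++) {
            send(mailbox, i, Strategy.valueOf(strategyName));
        }
        return mailbox.toString();
    }
}
```

With DROP_NEW the mailbox ends as `[1, 2, 3]`; with DROP_OLDEST it ends as `[3, 4, 5]`. DROP_OLDEST therefore suits data where only recent values matter, while DROP_NEW preserves work that was already accepted.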
While BackpressureConfig sets the initial backpressure configuration (either system-wide or for an actor at creation), the BackpressureBuilder allows for dynamic adjustments to an actor's backpressure settings after it has been created. This is useful for overriding system defaults for a specific actor, or for enabling and configuring backpressure for an actor if its ActorSystem has backpressure disabled by default.
// Direct actor configuration with type safety
BackpressureBuilder<MyMessage> builder = new BackpressureBuilder<>(myActor)
    .withStrategy(BackpressureStrategy.DROP_OLDEST)
    .withWarningThreshold(0.7f)
    .withCriticalThreshold(0.9f)
    .withRecoveryThreshold(0.5f);
// Apply the configuration
builder.apply();
// PID-based configuration through ActorSystem
// (named pidBuilder so it does not clash with the builder declared above)
BackpressureBuilder<MyMessage> pidBuilder = system.getBackpressureMonitor()
    .configureBackpressure(actorPid)
    .withStrategy(BackpressureStrategy.DROP_OLDEST)
    .withWarningThreshold(0.7f)
    .withCriticalThreshold(0.9f);
pidBuilder.apply();
// Using preset configurations for common scenarios
BackpressureBuilder<MyMessage> timeCriticalBuilder = new BackpressureBuilder<>(myActor)
    .presetTimeCritical()
    .apply();
BackpressureBuilder<MyMessage> reliableBuilder = new BackpressureBuilder<>(myActor)
    .presetReliable()
    .apply();
BackpressureBuilder<MyMessage> highThroughputBuilder = new BackpressureBuilder<>(myActor)
    .presetHighThroughput()
    .apply();
// Check backpressure status
BackpressureStatus status = actor.getBackpressureStatus();
BackpressureState currentState = status.getCurrentState();
float fillRatio = status.getFillRatio();
For advanced backpressure control, you can implement a custom handler and apply it using the BackpressureBuilder:
CustomBackpressureHandler<MyMessage> handler = new CustomBackpressureHandler<>() {
    @Override
    public boolean handleMessage(Actor<MyMessage> actor, MyMessage message, BackpressureSendOptions options) {
        // Custom logic to decide whether to accept the message
        if (message.isPriority()) {
            return true; // Always accept priority messages
        }
        return actor.getCurrentSize() < actor.getCapacity() * 0.9;
    }

    @Override
    public boolean makeRoom(Actor<MyMessage> actor) {
        // Custom logic to make room in the mailbox
        // Return true if room was successfully made
        return actor.dropOldestMessage();
    }
};
// Configure with custom handler
new BackpressureBuilder<>(myActor)
    .withStrategy(BackpressureStrategy.CUSTOM)
    .withCustomHandler(handler)
    .apply();
The backpressure system provides monitoring capabilities and callback notifications through the BackpressureBuilder:
// Register for backpressure event notifications using the builder
new BackpressureBuilder<>(myActor)
    .withStrategy(BackpressureStrategy.DROP_OLDEST)
    .withWarningThreshold(0.7f)
    .withCriticalThreshold(0.9f)
    .withCallback(event -> {
        logger.info("Backpressure event: {} state, fill ratio: {}",
            event.getState(), event.getFillRatio());
        // Take action based on backpressure events
        if (event.isBackpressureActive()) {
            // Notify monitoring system, scale resources, etc.
        }
    })
    .apply();
// Access detailed backpressure metrics and history
BackpressureStatus status = actor.getBackpressureStatus();
List<BackpressureEvent> recentEvents = status.getRecentEvents();
List<StateTransition> stateTransitions = status.getStateTransitions();
// Monitor state transition history
for (StateTransition transition : stateTransitions) {
    logger.debug("Transition from {} to {} at {} due to: {}",
        transition.getFromState(),
        transition.getToState(),
        transition.getTimestamp(),
        transition.getReason());
}
You can send messages with special options to control backpressure behavior:
// Create options for high priority messages that bypass backpressure
BackpressureSendOptions highPriority = new BackpressureSendOptions()
    .setHighPriority(true)
    .setTimeout(Duration.ofSeconds(5));
// Send with high priority
actor.tell(urgentMessage, highPriority);
// Or use the system to send with options
boolean accepted = system.tellWithOptions(actorPid, message, highPriority);
// Block until message is accepted or timeout occurs
BackpressureSendOptions blockingOptions = new BackpressureSendOptions()
    .setBlockUntilAccepted(true)
    .setTimeout(Duration.ofSeconds(3));
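The "block until accepted or timeout" behavior can be illustrated with the JDK's own bounded queues. The sketch below is an assumption about the general technique, not a description of how Cajun implements `BackpressureSendOptions`: `BlockingSendSketch` and `sendWithTimeout` are hypothetical names, and the mailbox is a plain `ArrayBlockingQueue` rather than a Cajun mailbox.

```java
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a blocking send with a timeout; not part of Cajun. */
final class BlockingSendSketch {

    /**
     * Waits up to the given timeout for space in the mailbox.
     * Returns true if the message was enqueued, false on timeout or interruption.
     */
    static <T> boolean sendWithTimeout(BlockingQueue<T> mailbox, T message, Duration timeout) {
        try {
            return mailbox.offer(message, timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> mailbox = new ArrayBlockingQueue<>(1);
        System.out.println(sendWithTimeout(mailbox, "first", Duration.ofMillis(100)));
        // The mailbox is full and nobody is consuming, so this send times out.
        System.out.println(sendWithTimeout(mailbox, "second", Duration.ofMillis(100)));
    }
}
```

Returning a boolean rather than throwing on timeout matches the shape of the `tellWithOptions` call shown earlier, which also reports acceptance as a boolean.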
Cajun supports running in a distributed cluster mode, allowing actors to communicate across multiple nodes.
// Create a cluster configuration
ClusterConfig config = ClusterConfig.builder()
    .nodeName("node1")
    .port(2551)
    .seedNodes(List.of("127.0.0.1:2551", "127.0.0.1:2552"))
    .build();
// Create a clustered actor system
ActorSystem system = ActorSystem.createClustered(config);
// Create and register actors as usual
Pid actorPid = system.register(MyActor.class, "my-actor");
Messages can be sent to actors regardless of which node they're running on. The system automatically routes messages to the correct node.
// Send a message to an actor (works the same whether the actor is local or remote)
When a node fails, its actors are automatically reassigned to other nodes in the cluster.
// Node 1
MetadataStore metadataStore1 = new EtcdMetadataStore("http://etcd-host:2379");
DirectMessagingSystem messagingSystem1 = new DirectMessagingSystem("node1", 8080);
messagingSystem1.addNode("node2", "node2-host", 8080);
ClusterActorSystem system1 = new ClusterActorSystem("node1", metadataStore1, messagingSystem1);
system1.start().get();
// Node 2
MetadataStore metadataStore2 = new EtcdMetadataStore("http://etcd-host:2379");
DirectMessagingSystem messagingSystem2 = new DirectMessagingSystem("node2", 8080);
messagingSystem2.addNode("node1", "node1-host", 8080);
ClusterActorSystem system2 = new ClusterActorSystem("node2", metadataStore2, messagingSystem2);
system2.start().get();
For more details, refer to Cluster Mode.
You can implement your own metadata store by implementing the MetadataStore interface:
public class CustomMetadataStore implements MetadataStore {
    // Implement the required methods
}
You can implement your own messaging system by implementing the MessagingSystem interface:
public class CustomMessagingSystem implements MessagingSystem {
    // Implement the required methods
}
For more details, see the Cluster Mode Improvements documentation.
- Actor system and actor lifecycle
  - Create Actor and Actor System
  - Support message to self for actor
  - Support hooks for start and shutdown of actor
  - Stateful functional style actor
  - Timed messages
  - Error handling with supervision strategies
  - Request-response pattern with ask functionality
  - Robust exception handling and propagation
- Actor metadata management with etcd
  - Distributed metadata store with etcd support
  - Leader election
  - Actor assignment tracking
- Actor supervision hierarchy and fault tolerance
  - Basic supervision strategies (RESUME, RESTART, STOP, ESCALATE)
  - Hierarchical supervision
  - Custom supervision policies
  - Lifecycle hooks (preStart, postStop, onError)
  - Integrated logging with SLF4J and Logback
- Persistent state and messaging for actors
  - StatefulActor with persistent state management
  - Pluggable state storage backends (in-memory, file-based)
  - Message persistence and replay
  - State initialization and recovery mechanisms
  - Snapshot-based state persistence
  - Hybrid recovery approach (snapshots + message replay)
  - Explicit state initialization and force initialization methods
  - Proper handling of null initial states for recovery cases
  - Adaptive snapshot strategy with time-based and change-count-based options
  - Customizable backends for snapshots and Write-Ahead Log (WAL)
  - RocksDB backend for state persistence
  - Segregation of runtime implementations (file store, in-memory store, etc.) from the actor system
- Backpressure and load management
  - Integrated backpressure support in StatefulActor
  - Configurable mailbox capacity for backpressure control
  - Load monitoring (queue size, processing times)
  - Configurable retry mechanisms with exponential backoff
  - Error recovery with custom error hooks
  - Processing metrics and backpressure level monitoring
  - Circuit breaker pattern implementation
  - Rate limiting strategies
- Partitioned state and sharding strategy
  - Rendezvous hashing for actor assignment
- Cluster mode
  - Distributed actor systems
  - Remote messaging between actor systems
  - Actor reassignment on node failure
  - Pluggable messaging system
  - Configurable message delivery guarantees (EXACTLY_ONCE, AT_LEAST_ONCE, AT_MOST_ONCE)
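Rendezvous hashing, listed in the roadmap above for actor assignment, picks for each actor the node with the highest hash score over the (node, actor) pair. The following is a minimal sketch of the general technique and makes no claim about how Cajun implements it: `RendezvousHashSketch`, `score`, and `assign` are hypothetical names, and CRC32 is used only because it ships with the JDK; a production system would likely choose a hash with better distribution.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32;

/** Hypothetical sketch of rendezvous (highest-random-weight) hashing; not part of Cajun. */
final class RendezvousHashSketch {

    /** Deterministic score for a (node, actor) pair. */
    static long score(String nodeId, String actorId) {
        CRC32 crc = new CRC32();
        crc.update((nodeId + "/" + actorId).getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    /** Returns the node with the highest score for the actor; ties go to the smaller node ID. */
    static String assign(String actorId, List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one node is required");
        }
        String best = null;
        long bestScore = -1L;
        for (String node : nodes) {
            long s = score(node, actorId);
            if (best == null || s > bestScore || (s == bestScore && node.compareTo(best) < 0)) {
                best = node;
                bestScore = s;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("node1", "node2", "node3");
        String owner = assign("my-actor", nodes);
        System.out.println("my-actor is assigned to " + owner);
    }
}
```

The property that makes this attractive for reassignment on node failure is that removing a node only moves the actors that node owned; every other actor keeps its current assignment, because its highest-scoring node is still present.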