diff --git a/Cargo.toml b/Cargo.toml index f5001656..2525e4e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ env_logger = "0.10.0" getrandom = "0.2" once_cell = "1.7" paste = "1.0.9" +rand = "0.8.5" reqwest = { version = "0.11.11", default-features = false, features = ["rustls-tls"] } tokio = { version = "1.19", features = ["fs", "io-util", "macros", "rt-multi-thread", "sync", "time" ] } @@ -158,6 +159,10 @@ required-features = ["sync"] name = "eviction_listener_sync" required-features = ["sync"] +[[example]] +name = "jittered_expiry_policy_sync" +required-features = ["sync"] + [[example]] name = "reinsert_expired_entries_sync" required-features = ["sync"] diff --git a/examples/README.md b/examples/README.md index 6a70696c..ad5fac69 100644 --- a/examples/README.md +++ b/examples/README.md @@ -60,6 +60,17 @@ existence of the entry. - Demonstrates when the expired entries will be actually evicted from the cache, and why the `run_pending_tasks` method could be important in some cases. +- [jittered_expiry_policy_sync](./jittered_expiry_policy_sync.rs) + - Implements a jittered expiry policy for a cache. + - The `JitteredExpiry` struct is a custom expiry policy that adds jitter to the + base expiry duration. + - It implements the `moka::Expiry` trait and calculates the expiry duration + after a write or read operation. + - The jitter is randomly generated and added to or subtracted from the base + expiry duration. + - This example uses the `moka::sync::Cache` type, but The same expiry policy can + be used with the `moka::future::Cache`. + - [cascading_drop_async](./cascading_drop_async.rs) - Controls the lifetime of the objects in a separate `BTreeMap` collection from the cache using an eviction listener. diff --git a/examples/jittered_expiry_policy_sync.rs b/examples/jittered_expiry_policy_sync.rs new file mode 100644 index 00000000..3701497a --- /dev/null +++ b/examples/jittered_expiry_policy_sync.rs @@ -0,0 +1,190 @@ +//! This example demonstrates how to implement a jittered expiry policy for a cache. +//! +//! The `JitteredExpiry` struct is a custom expiry policy that adds jitter to the +//! expiry duration. It implements the `moka::Expiry` trait and calculates the expiry +//! duration after a write or read operation. The jitter is randomly generated and +//! added to or subtracted from the base expiry duration. +//! +//! This example uses the `moka::sync::Cache` type, which is a synchronous cache. The +//! same expiry policy can be used with the asynchronous cache, `moka::future::Cache`. + +use std::time::{Duration, Instant}; + +use moka::{sync::Cache, Expiry}; +use rand::{ + distributions::{Distribution, Uniform}, + Rng, +}; + +/// A `moka::Expiry` implementation that adds jitter to the expiry duration. +pub struct JitteredExpiry { + /// Optional time-to-live duration. + time_to_live: Option, + /// Optional time-to-idle duration. + time_to_idle: Option, + /// The distribution to randomly generate the jitter. The jitter is added to + /// or subtracted from the expiry duration. + jitter_gen: J, +} + +impl JitteredExpiry +where + J: Distribution, +{ + pub fn new( + time_to_live: Option, + time_to_idle: Option, + jitter_gen: J, + ) -> Self { + Self { + time_to_live, + time_to_idle, + jitter_gen, + } + } + + /// Calculates the expiry duration after a write operation. + pub fn calc_expiry_for_write(&self) -> Option { + if matches!((self.time_to_live, self.time_to_idle), (None, None)) { + return None; + } + + let expiry = match (self.time_to_live, self.time_to_idle) { + (Some(ttl), None) => ttl, + (None, Some(tti)) => tti, + (Some(ttl), Some(tti)) => ttl.min(tti), + (None, None) => unreachable!(), + }; + + Some(self.add_jitter(expiry)) + } + + /// Calculates the expiry duration after a read operation. + pub fn calc_expiry_for_read(&self, read_at: Instant, modified_at: Instant) -> Option { + if matches!((self.time_to_live, self.time_to_idle), (None, None)) { + return None; + } + + let expiry = match (self.time_to_live, self.time_to_idle) { + (Some(ttl), None) => { + let elapsed = Self::elapsed_since_write(read_at, modified_at); + Self::remaining_to_ttl(ttl, elapsed) + } + (None, Some(tti)) => tti, + (Some(ttl), Some(tti)) => { + // Ensure that the expiry duration does not exceed the + // time-to-live since last write. + let elapsed = Self::elapsed_since_write(read_at, modified_at); + let remaining = Self::remaining_to_ttl(ttl, elapsed); + tti.min(remaining) + } + (None, None) => unreachable!(), + }; + + Some(self.add_jitter(expiry)) + } + + /// Calculates the elapsed time between `modified_at` and `read_at`. + fn elapsed_since_write(read_at: Instant, modified_at: Instant) -> Duration { + // NOTE: `duration_since` panics if `read_at` is earlier than `modified_at`. + if read_at >= modified_at { + read_at.duration_since(modified_at) + } else { + Duration::default() // zero duration + } + } + + /// Calculates the remaining time to live based on the `ttl` and `elapsed` time. + fn remaining_to_ttl(ttl: Duration, elapsed: Duration) -> Duration { + ttl.saturating_sub(elapsed) + } + + /// Adds jitter to the given duration. + fn add_jitter(&self, duration: Duration) -> Duration { + let mut rng = rand::thread_rng(); + let jitter = self.jitter_gen.sample(&mut rng); + + // Add or subtract the jitter to/from the duration. + if rng.gen() { + duration.saturating_add(jitter) + } else { + duration.saturating_sub(jitter) + } + } +} + +/// The implementation of the `moka::Expiry` trait for `JitteredExpiry`. +/// https://docs.rs/moka/latest/moka/policy/trait.Expiry.html +impl Expiry for JitteredExpiry +where + J: Distribution, +{ + /// Specifies that the entry should be automatically removed from the cache + /// once the duration has elapsed after the entry’s creation. This method is + /// called for cache write methods such as `insert` and `get_with` but only + /// when the key was not present in the cache. + fn expire_after_create(&self, _key: &K, _value: &V, _created_at: Instant) -> Option { + dbg!(self.calc_expiry_for_write()) + } + + /// Specifies that the entry should be automatically removed from the cache + /// once the duration has elapsed after the replacement of its value. This + /// method is called for cache write methods such as `insert` but only when + /// the key is already present in the cache. + fn expire_after_update( + &self, + _key: &K, + _value: &V, + _updated_at: Instant, + duration_until_expiry: Option, + ) -> Option { + dbg!(self.calc_expiry_for_write().or(duration_until_expiry)) + } + + /// Specifies that the entry should be automatically removed from the cache + /// once the duration has elapsed after its last read. This method is called + /// for cache read methods such as `get` and `get_with` but only when the + /// key is present in the cache. + fn expire_after_read( + &self, + _key: &K, + _value: &V, + read_at: Instant, + duration_until_expiry: Option, + last_modified_at: Instant, + ) -> Option { + dbg!(self + .calc_expiry_for_read(read_at, last_modified_at) + .or(duration_until_expiry)) + } +} + +fn main() { + let expiry = JitteredExpiry::new( + // TTL 10 minutes + Some(Duration::from_secs(10 * 60)), + // TTI 3 minutes + Some(Duration::from_secs(3 * 60)), + // Jitter +/- 30 seconds, 1 second resolution, uniformly distributed + Uniform::from(0..30).map(Duration::from_secs), + ); + + let cache = Cache::builder().expire_after(expiry).build(); + + const NUM_KEYS: usize = 10; + + // Insert some key-value pairs. + for key in 0..NUM_KEYS { + cache.insert(key, format!("value-{key}")); + } + + // Get all entries. + for key in 0..NUM_KEYS { + assert_eq!(cache.get(&key), Some(format!("value-{key}"))); + } + + // Update all entries. + for key in 0..NUM_KEYS { + cache.insert(key, format!("new-value-{key}")); + } +}