diff --git a/.vscode/settings.json b/.vscode/settings.json index 60c697f7..1833ac30 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,7 +10,7 @@ "Moka", "Ristretto", "Tatsuya", - "unsync", + "Upsert", "actix", "ahash", "benmanes", @@ -27,6 +27,7 @@ "semver", "structs", "toolchain", + "unsync", "usize" ] } \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 79cbb28b..f2996b09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,31 +1,48 @@ -# Moka — Release Notes +# Moka — Change Log -## Unreleased +## Version 0.3.0 -### Features +### Added -- Introduce an unsync cache. +- Add an unsync cache (`moka::unsync::Cache`) and its builder for single-thread + applications. ([#9][gh-pull-0009]) +- Add `invalidate_all` method to `sync`, `future` and `unsync` caches. + ([#11][gh-pull-0011]) + +### Fixed + +- Fix problems including segfault caused by race conditions between the sync/eviction + thread and client writes. (Addressed as a part of [#11][gh-pull-0011]). ## Version 0.2.0 -### Features +### Added -- Introduce an asynchronous (futures aware) cache. +- Add an asynchronous, futures aware cache (`moka::future::Cache`) and its builder. + ([#7][gh-pull-0007]) ## Version 0.1.0 -### Features +### Added + +- Add thread-safe, highly concurrent in-memory cache implementations + (`moka::sync::{Cache, SegmentedCache}`) with the following features: + - Bounded by the maximum number of elements. + - Maintains good hit rate by using entry replacement algorithms inspired by + [Caffeine][caffeine-git]: + - Admission to a cache is controlled by the Least Frequently Used (LFU) policy. + - Eviction from a cache is controlled by the Least Recently Used (LRU) policy. + - Expiration policies: + - Time to live + - Time to idle -- Thread-safe, highly concurrent in-memory cache implementations. -- Caches are bounded by the maximum number of elements. 
-- Maintains good hit rate by using entry replacement algorithms inspired by - [Caffeine][caffeine-git]: - - Admission to a cache is controlled by the Least Frequently Used (LFU) policy. - - Eviction from a cache is controlled by the Least Recently Used (LRU) policy. -- Supports expiration policies: - - Time to live - - Time to idle + + [caffeine-git]: https://github.com/ben-manes/caffeine + +[gh-pull-0011]: https://github.com/moka-rs/moka/pull/11/ +[gh-pull-0009]: https://github.com/moka-rs/moka/pull/9/ +[gh-pull-0007]: https://github.com/moka-rs/moka/pull/7/ diff --git a/Cargo.toml b/Cargo.toml index a31bb8ed..d828e08c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "moka" -version = "0.2.0" +version = "0.3.0" authors = ["Tatsuya Kawano "] edition = "2018" @@ -27,7 +27,7 @@ future = ["async-io"] cht = "0.4" crossbeam-channel = "0.5" num_cpus = "1.13" -once_cell = "1.5" +once_cell = "1.7" parking_lot = "0.11" # v0.7.1 or newer should be used as v0.7.0 will not compile on non-x86_64 platforms. # https://github.com/metrics-rs/quanta/pull/38 diff --git a/README.md b/README.md index 62eca755..d65b4ed9 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,12 @@ Moka is a fast, concurrent cache library for Rust. Moka is inspired by [Caffeine][caffeine-git] (Java) and [Ristretto][ristretto-git] (Go). Moka provides cache implementations that support full concurrency of retrievals and -a high expected concurrency for updates. They perform a best-effort bounding of a -concurrent hash map using an entry replacement algorithm to determine which entries -to evict when the capacity is exceeded. +a high expected concurrency for updates. Moka also provides a not thread-safe cache +implementation for single thread applications. + +All caches perform a best-effort bounding of a hash map using an entry +replacement algorithm to determine which entries to evict when the capacity is +exceeded. 
[gh-actions-badge]: https://github.com/moka-rs/moka/workflows/CI/badge.svg [release-badge]: https://img.shields.io/crates/v/moka.svg @@ -35,9 +38,10 @@ to evict when the capacity is exceeded. ## Features - Thread-safe, highly concurrent in-memory cache implementations: - - Synchronous (blocking) caches that can be shared across OS threads. + - Blocking caches that can be shared across OS threads. - An asynchronous (futures aware) cache that can be accessed inside and outside of asynchronous contexts. +- A not thread-safe, in-memory cache implementation for single thread applications. - Caches are bounded by the maximum number of entries. - Maintains good hit rate by using an entry replacement algorithms inspired by [Caffeine][caffeine-git]: @@ -54,25 +58,25 @@ Add this to your `Cargo.toml`: ```toml [dependencies] -moka = "0.2" +moka = "0.3" ``` To use the asynchronous cache, enable a crate feature called "future". ```toml [dependencies] -moka = { version = "0.2", features = ["future"] } +moka = { version = "0.3", features = ["future"] } ``` ## Example: Synchronous Cache -The synchronous (blocking) caches are defined in the `sync` module. +The thread-safe, blocking caches are defined in the `sync` module. Cache entries are manually added using `insert` method, and are stored in the cache until either evicted or manually invalidated. -Here's an example that reads and updates a cache by using multiple threads: +Here's an example of reading and updating a cache by using multiple threads: ```rust // Use the synchronous cache. 
@@ -157,7 +161,7 @@ Here is a similar program to the previous example, but using asynchronous cache // Cargo.toml // // [dependencies] -// moka = { version = "0.2", features = ["future"] } +// moka = { version = "0.3", features = ["future"] } // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } // futures = "0.3" @@ -220,12 +224,13 @@ async fn main() { ## Avoiding to clone the value at `get` -The return type of `get` method is `Option` instead of `Option<&V>`, where `V` is -the value type. Every time `get` is called for an existing key, it creates a clone of -the stored value `V` and returns it. This is because the `Cache` allows concurrent -updates from threads so a value stored in the cache can be dropped or replaced at any -time by any other thread. `get` cannot return a reference `&V` as it is impossible to -guarantee the value outlives the reference. +For the concurrent caches (`sync` and `future` caches), the return type of `get` +method is `Option` instead of `Option<&V>`, where `V` is the value type. Every +time `get` is called for an existing key, it creates a clone of the stored value `V` +and returns it. This is because the `Cache` allows concurrent updates from threads so +a value stored in the cache can be dropped or replaced at any time by any other +thread. `get` cannot return a reference `&V` as it is impossible to guarantee the +value outlives the reference. If you want to store values that will be expensive to clone, wrap them by `std::sync::Arc` before storing in a cache. 
[`Arc`][rustdoc-std-arc] is a thread-safe diff --git a/src/common.rs b/src/common.rs index e7956937..5ea3ede1 100644 --- a/src/common.rs +++ b/src/common.rs @@ -11,3 +11,11 @@ pub(crate) trait AccessTime { fn last_modified(&self) -> Option; fn set_last_modified(&mut self, timestamp: Instant); } + +pub(crate) fn u64_to_instant(ts: u64) -> Option { + if ts == u64::MAX { + None + } else { + Some(unsafe { std::mem::transmute(ts) }) + } +} diff --git a/src/common/deque.rs b/src/common/deque.rs index b7cb8947..fa2984f5 100644 --- a/src/common/deque.rs +++ b/src/common/deque.rs @@ -91,6 +91,10 @@ impl Deque { } } + pub(crate) fn region(&self) -> &CacheRegion { + &self.region + } + pub(crate) fn contains(&self, node: &DeqNode) -> bool { self.region == node.region && (node.prev.is_some() || self.is_head(node)) } diff --git a/src/future.rs b/src/future.rs index 56db8e46..946c90f9 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,4 +1,4 @@ -//! Provides thread-safe, asynchronous (futures aware) cache implementations. +//! Provides a thread-safe, asynchronous (futures aware) cache implementation. //! //! To use this module, enable a crate feature called "future". diff --git a/src/future/builder.rs b/src/future/builder.rs index fff299fe..640936ba 100644 --- a/src/future/builder.rs +++ b/src/future/builder.rs @@ -50,8 +50,8 @@ where K: Eq + Hash, V: Clone, { - /// Construct a new `CacheBuilder` that will be used to build a `Cache` or - /// `SegmentedCache` holding up to `max_capacity` entries. + /// Construct a new `CacheBuilder` that will be used to build a `Cache` holding + /// up to `max_capacity` entries. pub fn new(max_capacity: usize) -> Self { Self { max_capacity, @@ -64,9 +64,6 @@ where } /// Builds a `Cache`. - /// - /// If you want to build a `SegmentedCache`, call `segments` method before - /// calling this method. 
pub fn build(self) -> Cache { let build_hasher = RandomState::default(); Cache::with_everything( @@ -79,9 +76,6 @@ where } /// Builds a `Cache`, with the given `hasher`. - /// - /// If you want to build a `SegmentedCache`, call `segments` method before - /// calling this method. pub fn build_with_hasher(self, hasher: S) -> Cache where S: BuildHasher + Clone, diff --git a/src/future/cache.rs b/src/future/cache.rs index 50005fa0..f5136bc4 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -40,7 +40,7 @@ use std::{ /// [`blocking_invalidate`](#method.blocking_invalidate) methods. They will block /// for a short time under heavy updates. /// -/// Here's an example that reads and updates a cache by using multiple asynchronous +/// Here's an example of reading and updating a cache by using multiple asynchronous /// tasks with [Tokio][tokio-crate] runtime: /// /// [tokio-crate]: https://crates.io/crates/tokio @@ -49,7 +49,7 @@ use std::{ /// // Cargo.toml /// // /// // [dependencies] -/// // moka = { version = "0.2", features = ["future"] } +/// // moka = { version = "0.3", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// // futures = "0.3" /// @@ -327,6 +327,20 @@ where } } + /// Discards all cached values. + /// + /// This method returns immediately and a background thread will evict all the + /// cached values inserted before the time when this method was called. It is + /// guaranteed that the `get` method must not return these invalidated values + /// even if they have not been evicted. + /// + /// Like the `invalidate` method, this method does not clear the historic + /// popularity estimator of keys so that it retains the client activities of + /// trying to retrieve an item. + pub fn invalidate_all(&self) { + self.base.invalidate_all(); + } + /// Returns the `max_capacity` of this cache. 
pub fn max_capacity(&self) -> usize { self.base.max_capacity() @@ -577,6 +591,34 @@ mod tests { assert!(cache.get(&20).is_some()); } + #[tokio::test] + async fn invalidate_all() { + let mut cache = Cache::new(100); + cache.reconfigure_for_testing(); + + // Make the cache exterior immutable. + let cache = cache; + + cache.insert("a", "alice").await; + cache.insert("b", "bob").await; + cache.insert("c", "cindy").await; + assert_eq!(cache.get(&"a"), Some("alice")); + assert_eq!(cache.get(&"b"), Some("bob")); + assert_eq!(cache.get(&"c"), Some("cindy")); + cache.sync(); + + cache.invalidate_all(); + cache.sync(); + + cache.insert("d", "david").await; + cache.sync(); + + assert!(cache.get(&"a").is_none()); + assert!(cache.get(&"b").is_none()); + assert!(cache.get(&"c").is_none()); + assert_eq!(cache.get(&"d"), Some("david")); + } + #[tokio::test] async fn time_to_live() { let mut cache = CacheBuilder::new(100) diff --git a/src/lib.rs b/src/lib.rs index becb36f6..fe9b989e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,12 +5,16 @@ //! [Caffeine][caffeine-git] (Java) and [Ristretto][ristretto-git] (Go). //! //! Moka provides in-memory concurrent cache implementations that support full -//! concurrency of retrievals and a high expected concurrency for updates. -//! +//! concurrency of retrievals and a high expected concurrency for updates. //! They utilize a lock-free concurrent hash table `cht::SegmentedHashMap` from the -//! [cht][cht-crate] crate for the central key-value storage. These caches perform a -//! best-effort bounding of the map using an entry replacement algorithm to determine -//! which entries to evict when the capacity is exceeded. +//! [cht][cht-crate] crate for the central key-value storage. +//! +//! Moka also provides an in-memory, not thread-safe cache implementation for single +//! thread applications. +//! +//! All cache implementations perform a best-effort bounding of the map using an entry +//! 
replacement algorithm to determine which entries to evict when the capacity is +//! exceeded. //! //! [caffeine-git]: https://github.com/ben-manes/caffeine //! [ristretto-git]: https://github.com/dgraph-io/ristretto @@ -19,9 +23,10 @@ //! # Features //! //! - Thread-safe, highly concurrent in-memory cache implementations: -//! - Synchronous (blocking) caches that can be shared across OS threads. +//! - Blocking caches that can be shared across OS threads. //! - An asynchronous (futures aware) cache that can be accessed inside and //! outside of asynchronous contexts. +//! - A not thread-safe, in-memory cache implementation for single thread applications. //! - Caches are bounded by the maximum number of entries. //! - Maintains good hit rate by using entry replacement algorithms inspired by //! [Caffeine][caffeine-git]: @@ -33,35 +38,38 @@ //! //! # Examples //! -//! See the followings: +//! See the following document: //! -//! - Synchronous (blocking) caches: -//! - The document for the [`sync::Cache`][sync-cache-struct] and +//! - Thread-safe, blocking caches: +//! - [`sync::Cache`][sync-cache-struct] and //! [`sync::SegmentedCache`][sync-seg-cache-struct]. -//! - Asynchronous (futures aware) cache: -//! - The document for the [`future::Cache`][future-cache-struct]. +//! - An asynchronous (futures aware) cache: +//! - [`future::Cache`][future-cache-struct]. +//! - A not thread-safe, blocking cache for single threaded applications: +//! - [`unsync::Cache`][unsync-cache-struct]. //! //! [future-cache-struct]: ./future/struct.Cache.html //! [sync-cache-struct]: ./sync/struct.Cache.html //! [sync-seg-cache-struct]: ./sync/struct.SegmentedCache.html +//! [unsync-cache-struct]: ./unsync/struct.Cache.html //! //! # Minimum Supported Rust Version //! //! This crate's minimum supported Rust version (MSRV) is 1.45.2. //! -//! If no feature is enabled, MSRV will be updated conservatively. When using other -//! 
features, like `async` (which is not available yet), MSRV might be updated more -//! frequently, up to the latest stable. In both cases, increasing MSRV is _not_ -//! considered a semver-breaking change. +//! If no crate feature is enabled, MSRV will be updated conservatively. When using +//! features like `future`, MSRV might be updated more frequently, up to the latest +//! stable. In both cases, increasing MSRV is _not_ considered a semver-breaking +//! change. //! //! # Implementation Details //! //! ## Concurrency //! -//! The entry replacement algorithms are kept eventually consistent with the -//! map. While updates to the cache are immediately applied to the map, recording of -//! reads and writes may not be immediately reflected on the cache policy's data -//! structures. +//! In a concurrent cache (`sync` or `future` cache), the entry replacement +//! algorithms are kept eventually consistent with the map. While updates to the +//! cache are immediately applied to the map, recording of reads and writes may not +//! be immediately reflected on the cache policy's data structures. //! //! These structures are guarded by a lock and operations are applied in batches to //! avoid lock contention. There are bounded inter-thread channels to hold these @@ -95,12 +103,11 @@ //! retained in a historic popularity estimator. This estimator has a tiny memory //! footprint as it uses hashing to probabilistically estimate an item's frequency. //! -//! Both `Cache` and `SegmentedCache` employ [TinyLFU] (Least Frequently Used) as the -//! admission policy. When a new entry is inserted to the cache, it is temporary -//! admitted to the cache, and a recording of this insertion is added to the write -//! queue. When the write queue is drained and the main space of the cache is already -//! full, then the historic popularity estimator determines to evict one of the -//! following entries: +//! All caches employ [TinyLFU] (Least Frequently Used) as the admission policy. 
When +//! a new entry is inserted to the cache, it is temporary admitted to the cache, and +//! a recording of this insertion is added to the write queue. When the write queue +//! is drained and the main space of the cache is already full, then the historic +//! popularity estimator determines to evict one of the following entries: //! //! - The temporary admitted entry. //! - Or, an entry that is selected from the main cache space by LRU (Least Recently @@ -124,7 +131,8 @@ //! //! A future release will support the following: //! -//! - The variable expiration +//! - The variable expiration (which allows to set different expiration on each +//! cached entry) //! //! These policies are provided with _O(1)_ time complexity: //! diff --git a/src/sync.rs b/src/sync.rs index 1326be73..31a65fe1 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,13 +1,13 @@ -//! Provides thread-safe, synchronous (blocking) cache implementations. +//! Provides thread-safe, blocking cache implementations. -use crate::common::{deque::DeqNode, AccessTime}; +use crate::common::{deque::DeqNode, u64_to_instant, AccessTime}; use parking_lot::Mutex; use quanta::Instant; use std::{ ptr::NonNull, sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }, }; @@ -40,13 +40,22 @@ impl KeyHash { } } +impl Clone for KeyHash { + fn clone(&self) -> Self { + Self { + key: Arc::clone(&self.key), + hash: self.hash, + } + } +} + pub(crate) struct KeyDate { pub(crate) key: Arc, - pub(crate) timestamp: Option>, + pub(crate) timestamp: Arc, } impl KeyDate { - pub(crate) fn new(key: Arc, timestamp: Option>) -> Self { + pub(crate) fn new(key: Arc, timestamp: Arc) -> Self { Self { key, timestamp } } } @@ -54,11 +63,11 @@ impl KeyDate { pub(crate) struct KeyHashDate { pub(crate) key: Arc, pub(crate) hash: u64, - pub(crate) timestamp: Option>, + pub(crate) timestamp: Arc, } impl KeyHashDate { - pub(crate) fn new(kh: KeyHash, timestamp: Option>) -> Self { + pub(crate) fn new(kh: 
KeyHash, timestamp: Arc) -> Self { Self { key: kh.key, hash: kh.hash, @@ -86,21 +95,19 @@ unsafe impl Send for DeqNodes {} pub(crate) struct ValueEntry { pub(crate) value: V, - last_accessed: Option>, - last_modified: Option>, + is_admitted: Arc, + last_accessed: Arc, + last_modified: Arc, nodes: Mutex>, } impl ValueEntry { - pub(crate) fn new( - value: V, - last_accessed: Option, - last_modified: Option, - ) -> Self { + pub(crate) fn new(value: V) -> Self { Self { value, - last_accessed: last_accessed.map(|ts| Arc::new(AtomicU64::new(ts.as_u64()))), - last_modified: last_modified.map(|ts| Arc::new(AtomicU64::new(ts.as_u64()))), + is_admitted: Arc::new(AtomicBool::new(false)), + last_accessed: Arc::new(AtomicU64::new(std::u64::MAX)), + last_modified: Arc::new(AtomicU64::new(std::u64::MAX)), nodes: Mutex::new(DeqNodes { access_order_q_node: None, write_order_q_node: None, @@ -116,20 +123,36 @@ impl ValueEntry { write_order_q_node: other_nodes.write_order_q_node, } }; + let last_accessed = Arc::clone(&other.last_accessed); + let last_modified = Arc::clone(&other.last_modified); + // To prevent this updated ValueEntry from being evicted by an expiration policy, + // set the max value to the timestamps. They will be replaced with the real + // timestamps when applying writes. 
+ last_accessed.store(std::u64::MAX, Ordering::Release); + last_modified.store(std::u64::MAX, Ordering::Release); Self { value, - last_accessed: other.last_accessed.clone(), - last_modified: other.last_modified.clone(), + is_admitted: Arc::clone(&other.is_admitted), + last_accessed, + last_modified, nodes: Mutex::new(nodes), } } - pub(crate) fn raw_last_accessed(&self) -> Option> { - self.last_accessed.clone() + pub(crate) fn is_admitted(&self) -> bool { + self.is_admitted.load(Ordering::Acquire) + } + + pub(crate) fn set_is_admitted(&self, value: bool) { + self.is_admitted.store(value, Ordering::Release); } - pub(crate) fn raw_last_modified(&self) -> Option> { - self.last_modified.clone() + pub(crate) fn raw_last_accessed(&self) -> Arc { + Arc::clone(&self.last_accessed) + } + + pub(crate) fn raw_last_modified(&self) -> Arc { + Arc::clone(&self.last_modified) } pub(crate) fn access_order_q_node(&self) -> Option> { @@ -166,44 +189,24 @@ impl ValueEntry { impl AccessTime for Arc> { #[inline] fn last_accessed(&self) -> Option { - self.last_accessed - .as_ref() - .map(|ts| ts.load(Ordering::Relaxed)) - .and_then(|ts| { - if ts == u64::MAX { - None - } else { - Some(unsafe { std::mem::transmute(ts) }) - } - }) + u64_to_instant(self.last_accessed.load(Ordering::Acquire)) } #[inline] fn set_last_accessed(&mut self, timestamp: Instant) { - if let Some(ts) = &self.last_accessed { - ts.store(timestamp.as_u64(), Ordering::Relaxed); - } + self.last_accessed + .store(timestamp.as_u64(), Ordering::Release); } #[inline] fn last_modified(&self) -> Option { - self.last_modified - .as_ref() - .map(|ts| ts.load(Ordering::Relaxed)) - .and_then(|ts| { - if ts == u64::MAX { - None - } else { - Some(unsafe { std::mem::transmute(ts) }) - } - }) + u64_to_instant(self.last_modified.load(Ordering::Acquire)) } #[inline] fn set_last_modified(&mut self, timestamp: Instant) { - if let Some(ts) = &self.last_modified { - ts.store(timestamp.as_u64(), Ordering::Relaxed); - } + self.last_modified + 
.store(timestamp.as_u64(), Ordering::Release); } } @@ -220,48 +223,28 @@ impl AccessTime for DeqNode> { #[inline] fn last_modified(&self) -> Option { - self.element - .timestamp - .as_ref() - .map(|ts| ts.load(Ordering::Relaxed)) - .and_then(|ts| { - if ts == u64::MAX { - None - } else { - Some(unsafe { std::mem::transmute(ts) }) - } - }) + u64_to_instant(self.element.timestamp.load(Ordering::Acquire)) } #[inline] fn set_last_modified(&mut self, timestamp: Instant) { - if let Some(ts) = self.element.timestamp.as_ref() { - ts.store(timestamp.as_u64(), Ordering::Relaxed); - } + self.element + .timestamp + .store(timestamp.as_u64(), Ordering::Release); } } impl AccessTime for DeqNode> { #[inline] fn last_accessed(&self) -> Option { - self.element - .timestamp - .as_ref() - .map(|ts| ts.load(Ordering::Relaxed)) - .and_then(|ts| { - if ts == u64::MAX { - None - } else { - Some(unsafe { std::mem::transmute(ts) }) - } - }) + u64_to_instant(self.element.timestamp.load(Ordering::Acquire)) } #[inline] fn set_last_accessed(&mut self, timestamp: Instant) { - if let Some(ts) = self.element.timestamp.as_ref() { - ts.store(timestamp.as_u64(), Ordering::Relaxed); - } + self.element + .timestamp + .store(timestamp.as_u64(), Ordering::Release); } #[inline] @@ -276,12 +259,11 @@ impl AccessTime for DeqNode> { } pub(crate) enum ReadOp { - Hit(u64, Arc>, Option), + Hit(u64, Arc>, Instant), Miss(u64), } pub(crate) enum WriteOp { - Insert(KeyHash, Arc>), - Update(Arc>), + Upsert(KeyHash, Arc>), Remove(Arc>), } diff --git a/src/sync/base_cache.rs b/src/sync/base_cache.rs index 4ef7a704..53a72ba2 100644 --- a/src/sync/base_cache.rs +++ b/src/sync/base_cache.rs @@ -10,7 +10,7 @@ use crate::common::{ }; use crossbeam_channel::{Receiver, Sender, TrySendError}; -use parking_lot::{Mutex, MutexGuard, RwLock}; +use parking_lot::{Mutex, RwLock}; use quanta::{Clock, Instant}; use std::{ borrow::Borrow, @@ -19,7 +19,7 @@ use std::{ ptr::NonNull, rc::Rc, sync::{ - atomic::{AtomicBool, AtomicU8, 
Ordering}, + atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering}, Arc, }, time::Duration, @@ -120,36 +120,30 @@ where Arc: Borrow, Q: Hash + Eq + ?Sized, { - let record = |entry, ts| { - self.record_read_op(hash, entry, ts) - .expect("Failed to record a get op"); + let record = |op| { + self.record_read_op(op).expect("Failed to record a get op"); }; - match (self.inner.get(key), self.inner.has_expiry()) { - // Value not found. - (None, _) => { - record(None, None); + match self.inner.get(key) { + None => { + record(ReadOp::Miss(hash)); None } - // Value found, no expiry. - (Some(entry), false) => { - let v = entry.value.clone(); - record(Some(entry), None); - Some(v) - } - // Value found, need to check if expired. - (Some(entry), true) => { - let now = self.inner.current_time_from_expiration_clock(); - if self.inner.is_expired_entry_wo(&entry, now) - || self.inner.is_expired_entry_ao(&entry, now) + Some(entry) => { + let i = &self.inner; + let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), i.valid_after()); + let now = i.current_time_from_expiration_clock(); + + if is_expired_entry_wo(ttl, va, &entry, now) + || is_expired_entry_ao(tti, va, &entry, now) { // Expired entry. Record this access as a cache miss rather than a hit. - record(None, None); + record(ReadOp::Miss(hash)); None } else { // Valid entry. 
let v = entry.value.clone(); - record(Some(entry), Some(now)); + record(ReadOp::Hit(hash, entry, now)); Some(v) } } @@ -179,6 +173,11 @@ where } } + pub(crate) fn invalidate_all(&self) { + let now = self.inner.current_time_from_expiration_clock(); + self.inner.set_valid_after(now); + } + pub(crate) fn max_capacity(&self) -> usize { self.inner.max_capacity() } @@ -202,20 +201,9 @@ where S: BuildHasher + Clone, { #[inline] - fn record_read_op( - &self, - hash: u64, - entry: Option>>, - timestamp: Option, - ) -> Result<(), TrySendError>> { - use ReadOp::*; + fn record_read_op(&self, op: ReadOp) -> Result<(), TrySendError>> { self.apply_reads_if_needed(); let ch = &self.read_op_ch; - let op = if let Some(entry) = entry { - Hit(hash, entry, timestamp) - } else { - Miss(hash) - }; match ch.try_send(op) { // Discard the ReadOp when the channel is full. Ok(()) | Err(TrySendError::Full(_)) => Ok(()), @@ -244,22 +232,11 @@ where Arc::clone(&key), // on_insert || { - let mut last_accessed = None; - let mut last_modified = None; - if self.inner.has_expiry() { - let ts = unsafe { std::mem::transmute(std::u64::MAX) }; - if self.inner.time_to_idle().is_some() { - last_accessed = Some(ts); - } - if self.inner.time_to_live().is_some() { - last_modified = Some(ts); - } - } - let entry = Arc::new(ValueEntry::new(value.clone(), last_accessed, last_modified)); + let entry = Arc::new(ValueEntry::new(value.clone())); let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed); op1 = Some(( cnt, - WriteOp::Insert(KeyHash::new(key, hash), Arc::clone(&entry)), + WriteOp::Upsert(KeyHash::new(Arc::clone(&key), hash), Arc::clone(&entry)), )); entry }, @@ -270,7 +247,7 @@ where op2 = Some(( cnt, Arc::clone(&old_entry), - WriteOp::Update(Arc::clone(&entry)), + WriteOp::Upsert(KeyHash::new(Arc::clone(&key), hash), Arc::clone(&entry)), )); entry }, @@ -358,6 +335,7 @@ pub(crate) struct Inner { write_op_ch: Receiver>, time_to_live: Option, time_to_idle: Option, + valid_after: AtomicU64, has_expiration_clock: 
AtomicBool, expiration_clock: RwLock>, } @@ -398,6 +376,7 @@ where write_op_ch, time_to_live, time_to_idle, + valid_after: AtomicU64::new(0), has_expiration_clock: AtomicBool::new(false), expiration_clock: RwLock::new(None), } @@ -451,6 +430,23 @@ where self.time_to_live.is_some() || self.time_to_idle.is_some() } + #[inline] + fn valid_after(&self) -> Instant { + let ts = self.valid_after.load(Ordering::Acquire); + unsafe { std::mem::transmute(ts) } + } + + #[inline] + fn set_valid_after(&self, timestamp: Instant) { + self.valid_after + .store(timestamp.as_u64(), Ordering::Release); + } + + #[inline] + fn has_valid_after(&self) -> bool { + self.valid_after.load(Ordering::Acquire) > 0 + } + #[inline] fn current_time_from_expiration_clock(&self) -> Instant { if self.has_expiration_clock.load(Ordering::Relaxed) { @@ -463,28 +459,6 @@ where Instant::now() } } - - #[inline] - fn is_expired_entry_ao(&self, entry: &impl AccessTime, now: Instant) -> bool { - debug_assert!(self.has_expiry()); - if let (Some(ts), Some(tti)) = (entry.last_accessed(), self.time_to_idle) { - if ts + tti <= now { - return true; - } - } - false - } - - #[inline] - fn is_expired_entry_wo(&self, entry: &impl AccessTime, now: Instant) -> bool { - debug_assert!(self.has_expiry()); - if let (Some(ts), Some(ttl)) = (entry.last_modified(), self.time_to_live) { - if ts + ttl <= now { - return true; - } - } - false - } } impl InnerSync for Inner @@ -493,27 +467,11 @@ where S: BuildHasher + Clone, { fn sync(&self, max_repeats: usize) -> Option { - if self.read_op_ch.is_empty() && self.write_op_ch.is_empty() && !self.has_expiry() { - return None; - } - - let deqs = self.deques.lock(); - self.do_sync(deqs, max_repeats) - } -} + const EVICTION_BATCH_SIZE: usize = 500; -// -// private methods -// -impl Inner -where - K: Hash + Eq, - S: BuildHasher + Clone, -{ - fn do_sync(&self, mut deqs: MutexGuard<'_, Deques>, max_repeats: usize) -> Option { + let mut deqs = self.deques.lock(); let mut calls = 0; let mut 
should_sync = true; - const EVICTION_BATCH_SIZE: usize = 500; while should_sync && calls <= max_repeats { let r_len = self.read_op_ch.len(); @@ -526,7 +484,7 @@ where self.apply_writes(&mut deqs, w_len); } - if self.has_expiry() { + if self.has_expiry() || self.has_valid_after() { self.evict(&mut deqs, EVICTION_BATCH_SIZE); } @@ -544,7 +502,16 @@ where None } } +} +// +// private methods +// +impl Inner +where + K: Hash + Eq, + S: BuildHasher + Clone, +{ fn apply_reads(&self, deqs: &mut Deques, count: usize) { use ReadOp::*; let mut freq = self.frequency_sketch.write(); @@ -553,10 +520,8 @@ where match ch.try_recv() { Ok(Hit(hash, mut entry, timestamp)) => { freq.increment(hash); - if let Some(ts) = timestamp { - entry.set_last_accessed(ts); - } - deqs.move_to_back_ao(entry) + entry.set_last_accessed(timestamp); + deqs.move_to_back_ao(&entry) } Ok(Miss(hash)) => freq.increment(hash), Err(_) => break, @@ -568,101 +533,113 @@ where use WriteOp::*; let freq = self.frequency_sketch.read(); let ch = &self.write_op_ch; - - let timestamp = if self.has_expiry() { - Some(self.current_time_from_expiration_clock()) - } else { - None - }; + let ts = self.current_time_from_expiration_clock(); for _ in 0..count { match ch.try_recv() { - Ok(Insert(kh, entry)) => self.handle_insert(kh, entry, timestamp, deqs, &freq), - Ok(Update(mut entry)) => { - if let Some(ts) = timestamp { - entry.set_last_accessed(ts); - entry.set_last_modified(ts); - } - deqs.move_to_back_ao(Arc::clone(&entry)); - deqs.move_to_back_wo(entry) - } - Ok(Remove(entry)) => { - Self::handle_remove(deqs, entry); - } + Ok(Upsert(kh, entry)) => self.handle_upsert(kh, entry, ts, deqs, &freq), + Ok(Remove(entry)) => Self::handle_remove(deqs, entry), Err(_) => break, }; } } - fn handle_insert( + fn handle_upsert( &self, kh: KeyHash, - entry: Arc>, - timestamp: Option, + mut entry: Arc>, + timestamp: Instant, deqs: &mut Deques, freq: &FrequencySketch, ) { - let last_accessed = entry.raw_last_accessed().map(|ts| { - 
ts.store(timestamp.unwrap().as_u64(), Ordering::Relaxed); - ts - }); - let last_modified = entry.raw_last_modified().map(|ts| { - ts.store(timestamp.unwrap().as_u64(), Ordering::Relaxed); - ts - }); - - if self.cache.len() <= self.max_capacity { - // Add the candidate to the deque. - let key = Arc::clone(&kh.key); - deqs.push_back_ao( - CacheRegion::MainProbation, - KeyHashDate::new(kh, last_accessed), - &entry, - ); - if self.time_to_live.is_some() { - deqs.push_back_wo(KeyDate::new(key, last_modified), &entry); - } - } else { - let victim = Self::find_cache_victim(deqs, freq); - if Self::admit(kh.hash, victim, freq) { - // Remove the victim from the cache and deque. - // - // TODO: Check if the selected victim was actually removed. If not, - // maybe we should find another victim. This can happen because it - // could have been already removed from the cache but the removal - // from the deque is still on the write operations queue and is not - // yet executed. - if let Some(vic_entry) = self.cache.remove(&victim.element.key) { - Self::handle_remove(deqs, vic_entry); + const MAX_RETRY: usize = 5; + let mut tries = 0; + let mut done = false; + + entry.set_last_accessed(timestamp); + entry.set_last_modified(timestamp); + let last_accessed = entry.raw_last_accessed(); + let last_modified = entry.raw_last_modified(); + + while tries < MAX_RETRY { + tries += 1; + + if entry.is_admitted() { + // The entry has been already admitted, so treat this as an update. + deqs.move_to_back_ao(&entry); + deqs.move_to_back_wo(&entry); + done = true; + break; + } else if self.cache.len() <= self.max_capacity { + // There are some room in the cache. Add the candidate to the deques. + self.handle_admit(kh.clone(), &entry, last_accessed, last_modified, deqs); + done = true; + break; + } else { + let victim = match Self::find_cache_victim(deqs, freq) { + // Found a victim. + Some(node) => node, + // Not found a victim. 
This condition should be unreachable
+                        // because there was no room in the cache. But rather than
+                        // panicking here, admit the candidate as there might be some
+                        // room in the cache now.
+                        None => {
+                            self.handle_admit(kh.clone(), &entry, last_accessed, last_modified, deqs);
+                            done = true;
+                            break;
+                        }
+                    };
+
+                if Self::admit(kh.hash, victim, freq) {
+                    // The candidate is admitted. Try to remove the victim from the
+                    // cache (hash map).
+                    if let Some(vic_entry) = self.cache.remove(&victim.element.key) {
+                        // And then remove the victim from the deques.
+                        Self::handle_remove(deqs, vic_entry);
+                    } else {
+                        // Could not remove the victim from the cache. Skip this
+                        // victim node as its ValueEntry might have been
+                        // invalidated. Since the invalidated ValueEntry (which
+                        // should be still in the write op queue) has a pointer to
+                        // this node, we move the node to the back of the deque
+                        // instead of unlinking (dropping) it.
+                        let victim = NonNull::from(victim);
+                        unsafe { deqs.probation.move_to_back(victim) };
+                        continue; // Retry
+                    }
+                    // Add the candidate to the deques.
+                    self.handle_admit(
+                        kh.clone(),
+                        &entry,
+                        Arc::clone(&last_accessed),
+                        Arc::clone(&last_modified),
+                        deqs,
+                    );
+                    done = true;
+                    break;
                 } else {
-                    let victim = NonNull::from(victim);
-                    deqs.unlink_node_ao(victim);
-                }
-                // Add the candidate to the deque.
-                let key = Arc::clone(&kh.key);
-                deqs.push_back_ao(
-                    CacheRegion::MainProbation,
-                    KeyHashDate::new(kh, last_accessed),
-                    &entry,
-                );
-                if self.time_to_live.is_some() {
-                    deqs.push_back_wo(KeyDate::new(key, last_modified), &entry);
+                    // The candidate is not admitted. Remove it from the cache (hash map).
+                    self.cache.remove(&Arc::clone(&kh.key));
+                    done = true;
+                    break;
                 }
-            } else {
-                // Remove the candidate from the cache.
-                self.cache.remove(&kh.key);
             }
         }
+
+        if !done {
+            // Too many retries. Remove the candidate from the cache.
+ self.cache.remove(&Arc::clone(&kh.key)); + } } #[inline] fn find_cache_victim<'a>( - deqs: &'a mut Deques, + deqs: &'a Deques, _freq: &FrequencySketch, - ) -> &'a DeqNode> { + ) -> Option<&'a DeqNode>> { // TODO: Check its frequency. If it is not very low, maybe we should // check frequencies of next few others and pick from them. - deqs.probation.peek_front().expect("No victim found") + deqs.probation.peek_front() } #[inline] @@ -676,21 +653,57 @@ where freq.frequency(candidate_hash) > freq.frequency(victim.element.hash) } + fn handle_admit( + &self, + kh: KeyHash, + entry: &Arc>, + raw_last_accessed: Arc, + raw_last_modified: Arc, + deqs: &mut Deques, + ) { + let key = Arc::clone(&kh.key); + deqs.push_back_ao( + CacheRegion::MainProbation, + KeyHashDate::new(kh, raw_last_accessed), + entry, + ); + if self.time_to_live.is_some() { + deqs.push_back_wo(KeyDate::new(key, raw_last_modified), &entry); + } + entry.set_is_admitted(true); + } + fn handle_remove(deqs: &mut Deques, entry: Arc>) { - deqs.unlink_ao(Arc::clone(&entry)); - Deques::unlink_wo(&mut deqs.write_order, entry); + if entry.is_admitted() { + entry.set_is_admitted(false); + deqs.unlink_ao(&entry); + Deques::unlink_wo(&mut deqs.write_order, &entry); + } + entry.unset_q_nodes(); } - fn evict(&self, deqs: &mut Deques, batch_size: usize) { - debug_assert!(self.has_expiry()); + fn handle_remove_with_deques( + ao_deq_name: &str, + ao_deq: &mut Deque>, + wo_deq: &mut Deque>, + entry: Arc>, + ) { + if entry.is_admitted() { + entry.set_is_admitted(false); + Deques::unlink_ao_from_deque(ao_deq_name, ao_deq, &entry); + Deques::unlink_wo(wo_deq, &entry); + } + entry.unset_q_nodes(); + } + fn evict(&self, deqs: &mut Deques, batch_size: usize) { let now = self.current_time_from_expiration_clock(); if self.time_to_live.is_some() { self.remove_expired_wo(deqs, batch_size, now); } - if self.time_to_idle.is_some() { + if self.time_to_idle.is_some() || self.has_valid_after() { let (window, probation, protected, wo) = ( 
&mut deqs.window, &mut deqs.probation, @@ -716,12 +729,18 @@ where batch_size: usize, now: Instant, ) { + let tti = &self.time_to_idle; + let va = self.valid_after(); for _ in 0..batch_size { - let key = deq + // Peek the front node of the deque and check if it is expired. + let (key, _ts) = deq .peek_front() .and_then(|node| { - if self.is_expired_entry_ao(&*node, now) { - Some(Some(Arc::clone(&node.element.key))) + if is_expired_entry_ao(tti, va, &*node, now) { + Some(( + Some(Arc::clone(&node.element.key)), + Some(&node.element.timestamp), + )) } else { None } @@ -732,24 +751,57 @@ where break; } - if let Some(entry) = self.cache.remove(&key.unwrap()) { - Deques::unlink_ao_from_deque(deq_name, deq, Arc::clone(&entry)); - Deques::unlink_wo(write_order_deq, entry); + let key = key.as_ref().unwrap(); + + // Remove the key from the map only when the entry is really + // expired. This check is needed because it is possible that the entry in + // the map has been updated or deleted but its deque node we checked + // above have not been updated yet. + let maybe_entry = self + .cache + .remove_if(key, |_, v| is_expired_entry_ao(tti, va, v, now)); + + if let Some(entry) = maybe_entry { + Self::handle_remove_with_deques(deq_name, deq, write_order_deq, entry); + } else if let Some(entry) = self.cache.get(key) { + let ts = entry.last_accessed(); + if ts.is_none() { + // The key exists and the entry has been updated. + Deques::move_to_back_ao_in_deque(deq_name, deq, &entry); + Deques::move_to_back_wo_in_deque(write_order_deq, &entry); + } else { + // The key exists but something unexpected. Break. + // print!("@[{}|{}] ", ts.unwrap().load(Ordering::Acquire), la.unwrap().as_u64()); + // print!("@"); + break; + } } else { - deq.pop_front(); + // Skip this entry as the key might have been invalidated. 
Since the + // invalidated ValueEntry (which should be still in the write op + // queue) has a pointer to this node, move the node to the back of + // the deque instead of popping (dropping) it. + if let Some(node) = deq.peek_front() { + let node = NonNull::from(node); + unsafe { deq.move_to_back(node) }; + } } } } #[inline] fn remove_expired_wo(&self, deqs: &mut Deques, batch_size: usize, now: Instant) { + let ttl = &self.time_to_live; + let va = self.valid_after(); for _ in 0..batch_size { - let key = deqs + let (key, _ts) = deqs .write_order .peek_front() .and_then(|node| { - if self.is_expired_entry_wo(&*node, now) { - Some(Some(Arc::clone(&node.element.key))) + if is_expired_entry_wo(ttl, va, &*node, now) { + Some(( + Some(Arc::clone(&node.element.key)), + Some(&node.element.timestamp), + )) } else { None } @@ -760,11 +812,34 @@ where break; } - if let Some(entry) = self.cache.remove(&key.unwrap()) { - deqs.unlink_ao(Arc::clone(&entry)); - Deques::unlink_wo(&mut deqs.write_order, entry); + let key = key.as_ref().unwrap(); + + let maybe_entry = self + .cache + .remove_if(key, |_, v| is_expired_entry_wo(ttl, va, v, now)); + + if let Some(entry) = maybe_entry { + Self::handle_remove(deqs, entry); + } else if let Some(entry) = self.cache.get(key) { + let ts = entry.last_modified(); + if ts.is_none() { + deqs.move_to_back_ao(&entry); + deqs.move_to_back_wo(&entry); + } else { + // The key exists but something unexpected. Break. + // print!("#[{}|{}] ", ts.unwrap().load(Ordering::Acquire), ts.unwrap().as_u64()); + // print!("#"); + break; + } } else { - deqs.write_order.pop_front(); + // Skip this entry as the key might have been invalidated. Since the + // invalidated ValueEntry (which should be still in the write op + // queue) has a pointer to this node, move the node to the back of + // the deque instead of popping (dropping) it. 
+ if let Some(node) = deqs.write_order.peek_front() { + let node = NonNull::from(node); + unsafe { deqs.write_order.move_to_back(node) }; + } } } } @@ -794,3 +869,42 @@ where } } } + +// +// private free-standing functions +// +#[inline] +fn is_expired_entry_ao( + time_to_idle: &Option, + valid_after: Instant, + entry: &impl AccessTime, + now: Instant, +) -> bool { + if let Some(ts) = entry.last_accessed() { + if ts < valid_after { + return true; + } + if let Some(tti) = time_to_idle { + return ts + *tti <= now; + } + } + false +} + +#[inline] +fn is_expired_entry_wo( + time_to_live: &Option, + valid_after: Instant, + entry: &impl AccessTime, + now: Instant, +) -> bool { + if let Some(ts) = entry.last_modified() { + if ts < valid_after { + return true; + } + if let Some(ttl) = time_to_live { + return ts + *ttl <= now; + } + } + false +} diff --git a/src/sync/cache.rs b/src/sync/cache.rs index ea167078..f8f22b3e 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -30,7 +30,7 @@ use std::{ /// Cache entries are manually added using `insert` method, and are stored in the /// cache until either evicted or manually invalidated. /// -/// Here's an example that reads and updates a cache by using multiple threads: +/// Here's an example of reading and updating a cache by using multiple threads: /// /// ```rust /// use moka::sync::Cache; @@ -282,6 +282,20 @@ where } } + /// Discards all cached values. + /// + /// This method returns immediately and a background thread will evict all the + /// cached values inserted before the time when this method was called. It is + /// guaranteed that the `get` method must not return these invalidated values + /// even if they have not been evicted. + /// + /// Like the `invalidate` method, this method does not clear the historic + /// popularity estimator of keys so that it retains the client activities of + /// trying to retrieve an item. 
+ pub fn invalidate_all(&self) { + self.base.invalidate_all(); + } + /// Returns the `max_capacity` of this cache. pub fn max_capacity(&self) -> usize { self.base.max_capacity() @@ -356,7 +370,7 @@ where K: Hash + Eq, S: BuildHasher + Clone, { - fn reconfigure_for_testing(&mut self) { + pub(crate) fn reconfigure_for_testing(&mut self) { self.base.reconfigure_for_testing(); } @@ -444,6 +458,34 @@ mod tests { assert!(cache.get(&20).is_some()); } + #[test] + fn invalidate_all() { + let mut cache = Cache::new(100); + cache.reconfigure_for_testing(); + + // Make the cache exterior immutable. + let cache = cache; + + cache.insert("a", "alice"); + cache.insert("b", "bob"); + cache.insert("c", "cindy"); + assert_eq!(cache.get(&"a"), Some("alice")); + assert_eq!(cache.get(&"b"), Some("bob")); + assert_eq!(cache.get(&"c"), Some("cindy")); + cache.sync(); + + cache.invalidate_all(); + cache.sync(); + + cache.insert("d", "david"); + cache.sync(); + + assert!(cache.get(&"a").is_none()); + assert!(cache.get(&"b").is_none()); + assert!(cache.get(&"c").is_none()); + assert_eq!(cache.get(&"d"), Some("david")); + } + #[test] fn time_to_live() { let mut cache = CacheBuilder::new(100) diff --git a/src/sync/deques.rs b/src/sync/deques.rs index 50106f7a..50cd5acd 100644 --- a/src/sync/deques.rs +++ b/src/sync/deques.rs @@ -31,11 +31,11 @@ impl Deques { pub(crate) fn push_back_ao( &mut self, region: CacheRegion, - kh: KeyHashDate, + khd: KeyHashDate, entry: &Arc>, ) { use CacheRegion::*; - let node = Box::new(DeqNode::new(region, kh)); + let node = Box::new(DeqNode::new(region, khd)); let node = match node.as_ref().region { Window => self.window.push_back(node), MainProbation => self.probation.push_back(node), @@ -45,13 +45,13 @@ impl Deques { entry.set_access_order_q_node(Some(node)); } - pub(crate) fn push_back_wo(&mut self, kh: KeyDate, entry: &Arc>) { - let node = Box::new(DeqNode::new(CacheRegion::WriteOrder, kh)); + pub(crate) fn push_back_wo(&mut self, kd: KeyDate, entry: &Arc>) { 
+ let node = Box::new(DeqNode::new(CacheRegion::WriteOrder, kd)); let node = self.write_order.push_back(node); entry.set_write_order_q_node(Some(node)); } - pub(crate) fn move_to_back_ao(&mut self, entry: Arc>) { + pub(crate) fn move_to_back_ao(&mut self, entry: &Arc>) { use CacheRegion::*; if let Some(node) = entry.access_order_q_node() { let p = unsafe { node.as_ref() }; @@ -68,7 +68,27 @@ impl Deques { } } - pub(crate) fn move_to_back_wo(&mut self, entry: Arc>) { + pub(crate) fn move_to_back_ao_in_deque( + deq_name: &str, + deq: &mut Deque>, + entry: &Arc>, + ) { + if let Some(node) = entry.access_order_q_node() { + let p = unsafe { node.as_ref() }; + if &p.region == deq.region() { + if deq.contains(p) { + unsafe { deq.move_to_back(node) }; + } + } else { + panic!( + "move_to_back_ao_in_deque - node is not a member of {} deque. {:?}", + deq_name, p, + ) + } + } + } + + pub(crate) fn move_to_back_wo(&mut self, entry: &Arc>) { use CacheRegion::*; if let Some(node) = entry.write_order_q_node() { let p = unsafe { node.as_ref() }; @@ -79,7 +99,26 @@ impl Deques { } } - pub(crate) fn unlink_ao(&mut self, entry: Arc>) { + pub(crate) fn move_to_back_wo_in_deque( + deq: &mut Deque>, + entry: &Arc>, + ) { + if let Some(node) = entry.write_order_q_node() { + let p = unsafe { node.as_ref() }; + if &p.region == deq.region() { + if deq.contains(p) { + unsafe { deq.move_to_back(node) }; + } + } else { + panic!( + "move_to_back_wo_in_deque - node is not a member of write_order deque. 
{:?}", + p, + ) + } + } + } + + pub(crate) fn unlink_ao(&mut self, entry: &Arc>) { if let Some(node) = entry.take_access_order_q_node() { self.unlink_node_ao(node); } @@ -88,14 +127,14 @@ impl Deques { pub(crate) fn unlink_ao_from_deque( deq_name: &str, deq: &mut Deque>, - entry: Arc>, + entry: &Arc>, ) { if let Some(node) = entry.take_access_order_q_node() { unsafe { Self::unlink_node_ao_from_deque(deq_name, deq, node) }; } } - pub(crate) fn unlink_wo(deq: &mut Deque>, entry: Arc>) { + pub(crate) fn unlink_wo(deq: &mut Deque>, entry: &Arc>) { if let Some(node) = entry.take_write_order_q_node() { Self::unlink_node_wo(deq, node); } @@ -122,24 +161,26 @@ impl Deques { deq: &mut Deque>, node: NonNull>>, ) { - if deq.contains(node.as_ref()) { - deq.unlink(node); + let p = node.as_ref(); + if &p.region == deq.region() { + if deq.contains(p) { + deq.unlink(node); + } } else { panic!( "unlink_node - node is not a member of {} deque. {:?}", - deq_name, - node.as_ref() + deq_name, p ) } } pub(crate) fn unlink_node_wo(deq: &mut Deque>, node: NonNull>>) { - use CacheRegion::*; unsafe { let p = node.as_ref(); - debug_assert_eq!(&p.region, &WriteOrder); - if deq.contains(p) { - deq.unlink(node); + if &p.region == deq.region() { + if deq.contains(p) { + deq.unlink(node); + } } else { panic!( "unlink_node - node is not a member of write_order deque. {:?}", diff --git a/src/sync/segment.rs b/src/sync/segment.rs index f763fbd1..82238307 100644 --- a/src/sync/segment.rs +++ b/src/sync/segment.rs @@ -141,6 +141,12 @@ where self.inner.select(hash).invalidate(key); } + pub fn invalidate_all(&self) { + for segment in self.inner.segments.iter() { + segment.invalidate_all(); + } + } + /// Returns the `max_capacity` of this cache. pub fn max_capacity(&self) -> usize { self.inner.desired_capacity @@ -183,6 +189,23 @@ where } } +// For unit tests. 
+#[cfg(test)] +impl SegmentedCache +where + K: Hash + Eq, + S: BuildHasher + Clone, +{ + fn reconfigure_for_testing(&mut self) { + let inner = Arc::get_mut(&mut self.inner) + .expect("There are other strong reference to self.inner Arc"); + + for segment in inner.segments.iter_mut() { + segment.reconfigure_for_testing(); + } + } +} + struct Inner { desired_capacity: usize, segments: Box<[Cache]>, @@ -269,8 +292,8 @@ mod tests { #[test] fn basic_single_thread() { - let cache = SegmentedCache::new(3, 1); - // cache.reconfigure_for_testing(); + let mut cache = SegmentedCache::new(3, 1); + cache.reconfigure_for_testing(); // Make the cache exterior immutable. let cache = cache; @@ -317,8 +340,8 @@ mod tests { fn basic_multi_threads() { let num_threads = 4; - let cache = SegmentedCache::new(100, num_threads); - // cache.reconfigure_for_testing(); + let mut cache = SegmentedCache::new(100, num_threads); + cache.reconfigure_for_testing(); // Make the cache exterior immutable. let cache = cache; @@ -343,4 +366,32 @@ mod tests { assert!(cache.get(&10).is_none()); assert!(cache.get(&20).is_some()); } + + #[test] + fn invalidate_all() { + let mut cache = SegmentedCache::new(100, 4); + cache.reconfigure_for_testing(); + + // Make the cache exterior immutable. + let cache = cache; + + cache.insert("a", "alice"); + cache.insert("b", "bob"); + cache.insert("c", "cindy"); + assert_eq!(cache.get(&"a"), Some("alice")); + assert_eq!(cache.get(&"b"), Some("bob")); + assert_eq!(cache.get(&"c"), Some("cindy")); + cache.sync(); + + cache.invalidate_all(); + cache.sync(); + + cache.insert("d", "david"); + cache.sync(); + + assert!(cache.get(&"a").is_none()); + assert!(cache.get(&"b").is_none()); + assert!(cache.get(&"c").is_none()); + assert_eq!(cache.get(&"d"), Some("david")); + } } diff --git a/src/unsync.rs b/src/unsync.rs index 34701d1d..55d2e803 100644 --- a/src/unsync.rs +++ b/src/unsync.rs @@ -1,3 +1,5 @@ +//! Provides a *not* thread-safe, blocking cache implementation. 
+ pub(crate) mod builder; pub(crate) mod cache; mod deques; diff --git a/src/unsync/builder.rs b/src/unsync/builder.rs index 89edd0f2..ef38ef0d 100644 --- a/src/unsync/builder.rs +++ b/src/unsync/builder.rs @@ -47,10 +47,9 @@ pub struct CacheBuilder { impl CacheBuilder> where K: Eq + Hash, - V: Clone, { - /// Construct a new `CacheBuilder` that will be used to build a `Cache` or - /// `SegmentedCache` holding up to `max_capacity` entries. + /// Construct a new `CacheBuilder` that will be used to build a `Cache` holding + /// up to `max_capacity` entries. pub fn new(max_capacity: usize) -> Self { Self { max_capacity, @@ -62,9 +61,6 @@ where } /// Builds a `Cache`. - /// - /// If you want to build a `SegmentedCache`, call `segments` method before - /// calling this method. pub fn build(self) -> Cache { let build_hasher = RandomState::default(); Cache::with_everything( @@ -77,9 +73,6 @@ where } /// Builds a `Cache`, with the given `hasher`. - /// - /// If you want to build a `SegmentedCache`, call `segments` method before - /// calling this method. pub fn build_with_hasher(self, hasher: S) -> Cache where S: BuildHasher + Clone, diff --git a/src/unsync/cache.rs b/src/unsync/cache.rs index b155f9b8..0bd0e6d8 100644 --- a/src/unsync/cache.rs +++ b/src/unsync/cache.rs @@ -17,6 +17,98 @@ use std::{ type CacheStore = std::collections::HashMap, ValueEntry, S>; +/// An in-memory cache that is _not_ thread-safe. +/// +/// `Cache` utilizes a hash table `std::collections::HashMap` from the standard +/// library for the central key-value storage. `Cache` performs a best-effort +/// bounding of the map using an entry replacement algorithm to determine which +/// entries to evict when the capacity is exceeded. 
+///
+/// # Characteristic difference between `unsync` and `sync`/`future` caches
+///
+/// If you use a cache from a single thread application, `unsync::Cache` may
+/// outperform other caches for updates and retrievals because other caches have some
+/// overhead on syncing internal data structures between threads.
+///
+/// However, other caches may outperform `unsync::Cache` on the same operations when
+/// expiration policies are configured on a multi-core system. `unsync::Cache` evicts
+/// expired entries as a part of update and retrieval operations while others evict
+/// them using a dedicated background thread.
+///
+/// # Examples
+///
+/// Cache entries are manually added using an insert method, and are stored in the
+/// cache until either evicted or manually invalidated.
+///
+/// Here's an example of reading and updating a cache by using the main thread:
+///
+///```rust
+/// use moka::unsync::Cache;
+///
+/// const NUM_KEYS: usize = 64;
+///
+/// fn value(n: usize) -> String {
+///     format!("value {}", n)
+/// }
+///
+/// // Create a cache that can store up to 10,000 entries.
+/// let mut cache = Cache::new(10_000);
+///
+/// // Insert 64 entries.
+/// for key in 0..NUM_KEYS {
+///     cache.insert(key, value(key));
+/// }
+///
+/// // Invalidate every 4th element of the inserted entries.
+/// for key in (0..NUM_KEYS).step_by(4) {
+///     cache.invalidate(&key);
+/// }
+///
+/// // Verify the result.
+/// for key in 0..NUM_KEYS {
+///     if key % 4 == 0 {
+///         assert_eq!(cache.get(&key), None);
+///     } else {
+///         assert_eq!(cache.get(&key), Some(&value(key)));
+///     }
+/// }
+/// ```
+///
+/// # Expiration Policies
+///
+/// `Cache` supports the following expiration policies:
+///
+/// - **Time to live**: A cached entry will be expired after the specified duration
+///   past from `insert`.
+/// - **Time to idle**: A cached entry will be expired after the specified duration
+///   past from `get` or `insert`.
+/// +/// See the [`CacheBuilder`][builder-struct]'s doc for how to configure a cache +/// with them. +/// +/// [builder-struct]: ./struct.CacheBuilder.html +/// +/// # Hashing Algorithm +/// +/// By default, `Cache` uses a hashing algorithm selected to provide resistance +/// against HashDoS attacks. +/// +/// The default hashing algorithm is the one used by `std::collections::HashMap`, +/// which is currently SipHash 1-3. +/// +/// While its performance is very competitive for medium sized keys, other hashing +/// algorithms will outperform it for small keys such as integers as well as large +/// keys such as long strings. However those algorithms will typically not protect +/// against attacks such as HashDoS. +/// +/// The hashing algorithm can be replaced on a per-`Cache` basis using the +/// [`build_with_hasher`][build-with-hasher-method] method of the +/// `CacheBuilder`. Many alternative algorithms are available on crates.io, such +/// as the [aHash][ahash-crate] crate. +/// +/// [build-with-hasher-method]: ./struct.CacheBuilder.html#method.build_with_hasher +/// [ahash-crate]: https://crates.io/crates/ahash +/// pub struct Cache { max_capacity: usize, cache: CacheStore, @@ -31,8 +123,13 @@ pub struct Cache { impl Cache where K: Hash + Eq, - V: Clone, { + /// Constructs a new `Cache` that will store up to the `max_capacity` entries. + /// + /// To adjust various configuration knobs such as `initial_capacity` or + /// `time_to_live`, use the [`CacheBuilder`][builder-struct]. + /// + /// [builder-struct]: ./struct.CacheBuilder.html pub fn new(max_capacity: usize) -> Self { let build_hasher = RandomState::default(); Self::with_everything(max_capacity, None, build_hasher, None, None) @@ -45,7 +142,6 @@ where impl Cache where K: Hash + Eq, - V: Clone, S: BuildHasher + Clone, { pub(crate) fn with_everything( @@ -73,89 +169,68 @@ where } } + /// Returns an immutable reference of the value corresponding to the key. 
+ /// + /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq` + /// on the borrowed form _must_ match those for the key type. + /// + /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html pub fn get(&mut self, key: &Q) -> Option<&V> where Rc: Borrow, Q: Hash + Eq + ?Sized, { - let hash = self.hash(key); - let has_expiry = self.has_expiry(); - let timestamp = if has_expiry { - Some(self.current_time_from_expiration_clock()) - } else { - None - }; + let timestamp = self.evict_if_needed(); + self.frequency_sketch.increment(self.hash(key)); - if let Some(ts) = timestamp { - self.evict(ts); - } - - let (entry, sketch, deqs) = ( - self.cache.get_mut(key), - &mut self.frequency_sketch, - &mut self.deques, - ); - - match (entry, has_expiry) { + match (self.cache.get_mut(key), timestamp, &mut self.deques) { // Value not found. - (None, _) => { - Self::record_read(sketch, deqs, hash, None, None); - None - } + (None, _, _) => None, // Value found, no expiry. - (Some(entry), false) => { - Self::record_read(sketch, deqs, hash, Some(entry), None); + (Some(entry), None, deqs) => { + Self::record_hit(deqs, entry, None); Some(&entry.value) } - // Value found, need to check if expired. - (Some(entry), true) => { - if Self::is_expired_entry_wo(&self.time_to_live, entry, timestamp.unwrap()) - || Self::is_expired_entry_ao(&self.time_to_idle, entry, timestamp.unwrap()) + // Value found, check if expired. + (Some(entry), Some(ts), deqs) => { + if Self::is_expired_entry_wo(&self.time_to_live, entry, ts) + || Self::is_expired_entry_ao(&self.time_to_idle, entry, ts) { - // Expired entry. Record this access as a cache miss rather than a hit. - Self::record_read(sketch, deqs, hash, None, None); None } else { - // Valid entry. - Self::record_read(sketch, deqs, hash, Some(entry), timestamp); + Self::record_hit(deqs, entry, timestamp); Some(&entry.value) } } } } + /// Inserts a key-value pair into the cache. 
+ /// + /// If the cache has this key present, the value is updated. pub fn insert(&mut self, key: K, value: V) { - let timestamp = if self.has_expiry() { - Some(self.current_time_from_expiration_clock()) - } else { - None - }; - - if let Some(ts) = timestamp { - self.evict(ts); - } - + let timestamp = self.evict_if_needed(); let key = Rc::new(key); let entry = ValueEntry::new(value); if let Some(old_entry) = self.cache.insert(Rc::clone(&key), entry) { self.handle_update(key, timestamp, old_entry); } else { - // Insert let hash = self.hash(&key); self.handle_insert(key, hash, timestamp); } } + /// Discards any cached value for the key. + /// + /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq` + /// on the borrowed form _must_ match those for the key type. pub fn invalidate(&mut self, key: &Q) where Rc: Borrow, Q: Hash + Eq + ?Sized, { - if self.has_expiry() { - let ts = self.current_time_from_expiration_clock(); - self.evict(ts); - } + self.evict_if_needed(); if let Some(mut entry) = self.cache.remove(key) { self.deques.unlink_ao(&mut entry); @@ -163,10 +238,22 @@ where } } + /// Discards all cached values. + /// + /// Like the `invalidate` method, this method does not clear the historic + /// popularity estimator of keys so that it retains the client activities of + /// trying to retrieve an item. + pub fn invalidate_all(&mut self) { + self.cache.clear(); + self.deques.clear(); + } + + /// Returns the `max_capacity` of this cache. pub fn max_capacity(&self) -> usize { self.max_capacity } + /// Returns the `time_to_live` of this cache. 
pub fn time_to_live(&self) -> Option { self.time_to_live } @@ -183,7 +270,6 @@ where impl Cache where K: Hash + Eq, - V: Clone, S: BuildHasher + Clone, { #[inline] @@ -202,6 +288,17 @@ where self.time_to_live.is_some() || self.time_to_idle.is_some() } + #[inline] + fn evict_if_needed(&mut self) -> Option { + if self.has_expiry() { + let ts = self.current_time_from_expiration_clock(); + self.evict(ts); + Some(ts) + } else { + None + } + } + #[inline] fn current_time_from_expiration_clock(&self) -> Instant { if let Some(clock) = &self.expiration_clock { @@ -218,9 +315,7 @@ where now: Instant, ) -> bool { if let (Some(ts), Some(tti)) = (entry.last_accessed(), time_to_idle) { - if ts + *tti <= now { - return true; - } + return ts + *tti <= now; } false } @@ -232,27 +327,16 @@ where now: Instant, ) -> bool { if let (Some(ts), Some(ttl)) = (entry.last_modified(), time_to_live) { - if ts + *ttl <= now { - return true; - } + return ts + *ttl <= now; } false } - fn record_read( - frequency_sketch: &mut FrequencySketch, - deques: &mut Deques, - hash: u64, - entry: Option<&mut ValueEntry>, - timestamp: Option, - ) { - frequency_sketch.increment(hash); - if let Some(entry) = entry { - if let Some(ts) = timestamp { - entry.set_last_accessed(ts); - } - deques.move_to_back_ao(entry) + fn record_hit(deques: &mut Deques, entry: &mut ValueEntry, ts: Option) { + if let Some(ts) = ts { + entry.set_last_accessed(ts); } + deques.move_to_back_ao(entry) } #[inline] @@ -508,6 +592,27 @@ mod tests { assert_eq!(cache.get(&"b"), None); } + #[test] + fn invalidate_all() { + let mut cache = Cache::new(100); + + cache.insert("a", "alice"); + cache.insert("b", "bob"); + cache.insert("c", "cindy"); + assert_eq!(cache.get(&"a"), Some(&"alice")); + assert_eq!(cache.get(&"b"), Some(&"bob")); + assert_eq!(cache.get(&"c"), Some(&"cindy")); + + cache.invalidate_all(); + + cache.insert("d", "david"); + + assert!(cache.get(&"a").is_none()); + assert!(cache.get(&"b").is_none()); + 
assert!(cache.get(&"c").is_none()); + assert_eq!(cache.get(&"d"), Some(&"david")); + } + #[test] fn time_to_live() { let mut cache = CacheBuilder::new(100) diff --git a/src/unsync/deques.rs b/src/unsync/deques.rs index 26cdfd76..56d39aee 100644 --- a/src/unsync/deques.rs +++ b/src/unsync/deques.rs @@ -22,6 +22,13 @@ impl Default for Deques { } impl Deques { + pub(crate) fn clear(&mut self) { + self.window = Deque::new(CacheRegion::Window); + self.probation = Deque::new(CacheRegion::MainProbation); + self.protected = Deque::new(CacheRegion::MainProtected); + self.write_order = Deque::new(CacheRegion::WriteOrder); + } + pub(crate) fn push_back_ao( &mut self, region: CacheRegion,