Show example for get_or_insert_with #29
Hi. Thanks for your feedback. Yes, the documentation needs some improvements. I am thinking of adding the following examples to the documentation:
```rust
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.5", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

use moka::future::Cache;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
    let cache = Cache::new(1_000);

    // Get the value for key1. The async block should be evaluated because
    // key1 does not exist yet.
    let value1 = cache
        .get_or_insert_with("key1", async { Arc::new(vec![0u8; TEN_MIB]) })
        .await;
    assert_eq!(value1.len(), TEN_MIB);

    // Get the value for key1 again. The async block should NOT be evaluated
    // because key1 already exists. Note that the length of the vec is
    // different from the first call (10MiB vs. zero).
    let value2 = cache
        .get_or_insert_with("key1", async { Arc::new(vec![0u8; 0]) })
        .await;

    // The length of value2 should be equal to the one inserted by the
    // first call.
    assert_eq!(value2.len(), TEN_MIB);
}
```
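For comparison, the "evaluate only when the key is absent" behavior in the first example mirrors `HashMap::entry(..).or_insert_with(..)` from the Rust standard library. Below is a rough, single-threaded analogue using only std. The free function `get_or_insert_with()` here is a hypothetical helper, not moka's API: it takes a closure instead of an async block and has none of moka's concurrency or eviction, so it only illustrates the semantics.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical single-threaded analogue of moka's `get_or_insert_with()`:
/// `init` is called only if `key` is absent; its result is stored, and a
/// clone of the stored value is returned.
fn get_or_insert_with<K, V, F>(map: &mut HashMap<K, V>, key: K, init: F) -> V
where
    K: Eq + Hash,
    V: Clone,
    F: FnOnce() -> V,
{
    // `or_insert_with` only calls `init` when the entry is vacant.
    map.entry(key).or_insert_with(init).clone()
}
```

Calling it twice with the same key runs only the first closure, so the second call returns the 10-element vector inserted by the first call, just like `value2` in the moka example.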
```rust
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.5", features = ["future"] }
// reqwest = "0.11"
// tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

use moka::future::Cache;

/// This async function tries to get HTML from the given URI.
async fn get_html(
    uri: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync + 'static>> {
    Ok(reqwest::get(uri).await?.text().await?)
}

#[tokio::main]
async fn main() {
    let cache = Cache::new(1_000);

    // Get the value for key1. The async fn should be evaluated because the
    // key does not exist yet. However, the async fn will return an error due
    // to the broken URL.
    let value1 = cache
        .get_or_try_insert_with("key1", get_html("tps://broken-url"))
        .await;
    // An error was returned, and key1 still does not exist.
    assert!(value1.is_err());
    assert!(cache.get(&"key1").is_none());

    // Get the value for key1 again. The async fn should be evaluated because
    // key1 still does not exist.
    let value2 = cache
        .get_or_try_insert_with("key1", get_html("https://www.rust-lang.org"))
        .await;
    // An Ok was returned, and key1 now exists.
    assert!(value2.is_ok());
    assert!(cache.get(&"key1").is_some());
}
```
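The second example depends on an `Err` never being cached: the key stays absent, so the next call runs its own initializer. A single-threaded, std-only sketch of that rule follows. `get_or_try_insert_with()` here is a hypothetical free function over a `HashMap`, not moka's method, and it takes a closure rather than a future.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical single-threaded analogue of moka's `get_or_try_insert_with()`:
/// `init` is called only if `key` is absent. An `Ok` value is stored and
/// returned; an `Err` is returned to the caller and nothing is stored, so a
/// later call will run its own `init` again.
fn get_or_try_insert_with<K, V, E, F>(map: &mut HashMap<K, V>, key: K, init: F) -> Result<V, E>
where
    K: Eq + Hash,
    V: Clone,
    F: FnOnce() -> Result<V, E>,
{
    if let Some(value) = map.get(&key) {
        return Ok(value.clone());
    }
    // `?` hands the error back to the caller without touching the map.
    let value = init()?;
    map.insert(key, value.clone());
    Ok(value)
}
```

A failing first call leaves `"key1"` absent, and a succeeding second call inserts it, matching the two assertions pairs in the moka example.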
I appreciate your quick and very detailed response; I was mistaken about what I thought this method did. This ticket probably isn't the place, but I would like to mention it: I think a useful feature would be the ability to lock a key. My use case is a function that can run multiple times when network conditions are bad, and I wanted to block at get() until the other calls that intend to insert() finish. I imagine it could look something like this:

```rust
my_cache.acquire_lock(&"my_key");
let cached_item: String = match my_cache.get(&"my_key") {
    None => {
        // ... look up some data from a database and insert it into the moka
        // cache; once .get() returns, unlock the mutex ...
        value_from_database
    }
    Some(v) => v,
};
```

I just think it would be a nice feature to have, since this is a frequent pattern for me with this library. I'm not sure if it is something you are interested in adding. Thanks so much for your work on this project!
Ah, OK. You were not mistaken. This method does lock a key, and you can use it for your code above. It uses an `RwLock` internally (moka/src/future/value_initializer.rs, line 11 at commit 8607db7); see also lines 46 to 65 of the same file. (If the async task has acquired the writer lock, …)

The caches have unit test cases that verify this behavior (lines 943 to 947 at 8607db7), but they are not suitable as usage examples because they are too complicated.

Let me try to come up with another usage example to demonstrate the locking. I will do it after work. (It is early morning now in my time zone, UTC+08:00, so I will do it tonight.)
So your code above, the version using `my_cache.acquire_lock()`, will translate into the following when you use get_or_insert_with():

```rust
let cached_item: String = my_cache
    .get_or_insert_with(
        "my_key",
        // This async block should be evaluated only once.
        // get_or_insert_with() uses an internal lock (`RwLock`) on "my_key",
        // so you do not need something like `my_cache.acquire_lock(&"my_key")`.
        async {
            // Look up some data from a database.
            let value_from_database: String = todo!("look up some data from a database");
            // Then return the data. It will be inserted into my_cache.
            value_from_database
            // Once it is inserted, the lock is released, and the other async
            // tasks that called get_or_insert_with() and were blocked will be
            // unblocked and get the same data.
        },
    )
    .await;
```
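To make the "one initializer runs, everyone else waits and gets the same value" behavior concrete without moka or tokio, here is a minimal std-only sketch using OS threads. It keeps one `std::sync::OnceLock` (Rust 1.70+) per key behind a `Mutex<HashMap>`. The `OnceMap` type is hypothetical and is not how moka implements this (moka uses the `RwLock`-based value initializer linked above); it also never removes slots, so treat it only as an illustration of the idea.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, Mutex, OnceLock};

/// Hypothetical "initialize once per key" map built from std types only.
/// Not moka's implementation; entries are never evicted.
pub struct OnceMap<K, V> {
    // One shared slot per key. The Mutex guards only the HashMap itself.
    slots: Mutex<HashMap<K, Arc<OnceLock<V>>>>,
}

impl<K: Eq + Hash, V: Clone> OnceMap<K, V> {
    pub fn new() -> Self {
        OnceMap { slots: Mutex::new(HashMap::new()) }
    }

    /// Returns the value for `key`. `init` runs at most once per key, even if
    /// many threads call this for the same key at the same time; the other
    /// callers block until it finishes and then receive the same value.
    pub fn get_or_insert_with<F: FnOnce() -> V>(&self, key: K, init: F) -> V {
        // Hold the map lock only long enough to find or create this key's
        // slot, so a slow `init` for one key does not block other keys.
        let slot = {
            let mut slots = self.slots.lock().unwrap();
            Arc::clone(slots.entry(key).or_insert_with(|| Arc::new(OnceLock::new())))
        };
        // get_or_init executes only one initializer; concurrent callers wait
        // for it to complete and then read the stored value.
        slot.get_or_init(init).clone()
    }
}
```

Sharing one `OnceMap` across eight threads (via `Arc`) that all request `"key1"` should run the initializer exactly once, which is the same guarantee the async example later in this thread checks with its call counter.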
OK. How about this? Does this answer your question?

```rust
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.5", features = ["future"] }
// futures = "0.3"
// reqwest = "0.11"
// tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

use moka::future::Cache;
use std::sync::atomic::{AtomicU8, Ordering};

// This counter is incremented by 1 each time get_html() is called.
// (It exists only for demonstration purposes.)
static CALL_COUNTER: AtomicU8 = AtomicU8::new(0);

/// This async function tries to get HTML from the given URI.
async fn get_html(
    task_id: u8,
    uri: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync + 'static>> {
    println!("get_html() called by task {}.", task_id);
    // Increment the call counter by 1.
    CALL_COUNTER.fetch_add(1, Ordering::AcqRel);
    // Get the HTML from the Internet.
    Ok(reqwest::get(uri).await?.text().await?)
}

#[tokio::main]
async fn main() {
    let cache = Cache::new(1_000);

    // Spawn eight async tasks. They will try to do the same thing at the same
    // time, but get_html() should be called only once.
    let tasks: Vec<_> = (0..8_u8)
        .map(|task_id| {
            // To share the same cache across the async tasks, clone it.
            // This is a cheap operation.
            let my_cache = cache.clone();

            // Spawn an async task.
            tokio::spawn(async move {
                println!("Task {} started.", task_id);

                // Try to get the value for key1. If key1 does not exist,
                // get_html() will be called, and the returned value will be
                // inserted into the cache unless it is an Err(..).
                let value = my_cache
                    .get_or_try_insert_with("key1", get_html(task_id, "https://www.rust-lang.org"))
                    .await;

                // Ensure the value exists now.
                assert!(value.is_ok());
                assert!(my_cache.get(&"key1").is_some());

                println!(
                    "Task {} got the value. (len: {})",
                    task_id,
                    value.unwrap().len()
                );
            })
        })
        .collect();

    // Wait for all tasks to complete. Unwrapping each JoinHandle result makes
    // a panic inside a task (e.g. a failed assertion) also fail main().
    for result in futures::future::join_all(tasks).await {
        result.unwrap();
    }

    // Verify that get_html() was called exactly once.
    assert_eq!(CALL_COUNTER.load(Ordering::Acquire), 1);
}
```

Here is the output from the above program:
```text
Task 0 started.
Task 3 started.
Task 4 started.
Task 5 started.
Task 1 started.
Task 2 started.
get_html() called by task 5.
Task 6 started.
Task 7 started.
Task 5 got the value. (len: 19419)
Task 1 got the value. (len: 19419)
Task 7 got the value. (len: 19419)
Task 4 got the value. (len: 19419)
Task 2 got the value. (len: 19419)
Task 6 got the value. (len: 19419)
Task 3 got the value. (len: 19419)
Task 0 got the value. (len: 19419)
```

This example uses …
Ah, I see now. That example is beautiful. Thank you!
Sorry, but I'm still having trouble. I believe it's because I can't return the way I want to with bail!() inside an async block. Do you know of a solution?

```rust
let mut member_was_cached = true;
let cached_member: GuildMember = MEMBERS_MEM_CACHE
    .get()
    .unwrap()
    .get_or_insert_with((member.guild_id.0, member.user.id.0), async {
        match query_db_for_member_or_return_current(&member).await {
            Ok(v) => {
                if !v.1 { member_was_cached = false; };
                v.0
            }
            Err(e) => bail!(e)
        }
    })
    .await;
```

I return a … I also tried the get_or_try_insert_with variant, but I seem to be having the same problem.
Yeah, I know these type puzzles are difficult for anybody to solve until they get used to async and multithreaded programming in Rust. I see a couple of issues in your code; I added comments inline.

```rust
let mut member_was_cached = true;
let cached_member: GuildMember = MEMBERS_MEM_CACHE
    .get()
    .unwrap()
    .get_or_insert_with((member.guild_id.0, member.user.id.0), async {
        match query_db_for_member_or_return_current(&member).await {
            Ok(v) => {
                // We cannot safely write to this variable here because
                // this async block can be executed by a different thread.
                // We need to use an AtomicBool instead.
                if !v.1 { member_was_cached = false; };
                v.0
            }
            // bail!() only does an early return from this async block, so
            // it will return the Err value to its caller, get_or_insert_with().
            // We need to use get_or_try_insert_with() instead.
            Err(e) => bail!(e)
        }
    })
    .await;
```

To solve them, we need to make the following changes:
Here is example code with the above changes applied:

```rust
// Cargo.toml
//
// [dependencies]
// anyhow = "1"
// moka = { version = "0.5", features = ["future"] }
// once_cell = "1"
// tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

use moka::future::Cache;
use once_cell::sync::Lazy;

#[derive(Clone)]
struct GuildMember {
    guild_id: (u32, String),
    user_id: (u32, String),
}

async fn query_db_for_member_or_return_current(
    _member: &GuildMember,
) -> Result<(GuildMember, bool), anyhow::Error> {
    todo!()
}

async fn get_user(member: GuildMember) -> anyhow::Result<GuildMember> {
    static MEMBERS_MEM_CACHE: Lazy<Cache<(u32, u32), GuildMember>> =
        Lazy::new(|| Cache::new(1024));

    // The async block passed to get_or_try_insert_with() can be executed by a
    // different thread, so we need to protect the bool value from
    // multithreaded access by using an AtomicBool.
    let member_was_cached = Arc::new(AtomicBool::new(true));
    // Create an Arc clone. We will move this into the async block.
    let mwc = Arc::clone(&member_was_cached);

    let cached_member = MEMBERS_MEM_CACHE
        // Our async block returns a Result, so we need to use
        // get_or_try_insert_with() instead of get_or_insert_with().
        .get_or_try_insert_with((member.guild_id.0, member.user_id.0), async move {
            match query_db_for_member_or_return_current(&member).await {
                Ok(v) => {
                    if !v.1 {
                        mwc.store(false, Ordering::Release);
                    }
                    Ok(v.0)
                }
                Err(e) => {
                    // Convert e (anyhow::Error) into the error type that
                    // get_or_try_insert_with() expects.
                    Err(e.into())
                }
            }
        })
        .await
        // We need to wrap the error with anyhow::Error. The upcoming Moka v0.6
        // will no longer require this map_err() call.
        .map_err(|e| anyhow::anyhow!(e))?;

    // Read the bool value and do something with it.
    if !member_was_cached.load(Ordering::Acquire) {
        todo!();
    }

    Ok(cached_member)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let member = GuildMember {
        guild_id: (0, "guild0".into()),
        user_id: (1, "user1".into()),
    };
    get_user(member).await?;
    Ok(())
}
```

Actually, I realized that Moka v0.5.0 and v0.5.1 have a bug in their type definitions, so they will accept code without change 1 (…
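The core of the fix is that `bail!()` (which expands to an early `return Err(..)`) and `?` inside an async block exit only that block, exactly as they do inside a closure; the error must then be propagated a second time, outside the block. The std-only, synchronous sketch below shows the same two-step pattern with a closure. `DbError`, `query_db()`, `run_init()`, and the `BoxError` alias are all hypothetical stand-ins for the database error, the database query, and the cache call.

```rust
use std::error::Error;
use std::fmt;

/// Boxed error type, the same shape as the one in the get_html() examples.
type BoxError = Box<dyn Error + Send + Sync + 'static>;

/// Hypothetical database error standing in for the real one.
#[derive(Debug)]
struct DbError(String);

impl fmt::Display for DbError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "database error: {}", self.0)
    }
}

impl Error for DbError {}

/// Hypothetical stand-in for query_db_for_member_or_return_current().
fn query_db(ok: bool) -> Result<u32, DbError> {
    if ok {
        Ok(42)
    } else {
        Err(DbError(String::from("connection lost")))
    }
}

/// Hypothetical stand-in for the cache call: it runs `init` and passes its
/// Result back unchanged.
fn run_init<F>(init: F) -> Result<u32, BoxError>
where
    F: FnOnce() -> Result<u32, BoxError>,
{
    init()
}

fn get_member(ok: bool) -> Result<u32, BoxError> {
    let value = run_init(|| {
        // This `?` returns from the closure only, not from get_member().
        // It also converts DbError into BoxError through `From`.
        let v = query_db(ok)?;
        Ok(v)
    })?; // This second `?` is what propagates the error out of get_member().
    Ok(value)
}
```

With `ok = false`, the error travels out of the closure, through `run_init()`, and out of `get_member()` only because of the second `?`, which plays the same role as `.map_err(..)?` after `.await` in the code above.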
Your helpfulness is legendary! Thank you. Closing this issue now.
I'm having trouble figuring out how to use that method. Can you add an example to the docs?