diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs index c1400c8f2c..d868823df7 100644 --- a/components/object_store/src/disk_cache.rs +++ b/components/object_store/src/disk_cache.rs @@ -29,7 +29,7 @@ use futures::stream::BoxStream; use hash_ext::SeaHasherBuilder; use log::{debug, error, info, warn}; use lru::LruCache; -use notifier::notifier::RequestNotifiers; +use notifier::notifier::{ExecutionGuard, RequestNotifiers}; use partitioned_lock::PartitionedMutex; use serde::{Deserialize, Serialize}; use snafu::{ensure, Backtrace, ResultExt, Snafu}; @@ -642,19 +642,46 @@ impl DiskCacheStore { rxs.push(rx); } - let fetched_bytes = self - .underlying_store - .get_ranges(location, &need_fetch_block[..]) - .await; + if need_fetch_block.is_empty() { + // All ranges are not first request, return directly. + return Ok(rxs); + } + + let fetched_bytes = { + // This guard will ensure notifiers got released when futures get cancelled + // during `get_ranges`. + let mut guard = ExecutionGuard::new(|| { + for cache_key in &need_fetch_block_cache_key { + let _ = self.request_notifiers.take_notifiers(cache_key); + } + }); + + let bytes = self + .underlying_store + .get_ranges(location, &need_fetch_block) + .await; + + guard.cancel(); + bytes + }; + + // Take all cache_key's notifiers out from request_notifiers immediately. + let notifiers_vec: Vec<_> = need_fetch_block_cache_key + .iter() + .map(|cache_key| self.request_notifiers.take_notifiers(cache_key).unwrap()) + .collect(); + let fetched_bytes = match fetched_bytes { Err(err) => { - for cache_key in need_fetch_block_cache_key { - let notifiers = self.request_notifiers.take_notifiers(&cache_key).unwrap(); + for notifiers in notifiers_vec { for notifier in notifiers { - if let Err(e) = notifier.send(Err(Error::WaitNotifier { - message: err.to_string(), - })) { - error!("Failed to send notifier error result, err:{:?}.", e); + if let Err(e) = notifier.send( + WaitNotifier { + message: err.to_string(), + } + .fail(), + ) { + error!("Failed to send notifier error result, err:{e:?}."); } } } @@ -664,15 +691,15 @@ impl DiskCacheStore { Ok(v) => v, }; - for (bytes, cache_key) in fetched_bytes + for ((bytes, notifiers), cache_key) in fetched_bytes .into_iter() + .zip(notifiers_vec.into_iter()) .zip(need_fetch_block_cache_key.into_iter()) { - let notifiers = self.request_notifiers.take_notifiers(&cache_key).unwrap(); self.cache.insert_data(cache_key, bytes.clone()).await; for notifier in notifiers { if let Err(e) = notifier.send(Ok(bytes.clone())) { - error!("Failed to send notifier success result, err:{:?}.", e); + error!("Failed to send notifier success result, err:{e:?}."); } } }