Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update disk cache in another thread to avoid blocking normal query process #1431

Merged
merged 2 commits into from
Jan 9, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: update disk cache in another thread to avoid blocking normal qu…
…ery process
jiacai2050 committed Jan 8, 2024
commit 56331b5e92fa39cefa339f66b54744fd8225ad9f
1 change: 1 addition & 0 deletions analytic_engine/src/setup.rs
Original file line number Diff line number Diff line change
@@ -241,6 +241,7 @@ fn open_storage(
opts.disk_cache_page_size.as_byte() as usize,
store,
opts.disk_cache_partition_bits,
engine_runtimes.io_runtime.clone(),
)
.await
.context(OpenObjectStore)?,
80 changes: 64 additions & 16 deletions components/object_store/src/disk_cache.rs
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ use logger::{debug, warn};
use lru::LruCache;
use notifier::notifier::{ExecutionGuard, RequestNotifiers};
use partitioned_lock::PartitionedMutex;
use runtime::Runtime;
use serde::{Deserialize, Serialize};
use snafu::{ensure, Backtrace, ResultExt, Snafu};
use time_ext;
@@ -261,10 +262,10 @@ struct PageMeta {
// TODO: Introduce the CRC for integration check.
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct DiskCache {
root_dir: String,
meta_cache: PartitionedMutex<PageMetaCache, SeaHasherBuilder>,
meta_cache: Arc<PartitionedMutex<PageMetaCache, SeaHasherBuilder>>,
}

#[derive(Debug, Clone)]
@@ -292,7 +293,11 @@ impl DiskCache {

Ok(Self {
root_dir,
meta_cache: PartitionedMutex::try_new(init_lru, partition_bits, SeaHasherBuilder {})?,
meta_cache: Arc::new(PartitionedMutex::try_new(
init_lru,
partition_bits,
SeaHasherBuilder {},
)?),
})
}

@@ -509,6 +514,7 @@ pub struct DiskCacheStore {
meta_cache: PartitionedMutex<LruCache<Path, FileMeta>, SeaHasherBuilder>,
underlying_store: Arc<dyn ObjectStore>,
request_notifiers: Arc<RequestNotifiers<String, oneshot::Sender<StdResult<Bytes, Error>>>>,
runtime: Arc<Runtime>,
}

impl DiskCacheStore {
@@ -518,6 +524,7 @@ impl DiskCacheStore {
page_size: usize,
underlying_store: Arc<dyn ObjectStore>,
partition_bits: usize,
runtime: Arc<Runtime>,
) -> Result<Self> {
let page_num = cap / page_size;
ensure!(page_num != 0, InvalidCapacity);
@@ -550,6 +557,7 @@ impl DiskCacheStore {
meta_cache,
underlying_store,
request_notifiers,
runtime,
})
}

@@ -744,7 +752,12 @@ impl DiskCacheStore {
.zip(notifiers_vec.into_iter())
.zip(need_fetch_block_cache_key.into_iter())
{
self.cache.insert_data(cache_key, bytes.clone()).await;
{
let cache = self.cache.clone();
let bytes = bytes.clone();
self.runtime
.spawn(async move { cache.insert_data(cache_key, bytes).await });
}
for notifier in notifiers {
if notifier.send(Ok(bytes.clone())).is_err() {
// The error contains sent bytes, which maybe very large,
@@ -992,6 +1005,7 @@ impl ObjectStore for DiskCacheStore {

#[cfg(test)]
mod test {
use runtime::Builder;
use tempfile::{tempdir, TempDir};
use upstream::local::LocalFileSystem;

@@ -1011,12 +1025,14 @@ mod test {
let local_store = Arc::new(MemoryStore::default());

let cache_dir = tempdir().unwrap();
let runtime = Arc::new(Builder::default().build().unwrap());
let store = DiskCacheStore::try_new(
cache_dir.as_ref().to_string_lossy().to_string(),
cap,
page_size,
local_store,
partition_bits,
runtime,
)
.await
.unwrap();
@@ -1264,14 +1280,22 @@ mod test {
let cache_dir = tempdir().unwrap();
let cache_root_dir = cache_dir.as_ref().to_string_lossy().to_string();
let page_size = 8;
let runtime = Arc::new(Builder::default().build().unwrap());
let first_create_time = {
let _store = {
let local_path = tempdir().unwrap();
let local_store =
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
DiskCacheStore::try_new(cache_root_dir.clone(), 160, 8, local_store, 0)
.await
.unwrap()
DiskCacheStore::try_new(
cache_root_dir.clone(),
160,
8,
local_store,
0,
runtime.clone(),
)
.await
.unwrap()
};
let manifest =
DiskCacheStore::create_manifest_if_not_exists(&cache_root_dir, page_size)
@@ -1289,9 +1313,16 @@ mod test {
let local_path = tempdir().unwrap();
let local_store =
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
DiskCacheStore::try_new(cache_root_dir.clone(), 160, 8, local_store, 0)
.await
.unwrap()
DiskCacheStore::try_new(
cache_root_dir.clone(),
160,
8,
local_store,
0,
runtime.clone(),
)
.await
.unwrap()
};

let manifest =
@@ -1314,6 +1345,7 @@ mod test {
page_size * 2,
local_store,
0,
runtime,
)
.await;

@@ -1327,14 +1359,23 @@ mod test {
let cache_root_dir = cache_dir.as_ref().to_string_lossy().to_string();
let page_size = 16;
let location = Path::from("recovery.sst");
let runtime = Arc::new(Builder::default().build().unwrap());

{
let store = {
let local_path = tempdir().unwrap();
let local_store =
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
DiskCacheStore::try_new(cache_root_dir.clone(), 10240, page_size, local_store, 0)
.await
.unwrap()
DiskCacheStore::try_new(
cache_root_dir.clone(),
10240,
page_size,
local_store,
0,
runtime.clone(),
)
.await
.unwrap()
};
let data = b"abcd";
let mut buf = BytesMut::with_capacity(data.len() * 1024);
@@ -1358,9 +1399,16 @@ mod test {
let local_path = tempdir().unwrap();
let local_store =
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
DiskCacheStore::try_new(cache_root_dir.clone(), 160, page_size, local_store, 0)
.await
.unwrap()
DiskCacheStore::try_new(
cache_root_dir.clone(),
160,
page_size,
local_store,
0,
runtime,
)
.await
.unwrap()
};
for range in [16..32, 32..48, 48..64, 64..80, 80..96, 96..112] {
let filename = DiskCacheStore::page_cache_name(&location, &range);