From 56331b5e92fa39cefa339f66b54744fd8225ad9f Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 8 Jan 2024 19:02:07 +0800 Subject: [PATCH 1/2] feat: update disk cache in another thread to avoid blocking normal query process --- analytic_engine/src/setup.rs | 1 + components/object_store/src/disk_cache.rs | 80 ++++++++++++++++++----- 2 files changed, 65 insertions(+), 16 deletions(-) diff --git a/analytic_engine/src/setup.rs b/analytic_engine/src/setup.rs index 371fadc33a..1c55c92565 100644 --- a/analytic_engine/src/setup.rs +++ b/analytic_engine/src/setup.rs @@ -241,6 +241,7 @@ fn open_storage( opts.disk_cache_page_size.as_byte() as usize, store, opts.disk_cache_partition_bits, + engine_runtimes.io_runtime.clone(), ) .await .context(OpenObjectStore)?, diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs index 53d537ffa6..6f9a9201a3 100644 --- a/components/object_store/src/disk_cache.rs +++ b/components/object_store/src/disk_cache.rs @@ -31,6 +31,7 @@ use logger::{debug, warn}; use lru::LruCache; use notifier::notifier::{ExecutionGuard, RequestNotifiers}; use partitioned_lock::PartitionedMutex; +use runtime::Runtime; use serde::{Deserialize, Serialize}; use snafu::{ensure, Backtrace, ResultExt, Snafu}; use time_ext; @@ -261,10 +262,10 @@ struct PageMeta { // TODO: Introduce the CRC for integration check. } -#[derive(Debug)] +#[derive(Debug, Clone)] struct DiskCache { root_dir: String, - meta_cache: PartitionedMutex, + meta_cache: Arc>, } #[derive(Debug, Clone)] @@ -292,7 +293,11 @@ impl DiskCache { Ok(Self { root_dir, - meta_cache: PartitionedMutex::try_new(init_lru, partition_bits, SeaHasherBuilder {})?, + meta_cache: Arc::new(PartitionedMutex::try_new( + init_lru, + partition_bits, + SeaHasherBuilder {}, + )?), }) } @@ -509,6 +514,7 @@ pub struct DiskCacheStore { meta_cache: PartitionedMutex, SeaHasherBuilder>, underlying_store: Arc, request_notifiers: Arc>>>, + runtime: Arc, } impl DiskCacheStore { @@ -518,6 +524,7 @@ impl DiskCacheStore { page_size: usize, underlying_store: Arc, partition_bits: usize, + runtime: Arc, ) -> Result { let page_num = cap / page_size; ensure!(page_num != 0, InvalidCapacity); @@ -550,6 +557,7 @@ impl DiskCacheStore { meta_cache, underlying_store, request_notifiers, + runtime, }) } @@ -744,7 +752,12 @@ impl DiskCacheStore { .zip(notifiers_vec.into_iter()) .zip(need_fetch_block_cache_key.into_iter()) { - self.cache.insert_data(cache_key, bytes.clone()).await; + { + let cache = self.cache.clone(); + let bytes = bytes.clone(); + self.runtime + .spawn(async move { cache.insert_data(cache_key, bytes).await }); + } for notifier in notifiers { if notifier.send(Ok(bytes.clone())).is_err() { // The error contains sent bytes, which maybe very large, @@ -992,6 +1005,7 @@ impl ObjectStore for DiskCacheStore { #[cfg(test)] mod test { + use runtime::Builder; use tempfile::{tempdir, TempDir}; use upstream::local::LocalFileSystem; @@ -1011,12 +1025,14 @@ mod test { let local_store = Arc::new(MemoryStore::default()); let cache_dir = tempdir().unwrap(); + let runtime = Arc::new(Builder::default().build().unwrap()); let store = DiskCacheStore::try_new( cache_dir.as_ref().to_string_lossy().to_string(), cap, page_size, local_store, partition_bits, + runtime, ) .await .unwrap(); @@ -1264,14 +1280,22 @@ mod test { let cache_dir = tempdir().unwrap(); let cache_root_dir = cache_dir.as_ref().to_string_lossy().to_string(); let page_size = 8; + let runtime = Arc::new(Builder::default().build().unwrap()); let first_create_time = { let _store = { let local_path = tempdir().unwrap(); let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); - DiskCacheStore::try_new(cache_root_dir.clone(), 160, 8, local_store, 0) - .await - .unwrap() + DiskCacheStore::try_new( + cache_root_dir.clone(), + 160, + 8, + local_store, + 0, + runtime.clone(), + ) + .await + .unwrap() }; let manifest = DiskCacheStore::create_manifest_if_not_exists(&cache_root_dir, page_size) @@ -1289,9 +1313,16 @@ mod test { let local_path = tempdir().unwrap(); let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); - DiskCacheStore::try_new(cache_root_dir.clone(), 160, 8, local_store, 0) - .await - .unwrap() + DiskCacheStore::try_new( + cache_root_dir.clone(), + 160, + 8, + local_store, + 0, + runtime.clone(), + ) + .await + .unwrap() }; let manifest = @@ -1314,6 +1345,7 @@ mod test { page_size * 2, local_store, 0, + runtime, ) .await; @@ -1327,14 +1359,23 @@ mod test { let cache_root_dir = cache_dir.as_ref().to_string_lossy().to_string(); let page_size = 16; let location = Path::from("recovery.sst"); + let runtime = Arc::new(Builder::default().build().unwrap()); + { let store = { let local_path = tempdir().unwrap(); let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); - DiskCacheStore::try_new(cache_root_dir.clone(), 10240, page_size, local_store, 0) - .await - .unwrap() + DiskCacheStore::try_new( + cache_root_dir.clone(), + 10240, + page_size, + local_store, + 0, + runtime.clone(), + ) + .await + .unwrap() }; let data = b"abcd"; let mut buf = BytesMut::with_capacity(data.len() * 1024); @@ -1358,9 +1399,16 @@ mod test { let local_path = tempdir().unwrap(); let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); - DiskCacheStore::try_new(cache_root_dir.clone(), 160, page_size, local_store, 0) - .await - .unwrap() + DiskCacheStore::try_new( + cache_root_dir.clone(), + 160, + page_size, + local_store, + 0, + runtime, + ) + .await + .unwrap() }; for range in [16..32, 32..48, 48..64, 64..80, 80..96, 96..112] { let filename = DiskCacheStore::page_cache_name(&location, &range); From 73b67a90b7e64ef1786ecb681b17a724d87a4a7d Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Tue, 9 Jan 2024 14:52:29 +0800 Subject: [PATCH 2/2] fix ci test --- components/object_store/src/disk_cache.rs | 723 +++++++++++----------- 1 file changed, 375 insertions(+), 348 deletions(-) diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs index 6f9a9201a3..937181a69b 100644 --- a/components/object_store/src/disk_cache.rs +++ b/components/object_store/src/disk_cache.rs @@ -31,7 +31,7 @@ use logger::{debug, warn}; use lru::LruCache; use notifier::notifier::{ExecutionGuard, RequestNotifiers}; use partitioned_lock::PartitionedMutex; -use runtime::Runtime; +use runtime::RuntimeRef; use serde::{Deserialize, Serialize}; use snafu::{ensure, Backtrace, ResultExt, Snafu}; use time_ext; @@ -514,7 +514,7 @@ pub struct DiskCacheStore { meta_cache: PartitionedMutex, SeaHasherBuilder>, underlying_store: Arc, request_notifiers: Arc>>>, - runtime: Arc, + runtime: RuntimeRef, } impl DiskCacheStore { @@ -524,7 +524,7 @@ impl DiskCacheStore { page_size: usize, underlying_store: Arc, partition_bits: usize, - runtime: Arc, + runtime: RuntimeRef, ) -> Result { let page_num = cap / page_size; ensure!(page_num != 0, InvalidCapacity); @@ -755,8 +755,13 @@ impl DiskCacheStore { { let cache = self.cache.clone(); let bytes = bytes.clone(); - self.runtime + let handle = self + .runtime .spawn(async move { cache.insert_data(cache_key, bytes).await }); + // In test, wait the handle to finish, otherwise the test may fail. + if cfg!(test) { + let _ = handle.await; + } } for notifier in notifiers { if notifier.send(Ok(bytes.clone())).is_err() { @@ -1005,7 +1010,7 @@ impl ObjectStore for DiskCacheStore { #[cfg(test)] mod test { - use runtime::Builder; + use runtime::{Builder, RuntimeRef}; use tempfile::{tempdir, TempDir}; use upstream::local::LocalFileSystem; @@ -1021,11 +1026,11 @@ mod test { page_size: usize, cap: usize, partition_bits: usize, + runtime: RuntimeRef, ) -> StoreWithCacheDir { let local_store = Arc::new(MemoryStore::default()); let cache_dir = tempdir().unwrap(); - let runtime = Arc::new(Builder::default().build().unwrap()); let store = DiskCacheStore::try_new( cache_dir.as_ref().to_string_lossy().to_string(), cap, @@ -1050,373 +1055,392 @@ mod test { .exists() } - #[tokio::test] - async fn test_disk_cache_out_of_range() { - let page_size = 16; - // 51 byte - let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z"; - let location = Path::from("out_of_range_test.sst"); - let store = prepare_store(page_size, 32, 0).await; - let buf = Bytes::from_static(data); - store.inner.put(&location, buf.clone()).await.unwrap(); - - // Read one page out of range. - let res = store.inner.get_range(&location, 48..54).await; - assert!(res.is_err()); - - // Read multiple pages out of range. - let res = store.inner.get_range(&location, 24..54).await; - assert!(res.is_err()); + #[test] + fn test_disk_cache_out_of_range() { + let rt = Arc::new(Builder::default().build().unwrap()); + rt.block_on(async { + let page_size = 16; + // 51 byte + let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z"; + let location = Path::from("out_of_range_test.sst"); + let store = prepare_store(page_size, 32, 0, rt.clone()).await; + let buf = Bytes::from_static(data); + store.inner.put(&location, buf.clone()).await.unwrap(); + + // Read one page out of range. + let res = store.inner.get_range(&location, 48..54).await; + assert!(res.is_err()); + + // Read multiple pages out of range. + let res = store.inner.get_range(&location, 24..54).await; + assert!(res.is_err()); + }); } - #[tokio::test] - async fn test_disk_cache_store_get_range() { - let page_size = 16; - // 51 byte - let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z"; - let location = Path::from("1.sst"); - let store = prepare_store(page_size, 1024, 0).await; - - let mut buf = BytesMut::with_capacity(data.len() * 4); - // extend 4 times, then location will contain 200 bytes - for _ in 0..4 { - buf.extend_from_slice(data); - } - store.inner.put(&location, buf.freeze()).await.unwrap(); - - let testcases = vec![ - (0..6, "a b c "), - (0..16, "a b c d e f g h "), - // len of aligned ranges will be 2 - (0..17, "a b c d e f g h i"), - (16..17, "i"), - // len of aligned ranges will be 6 - (16..100, "i j k l m n o p q r s t u v w x y za b c d e f g h i j k l m n o p q r s t u v w x y"), - ]; - - for (input, expected) in testcases { - assert_eq!( - store.inner.get_range(&location, input).await.unwrap(), - Bytes::copy_from_slice(expected.as_bytes()) - ); - } - - // remove cached values, then get again - { - for range in [0..16, 16..32, 32..48, 48..64, 64..80, 80..96, 96..112] { - let data_cache = store - .inner - .cache - .meta_cache - .lock(&DiskCacheStore::page_cache_name(&location, &range).as_str()); - assert!(data_cache - .contains(DiskCacheStore::page_cache_name(&location, &range).as_str())); - assert!(test_file_exists(&store.cache_dir, &location, &range)); + #[test] + fn test_disk_cache_store_get_range() { + let rt = Arc::new(Builder::default().build().unwrap()); + rt.block_on(async { + let page_size = 16; + // 51 byte + let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z"; + let location = Path::from("1.sst"); + let store = prepare_store(page_size, 1024, 0, rt.clone()).await; + + let mut buf = BytesMut::with_capacity(data.len() * 4); + // extend 4 times, then location will contain 200 bytes + for _ in 0..4 { + buf.extend_from_slice(data); } + store.inner.put(&location, buf.freeze()).await.unwrap(); + + let testcases = vec![ + (0..6, "a b c "), + (0..16, "a b c d e f g h "), + // len of aligned ranges will be 2 + (0..17, "a b c d e f g h i"), + (16..17, "i"), + // len of aligned ranges will be 6 + (16..100, "i j k l m n o p q r s t u v w x y za b c d e f g h i j k l m n o p q r s t u v w x y"), + ]; + + for (input, expected) in testcases { + assert_eq!( + store.inner.get_range(&location, input).await.unwrap(), + Bytes::copy_from_slice(expected.as_bytes()) + ); + } + + // remove cached values, then get again + { + for range in [0..16, 16..32, 32..48, 48..64, 64..80, 80..96, 96..112] { + let data_cache = store + .inner + .cache + .meta_cache + .lock(&DiskCacheStore::page_cache_name(&location, &range).as_str()); + assert!(data_cache + .contains(DiskCacheStore::page_cache_name(&location, &range).as_str())); + assert!(test_file_exists(&store.cache_dir, &location, &range)); + } - for range in [16..32, 48..64, 80..96] { - let mut data_cache = store - .inner - .cache - .meta_cache - .lock(&DiskCacheStore::page_cache_name(&location, &range).as_str()); - assert!(data_cache - .pop(&DiskCacheStore::page_cache_name(&location, &range)) - .is_some()); + for range in [16..32, 48..64, 80..96] { + let mut data_cache = store + .inner + .cache + .meta_cache + .lock(&DiskCacheStore::page_cache_name(&location, &range).as_str()); + assert!(data_cache + .pop(&DiskCacheStore::page_cache_name(&location, &range)) + .is_some()); + } } - } - assert_eq!( - store.inner.get_range(&location, 16..100).await.unwrap(), - Bytes::copy_from_slice( - b"i j k l m n o p q r s t u v w x y za b c d e f g h i j k l m n o p q r s t u v w x y" - ) - ); + assert_eq!( + store.inner.get_range(&location, 16..100).await.unwrap(), + Bytes::copy_from_slice( + b"i j k l m n o p q r s t u v w x y za b c d e f g h i j k l m n o p q r s t u v w x y" + ) + ); + + }); } - #[tokio::test] - async fn test_disk_cache_multi_thread_fetch_same_block() { - let page_size = 16; - // 51 byte - let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z"; - let location = Path::from("1.sst"); - let store = Arc::new(prepare_store(page_size, 32, 0).await); - - let mut buf = BytesMut::with_capacity(data.len() * 4); - // extend 4 times, then location will contain 200 bytes - for _ in 0..4 { - buf.extend_from_slice(data); - } - store.inner.put(&location, buf.freeze()).await.unwrap(); - - let testcases = vec![ - (0..6, "a b c "), - (0..16, "a b c d e f g h "), - (0..17, "a b c d e f g h i"), - (16..17, "i"), - (16..100, "i j k l m n o p q r s t u v w x y za b c d e f g h i j k l m n o p q r s t u v w x y"), - ]; - let testcases = testcases - .iter() - .cycle() - .take(testcases.len() * 100) - .cloned() - .collect::>(); - - let mut tasks = Vec::with_capacity(testcases.len()); - for (input, _) in &testcases { - let store = store.clone(); - let location = location.clone(); - let input = input.clone(); - - tasks.push(tokio::spawn(async move { - store.inner.get_range(&location, input).await.unwrap() - })); - } + #[test] + fn test_disk_cache_multi_thread_fetch_same_block() { + let rt = Arc::new(Builder::default().build().unwrap()); + rt.block_on(async { + let page_size = 16; + // 51 byte + let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z"; + let location = Path::from("1.sst"); + let store = Arc::new(prepare_store(page_size, 32, 0,rt.clone()).await); + + let mut buf = BytesMut::with_capacity(data.len() * 4); + // extend 4 times, then location will contain 200 bytes + for _ in 0..4 { + buf.extend_from_slice(data); + } + store.inner.put(&location, buf.freeze()).await.unwrap(); + + let testcases = vec![ + (0..6, "a b c "), + (0..16, "a b c d e f g h "), + (0..17, "a b c d e f g h i"), + (16..17, "i"), + (16..100, "i j k l m n o p q r s t u v w x y za b c d e f g h i j k l m n o p q r s t u v w x y"), + ]; + let testcases = testcases + .iter() + .cycle() + .take(testcases.len() * 100) + .cloned() + .collect::>(); + + let mut tasks = Vec::with_capacity(testcases.len()); + for (input, _) in &testcases { + let store = store.clone(); + let location = location.clone(); + let input = input.clone(); + + tasks.push(tokio::spawn(async move { + store.inner.get_range(&location, input).await.unwrap() + })); + } - let actual = futures::future::join_all(tasks).await; - for (actual, (_, expected)) in actual.into_iter().zip(testcases.into_iter()) { - assert_eq!(actual.unwrap(), Bytes::from(expected)) - } + let actual = futures::future::join_all(tasks).await; + for (actual, (_, expected)) in actual.into_iter().zip(testcases.into_iter()) { + assert_eq!(actual.unwrap(), Bytes::from(expected)) + } + }); } - #[tokio::test] - async fn test_disk_cache_remove_cache_file() { - let page_size = 16; - // 51 byte - let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z"; - let location = Path::from("remove_cache_file.sst"); - let store = prepare_store(page_size, 32, 0).await; - let mut buf = BytesMut::with_capacity(data.len() * 4); - // extend 4 times, then location will contain 200 bytes, but cache cap is 32 - for _ in 0..4 { - buf.extend_from_slice(data); - } - store.inner.put(&location, buf.freeze()).await.unwrap(); - - let _ = store.inner.get_range(&location, 0..16).await.unwrap(); - let _ = store.inner.get_range(&location, 16..32).await.unwrap(); - // cache is full now - assert!(test_file_exists(&store.cache_dir, &location, &(0..16))); - assert!(test_file_exists(&store.cache_dir, &location, &(16..32))); - - // insert new cache, evict oldest entry - let _ = store.inner.get_range(&location, 32..48).await.unwrap(); - assert!(!test_file_exists(&store.cache_dir, &location, &(0..16))); - assert!(test_file_exists(&store.cache_dir, &location, &(32..48))); - - // insert new cache, evict oldest entry - let _ = store.inner.get_range(&location, 48..64).await.unwrap(); - assert!(!test_file_exists(&store.cache_dir, &location, &(16..32))); - assert!(test_file_exists(&store.cache_dir, &location, &(48..64))); + #[test] + fn test_disk_cache_remove_cache_file() { + let rt = Arc::new(Builder::default().build().unwrap()); + rt.block_on(async { + let page_size = 16; + // 51 byte + let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z"; + let location = Path::from("remove_cache_file.sst"); + let store = prepare_store(page_size, 32, 0, rt.clone()).await; + let mut buf = BytesMut::with_capacity(data.len() * 4); + // extend 4 times, then location will contain 200 bytes, but cache cap is 32 + for _ in 0..4 { + buf.extend_from_slice(data); + } + store.inner.put(&location, buf.freeze()).await.unwrap(); + + let _ = store.inner.get_range(&location, 0..16).await.unwrap(); + let _ = store.inner.get_range(&location, 16..32).await.unwrap(); + // cache is full now + assert!(test_file_exists(&store.cache_dir, &location, &(0..16))); + assert!(test_file_exists(&store.cache_dir, &location, &(16..32))); + + // insert new cache, evict oldest entry + let _ = store.inner.get_range(&location, 32..48).await.unwrap(); + assert!(!test_file_exists(&store.cache_dir, &location, &(0..16))); + assert!(test_file_exists(&store.cache_dir, &location, &(32..48))); + + // insert new cache, evict oldest entry + let _ = store.inner.get_range(&location, 48..64).await.unwrap(); + assert!(!test_file_exists(&store.cache_dir, &location, &(16..32))); + assert!(test_file_exists(&store.cache_dir, &location, &(48..64))); + }); } - #[tokio::test] - async fn test_disk_cache_remove_cache_file_two_partition() { - let page_size = 16; - // 51 byte - let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z"; - let location = Path::from("remove_cache_file_two_partition.sst"); - // partition_cap: 64 / 16 / 2 = 2 - let store = prepare_store(page_size, 64, 1).await; - let mut buf = BytesMut::with_capacity(data.len() * 8); - // extend 8 times - for _ in 0..8 { - buf.extend_from_slice(data); - } - store.inner.put(&location, buf.freeze()).await.unwrap(); - // use seahash - // 0..16: partition 1 - // 16..32 partition 1 - // 32..48 partition 0 - // 48..64 partition 1 - // 64..80 partition 1 - // 80..96 partition 0 - // 96..112 partition 0 - // 112..128 partition 0 - // 128..144 partition 0 - let _ = store.inner.get_range(&location, 0..16).await.unwrap(); - let _ = store.inner.get_range(&location, 16..32).await.unwrap(); - // partition 1 cache is full now - assert!(test_file_exists(&store.cache_dir, &location, &(0..16))); - assert!(test_file_exists(&store.cache_dir, &location, &(16..32))); - - let _ = store.inner.get_range(&location, 32..48).await.unwrap(); - let _ = store.inner.get_range(&location, 80..96).await.unwrap(); - // partition 0 cache is full now - - assert!(test_file_exists(&store.cache_dir, &location, &(32..48))); - assert!(test_file_exists(&store.cache_dir, &location, &(80..96))); - - // insert new entry into partition 0, evict partition 0's oldest entry - let _ = store.inner.get_range(&location, 96..112).await.unwrap(); - assert!(!test_file_exists(&store.cache_dir, &location, &(32..48))); - assert!(test_file_exists(&store.cache_dir, &location, &(80..96))); - - assert!(test_file_exists(&store.cache_dir, &location, &(0..16))); - assert!(test_file_exists(&store.cache_dir, &location, &(16..32))); - - // insert new entry into partition 0, evict partition 0's oldest entry - let _ = store.inner.get_range(&location, 128..144).await.unwrap(); - assert!(!test_file_exists(&store.cache_dir, &location, &(80..96))); - assert!(test_file_exists(&store.cache_dir, &location, &(96..112))); - assert!(test_file_exists(&store.cache_dir, &location, &(128..144))); - - assert!(test_file_exists(&store.cache_dir, &location, &(0..16))); - assert!(test_file_exists(&store.cache_dir, &location, &(16..32))); - - // insert new entry into partition 1, evict partition 1's oldest entry - let _ = store.inner.get_range(&location, 64..80).await.unwrap(); - assert!(!test_file_exists(&store.cache_dir, &location, &(0..16))); - assert!(test_file_exists(&store.cache_dir, &location, &(16..32))); - assert!(test_file_exists(&store.cache_dir, &location, &(64..80))); - - assert!(test_file_exists(&store.cache_dir, &location, &(96..112))); - assert!(test_file_exists(&store.cache_dir, &location, &(128..144))); + #[test] + fn test_disk_cache_remove_cache_file_two_partition() { + let rt = Arc::new(Builder::default().build().unwrap()); + rt.block_on(async { + let page_size = 16; + // 51 byte + let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z"; + let location = Path::from("remove_cache_file_two_partition.sst"); + // partition_cap: 64 / 16 / 2 = 2 + let store = prepare_store(page_size, 64, 1, rt.clone()).await; + let mut buf = BytesMut::with_capacity(data.len() * 8); + // extend 8 times + for _ in 0..8 { + buf.extend_from_slice(data); + } + store.inner.put(&location, buf.freeze()).await.unwrap(); + // use seahash + // 0..16: partition 1 + // 16..32 partition 1 + // 32..48 partition 0 + // 48..64 partition 1 + // 64..80 partition 1 + // 80..96 partition 0 + // 96..112 partition 0 + // 112..128 partition 0 + // 128..144 partition 0 + let _ = store.inner.get_range(&location, 0..16).await.unwrap(); + let _ = store.inner.get_range(&location, 16..32).await.unwrap(); + // partition 1 cache is full now + assert!(test_file_exists(&store.cache_dir, &location, &(0..16))); + assert!(test_file_exists(&store.cache_dir, &location, &(16..32))); + + let _ = store.inner.get_range(&location, 32..48).await.unwrap(); + let _ = store.inner.get_range(&location, 80..96).await.unwrap(); + // partition 0 cache is full now + + assert!(test_file_exists(&store.cache_dir, &location, &(32..48))); + assert!(test_file_exists(&store.cache_dir, &location, &(80..96))); + + // insert new entry into partition 0, evict partition 0's oldest entry + let _ = store.inner.get_range(&location, 96..112).await.unwrap(); + assert!(!test_file_exists(&store.cache_dir, &location, &(32..48))); + assert!(test_file_exists(&store.cache_dir, &location, &(80..96))); + + assert!(test_file_exists(&store.cache_dir, &location, &(0..16))); + assert!(test_file_exists(&store.cache_dir, &location, &(16..32))); + + // insert new entry into partition 0, evict partition 0's oldest entry + let _ = store.inner.get_range(&location, 128..144).await.unwrap(); + assert!(!test_file_exists(&store.cache_dir, &location, &(80..96))); + assert!(test_file_exists(&store.cache_dir, &location, &(96..112))); + assert!(test_file_exists(&store.cache_dir, &location, &(128..144))); + + assert!(test_file_exists(&store.cache_dir, &location, &(0..16))); + assert!(test_file_exists(&store.cache_dir, &location, &(16..32))); + + // insert new entry into partition 1, evict partition 1's oldest entry + let _ = store.inner.get_range(&location, 64..80).await.unwrap(); + assert!(!test_file_exists(&store.cache_dir, &location, &(0..16))); + assert!(test_file_exists(&store.cache_dir, &location, &(16..32))); + assert!(test_file_exists(&store.cache_dir, &location, &(64..80))); + + assert!(test_file_exists(&store.cache_dir, &location, &(96..112))); + assert!(test_file_exists(&store.cache_dir, &location, &(128..144))); + }); } - #[tokio::test] - async fn test_disk_cache_manifest() { - let cache_dir = tempdir().unwrap(); - let cache_root_dir = cache_dir.as_ref().to_string_lossy().to_string(); - let page_size = 8; - let runtime = Arc::new(Builder::default().build().unwrap()); - let first_create_time = { - let _store = { - let local_path = tempdir().unwrap(); - let local_store = - Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); - DiskCacheStore::try_new( - cache_root_dir.clone(), - 160, - 8, - local_store, - 0, - runtime.clone(), - ) - .await - .unwrap() + #[test] + fn test_disk_cache_manifest() { + let rt = Arc::new(Builder::default().build().unwrap()); + rt.block_on(async { + let cache_dir = tempdir().unwrap(); + let cache_root_dir = cache_dir.as_ref().to_string_lossy().to_string(); + let page_size = 8; + let first_create_time = { + let _store = { + let local_path = tempdir().unwrap(); + let local_store = + Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); + DiskCacheStore::try_new( + cache_root_dir.clone(), + 160, + 8, + local_store, + 0, + rt.clone(), + ) + .await + .unwrap() + }; + let manifest = + DiskCacheStore::create_manifest_if_not_exists(&cache_root_dir, page_size) + .await + .unwrap(); + + assert_eq!(manifest.page_size, 8); + assert_eq!(manifest.version, Manifest::CURRENT_VERSION); + manifest.create_at }; - let manifest = - DiskCacheStore::create_manifest_if_not_exists(&cache_root_dir, page_size) + + // open again + { + let _store = { + let local_path = tempdir().unwrap(); + let local_store = + Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); + DiskCacheStore::try_new( + cache_root_dir.clone(), + 160, + 8, + local_store, + 0, + rt.clone(), + ) .await - .unwrap(); + .unwrap() + }; - assert_eq!(manifest.page_size, 8); - assert_eq!(manifest.version, Manifest::CURRENT_VERSION); - manifest.create_at - }; + let manifest = + DiskCacheStore::create_manifest_if_not_exists(&cache_root_dir, page_size) + .await + .unwrap(); + assert_eq!(manifest.create_at, first_create_time); + assert_eq!(manifest.page_size, 8); + assert_eq!(manifest.version, Manifest::CURRENT_VERSION); + } - // open again - { - let _store = { + // open again, but with different page_size + { let local_path = tempdir().unwrap(); let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); - DiskCacheStore::try_new( - cache_root_dir.clone(), + let store = DiskCacheStore::try_new( + cache_dir.as_ref().to_string_lossy().to_string(), 160, - 8, + page_size * 2, local_store, 0, - runtime.clone(), + rt.clone(), ) - .await - .unwrap() - }; - - let manifest = - DiskCacheStore::create_manifest_if_not_exists(&cache_root_dir, page_size) - .await - .unwrap(); - assert_eq!(manifest.create_at, first_create_time); - assert_eq!(manifest.page_size, 8); - assert_eq!(manifest.version, Manifest::CURRENT_VERSION); - } + .await; - // open again, but with different page_size - { - let local_path = tempdir().unwrap(); - let local_store = - Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); - let store = DiskCacheStore::try_new( - cache_dir.as_ref().to_string_lossy().to_string(), - 160, - page_size * 2, - local_store, - 0, - runtime, - ) - .await; - - assert!(store.is_err()) - } + assert!(store.is_err()) + } + }); } - #[tokio::test] - async fn test_disk_cache_recovery() { - let cache_dir = tempdir().unwrap(); - let cache_root_dir = cache_dir.as_ref().to_string_lossy().to_string(); - let page_size = 16; - let location = Path::from("recovery.sst"); - let runtime = Arc::new(Builder::default().build().unwrap()); - - { - let store = { - let local_path = tempdir().unwrap(); - let local_store = - Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); - DiskCacheStore::try_new( - cache_root_dir.clone(), - 10240, - page_size, - local_store, - 0, - runtime.clone(), - ) - .await - .unwrap() + #[test] + fn test_disk_cache_recovery() { + let rt = Arc::new(Builder::default().build().unwrap()); + rt.block_on(async { + let cache_dir = tempdir().unwrap(); + let cache_root_dir = cache_dir.as_ref().to_string_lossy().to_string(); + let page_size = 16; + let location = Path::from("recovery.sst"); + { + let store = { + let local_path = tempdir().unwrap(); + let local_store = + Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); + DiskCacheStore::try_new( + cache_root_dir.clone(), + 10240, + page_size, + local_store, + 0, + rt.clone(), + ) + .await + .unwrap() + }; + let data = b"abcd"; + let mut buf = BytesMut::with_capacity(data.len() * 1024); + for _ in 0..1024 { + buf.extend_from_slice(data); + } + let buf = buf.freeze(); + store.put(&location, buf.clone()).await.unwrap(); + let read_range = 16..100; + let bytes = store + .get_range(&location, read_range.clone()) + .await + .unwrap(); + assert_eq!(bytes.len(), read_range.len()); + assert_eq!(bytes[..], buf[read_range]) }; - let data = b"abcd"; - let mut buf = BytesMut::with_capacity(data.len() * 1024); - for _ in 0..1024 { - buf.extend_from_slice(data); - } - let buf = buf.freeze(); - store.put(&location, buf.clone()).await.unwrap(); - let read_range = 16..100; - let bytes = store - .get_range(&location, read_range.clone()) - .await - .unwrap(); - assert_eq!(bytes.len(), read_range.len()); - assert_eq!(bytes[..], buf[read_range]) - }; - // recover - { - let store = { - let local_path = tempdir().unwrap(); - let local_store = - Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); - DiskCacheStore::try_new( - cache_root_dir.clone(), - 160, - page_size, - local_store, - 0, - runtime, - ) - .await - .unwrap() + // recover + { + let store = { + let local_path = tempdir().unwrap(); + let local_store = + Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); + DiskCacheStore::try_new( + cache_root_dir.clone(), + 160, + page_size, + local_store, + 0, + rt.clone(), + ) + .await + .unwrap() + }; + for range in [16..32, 32..48, 48..64, 64..80, 80..96, 96..112] { + let filename = DiskCacheStore::page_cache_name(&location, &range); + let cache = store.cache.meta_cache.lock(&filename); + assert!(cache.contains(&filename)); + assert!(test_file_exists(&cache_dir, &location, &range)); + } }; - for range in [16..32, 32..48, 48..64, 64..80, 80..96, 96..112] { - let filename = DiskCacheStore::page_cache_name(&location, &range); - let cache = store.cache.meta_cache.lock(&filename); - assert!(cache.contains(&filename)); - assert!(test_file_exists(&cache_dir, &location, &range)); - } - }; + }); } #[test] @@ -1429,18 +1453,21 @@ mod test { } } - #[tokio::test] - async fn test_corrupt_disk_cache() { - for page_size in [1, 2, 4, 8, 16, 32, 64, 128] { - corrupt_disk_cache(page_size).await; - } + #[test] + fn test_corrupt_disk_cache() { + let rt = Arc::new(Builder::default().build().unwrap()); + rt.block_on(async { + for page_size in [1, 2, 4, 8, 16, 32, 64, 128] { + corrupt_disk_cache(page_size, rt.clone()).await; + } + }); } - async fn corrupt_disk_cache(page_size: usize) { + async fn corrupt_disk_cache(page_size: usize, rt: RuntimeRef) { let StoreWithCacheDir { inner: store, cache_dir, - } = prepare_store(page_size, 1024, 0).await; + } = prepare_store(page_size, 1024, 0, rt).await; let test_file_name = "corrupted_disk_cache_file"; let test_file_path = Path::from(test_file_name); let test_file_bytes = Bytes::from("corrupted_disk_cache_file_data");