Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add partition num as param in partition lock's init_fn #981

Merged
merged 9 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 35 additions & 30 deletions common_util/src/partitioned_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,20 @@ impl<T, B> PartitionedRwLock<T, B>
where
B: BuildHasher,
{
pub fn new<F>(init_fn: F, partition_bit: usize, hash_builder: B) -> Self
pub fn try_new<F, E>(init_fn: F, partition_bit: usize, hash_builder: B) -> Result<Self, E>
where
F: Fn() -> T,
F: Fn(usize) -> Result<T, E>,
{
let partition_num = 1 << partition_bit;
let partitions = (1..partition_num)
.map(|_| RwLock::new(init_fn()))
.collect::<Vec<RwLock<T>>>();
Self {
.map(|_| init_fn(partition_num).map(RwLock::new))
.collect::<Result<Vec<RwLock<T>>, E>>()?;

Ok(Self {
partitions,
partition_mask: partition_num - 1,
hash_builder,
}
})
}

pub fn read<K: Eq + Hash>(&self, key: &K) -> RwLockReadGuard<'_, T> {
Expand Down Expand Up @@ -79,19 +80,20 @@ impl<T, B> PartitionedMutex<T, B>
where
B: BuildHasher,
{
pub fn new<F>(init_fn: F, partition_bit: usize, hash_builder: B) -> Self
pub fn try_new<F, E>(init_fn: F, partition_bit: usize, hash_builder: B) -> Result<Self, E>
where
F: Fn() -> T,
F: Fn(usize) -> Result<T, E>,
{
let partition_num = 1 << partition_bit;
let partitions = (0..partition_num)
.map(|_| Mutex::new(init_fn()))
.collect::<Vec<Mutex<T>>>();
Self {
.map(|_| init_fn(partition_num).map(Mutex::new))
.collect::<Result<Vec<Mutex<T>>, E>>()?;

Ok(Self {
partitions,
partition_mask: partition_num - 1,
hash_builder,
}
})
}

pub fn lock<K: Eq + Hash>(&self, key: &K) -> MutexGuard<'_, T> {
Expand Down Expand Up @@ -131,19 +133,20 @@ impl<T, B> PartitionedMutexAsync<T, B>
where
B: BuildHasher,
{
pub fn new<F>(init_fn: F, partition_bit: usize, hash_builder: B) -> Self
pub fn try_new<F, E>(init_fn: F, partition_bit: usize, hash_builder: B) -> Result<Self, E>
where
F: Fn() -> T,
F: Fn(usize) -> Result<T, E>,
{
let partition_num = 1 << partition_bit;
let partitions = (0..partition_num)
.map(|_| tokio::sync::Mutex::new(init_fn()))
.collect::<Vec<tokio::sync::Mutex<T>>>();
Self {
.map(|_| init_fn(partition_num).map(tokio::sync::Mutex::new))
.collect::<Result<Vec<tokio::sync::Mutex<T>>, E>>()?;

Ok(Self {
partitions,
partition_mask: partition_num - 1,
hash_builder,
}
})
}

pub async fn lock<K: Eq + Hash>(&self, key: &K) -> tokio::sync::MutexGuard<'_, T> {
Expand Down Expand Up @@ -174,9 +177,9 @@ mod tests {

#[test]
fn test_partitioned_rwlock() {
let init_hmap = HashMap::new;
let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new());
let test_locked_map =
PartitionedRwLock::new(init_hmap, 4, build_fixed_seed_ahasher_builder());
PartitionedRwLock::try_new(init_hmap, 4, build_fixed_seed_ahasher_builder()).unwrap();
let test_key = "test_key".to_string();
let test_value = "test_value".to_string();

Expand All @@ -193,9 +196,9 @@ mod tests {

#[test]
fn test_partitioned_mutex() {
let init_hmap = HashMap::new;
let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new());
let test_locked_map =
PartitionedMutex::new(init_hmap, 4, build_fixed_seed_ahasher_builder());
PartitionedMutex::try_new(init_hmap, 4, build_fixed_seed_ahasher_builder()).unwrap();
let test_key = "test_key".to_string();
let test_value = "test_value".to_string();

Expand All @@ -212,8 +215,9 @@ mod tests {

#[tokio::test]
async fn test_partitioned_mutex_async() {
let init_hmap = HashMap::new;
let test_locked_map = PartitionedMutexAsync::new(init_hmap, 4, SeaHasherBuilder);
let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new());
let test_locked_map =
PartitionedMutexAsync::try_new(init_hmap, 4, SeaHasherBuilder).unwrap();
let test_key = "test_key".to_string();
let test_value = "test_value".to_string();

Expand All @@ -230,9 +234,9 @@ mod tests {

#[test]
fn test_partitioned_mutex_vis_different_partition() {
let init_vec = Vec::<i32>::new;
let init_vec = |_: usize| Ok::<_, ()>(Vec::<i32>::new());
let test_locked_map =
PartitionedMutex::new(init_vec, 4, build_fixed_seed_ahasher_builder());
PartitionedMutex::try_new(init_vec, 4, build_fixed_seed_ahasher_builder()).unwrap();
let mutex_first = test_locked_map.get_partition_by_index(0);

let mut _tmp_data = mutex_first.lock().unwrap();
Expand All @@ -245,9 +249,9 @@ mod tests {

#[test]
fn test_partitioned_rwmutex_vis_different_partition() {
let init_vec = Vec::<i32>::new;
let init_vec = |_: usize| Ok::<_, ()>(Vec::<i32>::new());
let test_locked_map =
PartitionedRwLock::new(init_vec, 4, build_fixed_seed_ahasher_builder());
PartitionedRwLock::try_new(init_vec, 4, build_fixed_seed_ahasher_builder()).unwrap();
let mutex_first = test_locked_map.get_partition_by_index(0);
let mut _tmp = mutex_first.write().unwrap();
assert!(mutex_first.try_write().is_err());
Expand All @@ -259,8 +263,9 @@ mod tests {

#[tokio::test]
async fn test_partitioned_mutex_async_vis_different_partition() {
let init_vec = Vec::<i32>::new;
let test_locked_map = PartitionedMutexAsync::new(init_vec, 4, SeaHasherBuilder);
let init_vec = |_: usize| Ok::<_, ()>(Vec::<i32>::new());
let test_locked_map =
PartitionedMutexAsync::try_new(init_vec, 4, SeaHasherBuilder).unwrap();
let mutex_first = test_locked_map.get_partition_by_index(0).await;

let mut _tmp_data = mutex_first.lock().await;
Expand Down
27 changes: 14 additions & 13 deletions components/object_store/src/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,18 @@ struct DiskCache {
}

impl DiskCache {
fn new(root_dir: String, cap: usize, partition_bits: usize) -> Self {
let init_lru = || LruCache::new(cap);
Self {
fn try_new(root_dir: String, cap: usize, partition_bits: usize) -> Result<Self> {
let init_lru = |partition_num: usize| -> Result<_> {
let cap_per_par = cap / partition_num;
ensure!(cap_per_par != 0, InvalidCapacity);
Ok(LruCache::new(cap / partition_num))
};

Ok(Self {
root_dir,
cap,
cache: PartitionedMutexAsync::new(init_lru, partition_bits, SeaHasherBuilder {}),
}
cap: cap / (1 << partition_bits),
cache: PartitionedMutexAsync::try_new(init_lru, partition_bits, SeaHasherBuilder {})?,
})
}

/// Update the cache.
Expand Down Expand Up @@ -292,14 +297,10 @@ impl DiskCacheStore {
underlying_store: Arc<dyn ObjectStore>,
partition_bits: usize,
) -> Result<Self> {
ensure!(
cap % (page_size * (1 << partition_bits)) == 0,
InvalidCapacity
);
let cap_per_part = cap / page_size / (1 << partition_bits);
ensure!(cap_per_part != 0, InvalidCapacity);
let pagenum_per_part = cap / page_size;
ensure!(pagenum_per_part != 0, InvalidCapacity);
let _ = Self::create_manifest_if_not_exists(&cache_dir, page_size).await?;
let cache = DiskCache::new(cache_dir.clone(), cap_per_part, partition_bits);
let cache = DiskCache::try_new(cache_dir.clone(), pagenum_per_part, partition_bits)?;
Self::recover_cache(&cache_dir, &cache).await?;

let size_cache = Arc::new(Mutex::new(LruCache::new(cap / page_size)));
Expand Down
76 changes: 48 additions & 28 deletions components/object_store/src/mem_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ use async_trait::async_trait;
use bytes::Bytes;
use clru::{CLruCache, CLruCacheConfig, WeightScale};
use common_types::hash::{ahash::RandomState, build_fixed_seed_ahasher_builder};
use common_util::partitioned_lock::PartitionedMutex;
use common_util::{define_result, partitioned_lock::PartitionedMutex};
use futures::stream::BoxStream;
use snafu::{OptionExt, Snafu};
use tokio::io::AsyncWrite;
use upstream::{path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};
use upstream::{
path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
Result as ObjectStoreResult,
};

use crate::ObjectStoreRef;

Expand All @@ -29,6 +32,8 @@ pub enum Error {
InvalidCapacity,
}

define_result!(Error);

struct CustomScale;

impl WeightScale<String, Bytes> for CustomScale {
Expand All @@ -46,23 +51,23 @@ pub struct MemCache {
pub type MemCacheRef = Arc<MemCache>;

impl MemCache {
pub fn try_new(
partition_bits: usize,
mem_cap: NonZeroUsize,
) -> std::result::Result<Self, Error> {
let partition_num = 1 << partition_bits;
let cap_per_part =
NonZeroUsize::new((mem_cap.get() as f64 / partition_num as f64) as usize)
.context(InvalidCapacity)?;
let init_lru = || {
CLruCache::with_config(
pub fn try_new(partition_bits: usize, mem_cap: NonZeroUsize) -> Result<Self> {
let init_lru = |partition_num: usize| -> Result<_> {
let cap_per_part =
NonZeroUsize::new(mem_cap.get() / partition_num).context(InvalidCapacity)?;
Ok(CLruCache::with_config(
CLruCacheConfig::new(cap_per_part)
.with_hasher(build_fixed_seed_ahasher_builder())
.with_scale(CustomScale),
)
))
};
let inner =
PartitionedMutex::new(init_lru, partition_bits, build_fixed_seed_ahasher_builder());

let inner = PartitionedMutex::try_new(
init_lru,
partition_bits,
build_fixed_seed_ahasher_builder(),
)?;

Ok(Self { mem_cap, inner })
}

Expand Down Expand Up @@ -143,7 +148,11 @@ impl MemCacheStore {
format!("{}-{}-{}", location, range.start, range.end)
}

async fn get_range_with_rw_cache(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
async fn get_range_with_rw_cache(
&self,
location: &Path,
range: Range<usize>,
) -> ObjectStoreResult<Bytes> {
// TODO(chenxiang): What if there are some overlapping range in cache?
// A request with range [5, 10) can also use [0, 20) cache
let cache_key = Self::cache_key(location, &range);
Expand All @@ -159,7 +168,11 @@ impl MemCacheStore {
Ok(bytes)
}

async fn get_range_with_ro_cache(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
async fn get_range_with_ro_cache(
&self,
location: &Path,
range: Range<usize>,
) -> ObjectStoreResult<Bytes> {
let cache_key = Self::cache_key(location, &range);
if let Some(bytes) = self.cache.peek(&cache_key) {
return Ok(bytes);
Expand All @@ -185,18 +198,22 @@ impl fmt::Debug for MemCacheStore {

#[async_trait]
impl ObjectStore for MemCacheStore {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<()> {
self.underlying_store.put(location, bytes).await
}

async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
) -> ObjectStoreResult<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
self.underlying_store.put_multipart(location).await
}

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> ObjectStoreResult<()> {
self.underlying_store
.abort_multipart(location, multipart_id)
.await
Expand All @@ -205,39 +222,42 @@ impl ObjectStore for MemCacheStore {
// TODO(chenxiang): don't cache whole path for reasons below
// 1. cache key don't support overlapping
// 2. In sst module, we only use get_range, get is not used
async fn get(&self, location: &Path) -> Result<GetResult> {
async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
self.underlying_store.get(location).await
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, location: &Path, range: Range<usize>) -> ObjectStoreResult<Bytes> {
if self.readonly_cache {
self.get_range_with_ro_cache(location, range).await
} else {
self.get_range_with_rw_cache(location, range).await
}
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
self.underlying_store.head(location).await
}

async fn delete(&self, location: &Path) -> Result<()> {
async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
self.underlying_store.delete(location).await
}

async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
async fn list(
&self,
prefix: Option<&Path>,
) -> ObjectStoreResult<BoxStream<'_, ObjectStoreResult<ObjectMeta>>> {
self.underlying_store.list(prefix).await
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
self.underlying_store.list_with_delimiter(prefix).await
}

async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
self.underlying_store.copy(from, to).await
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
self.underlying_store.copy_if_not_exists(from, to).await
}
}
Expand Down