
Commit 476bc82

refactor: replace tokio lock with std lock in some sync scenarios (apache#694)
* refactor: replace tokio lock with std lock in some sync scenarios
* fix: clippy warnings
1 parent cff7c77 commit 476bc82
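The whole commit follows a single pattern: wherever the guarded critical section is purely synchronous (nothing is `.await`ed while the guard is held), the `tokio::sync` lock is swapped for its `std::sync` counterpart and the surrounding methods drop their `async`. A minimal before/after sketch of that pattern, with illustrative names that are not taken from this diff:

```rust
use std::sync::Mutex;

// Before (tokio::sync::Mutex): the caller has to be async just to take the lock.
//
//     async fn bump(counter: &tokio::sync::Mutex<u64>) -> u64 {
//         let mut n = counter.lock().await;
//         *n += 1;
//         *n
//     }

// After (std::sync::Mutex): the critical section is synchronous, so a blocking
// lock is enough. lock() returns a Result because std mutexes can be poisoned;
// unwrap() turns poisoning into a panic, matching the style used in the diff.
fn bump(counter: &Mutex<u64>) -> u64 {
    let mut n = counter.lock().unwrap();
    *n += 1;
    *n
}

fn main() {
    let counter = Mutex::new(0);
    assert_eq!(bump(&counter), 1);
    assert_eq!(bump(&counter), 2);
}
```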

File tree: 4 files changed (+54 -69 lines)


common_util/src/partitioned_lock.rs

+15-17
```diff
@@ -6,11 +6,9 @@ use std::{
     collections::hash_map::DefaultHasher,
     hash::{Hash, Hasher},
     num::NonZeroUsize,
-    sync::Arc,
+    sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard},
 };
 
-use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
-
 /// Simple partitioned `RwLock`
 pub struct PartitionedRwLock<T> {
     partitions: Vec<Arc<RwLock<T>>>,
@@ -28,16 +26,16 @@ impl<T> PartitionedRwLock<T> {
         }
     }
 
-    pub async fn read<K: Eq + Hash>(&self, key: &K) -> RwLockReadGuard<'_, T> {
+    pub fn read<K: Eq + Hash>(&self, key: &K) -> RwLockReadGuard<'_, T> {
         let rwlock = self.get_partition(key);
 
-        rwlock.read().await
+        rwlock.read().unwrap()
     }
 
-    pub async fn write<K: Eq + Hash>(&self, key: &K) -> RwLockWriteGuard<'_, T> {
+    pub fn write<K: Eq + Hash>(&self, key: &K) -> RwLockWriteGuard<'_, T> {
         let rwlock = self.get_partition(key);
 
-        rwlock.write().await
+        rwlock.write().unwrap()
     }
 
     fn get_partition<K: Eq + Hash>(&self, key: &K) -> &RwLock<T> {
@@ -66,10 +64,10 @@ impl<T> PartitionedMutex<T> {
         }
     }
 
-    pub async fn lock<K: Eq + Hash>(&self, key: &K) -> MutexGuard<'_, T> {
+    pub fn lock<K: Eq + Hash>(&self, key: &K) -> MutexGuard<'_, T> {
         let mutex = self.get_partition(key);
 
-        mutex.lock().await
+        mutex.lock().unwrap()
     }
 
     fn get_partition<K: Eq + Hash>(&self, key: &K) -> &Mutex<T> {
@@ -87,37 +85,37 @@ mod tests {
 
     use super::*;
 
-    #[tokio::test]
-    async fn test_partitioned_rwlock() {
+    #[test]
+    fn test_partitioned_rwlock() {
         let test_locked_map =
             PartitionedRwLock::new(HashMap::new(), NonZeroUsize::new(10).unwrap());
         let test_key = "test_key".to_string();
         let test_value = "test_value".to_string();
 
         {
-            let mut map = test_locked_map.write(&test_key).await;
+            let mut map = test_locked_map.write(&test_key);
             map.insert(test_key.clone(), test_value.clone());
         }
 
         {
-            let map = test_locked_map.read(&test_key).await;
+            let map = test_locked_map.read(&test_key);
             assert_eq!(map.get(&test_key).unwrap(), &test_value);
         }
     }
 
-    #[tokio::test]
-    async fn test_partitioned_mutex() {
+    #[test]
+    fn test_partitioned_mutex() {
         let test_locked_map = PartitionedMutex::new(HashMap::new(), NonZeroUsize::new(10).unwrap());
         let test_key = "test_key".to_string();
         let test_value = "test_value".to_string();
 
         {
-            let mut map = test_locked_map.lock(&test_key).await;
+            let mut map = test_locked_map.lock(&test_key);
             map.insert(test_key.clone(), test_value.clone());
         }
 
         {
-            let map = test_locked_map.lock(&test_key).await;
+            let map = test_locked_map.lock(&test_key);
            assert_eq!(map.get(&test_key).unwrap(), &test_value);
         }
     }
```
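With the lock methods no longer `async`, callers of `PartitionedRwLock` and `PartitionedMutex` use them like plain std locks, as the updated tests above show. For readers unfamiliar with the type, here is a minimal self-contained sketch of the partitioning idea; it is an illustration only, not the crate's actual implementation:

```rust
use std::{
    collections::{hash_map::DefaultHasher, HashMap},
    hash::{Hash, Hasher},
    sync::{Mutex, MutexGuard},
};

// Illustration of the idea behind PartitionedMutex: hash the key, pick one of N
// independent std Mutexes, and lock only that shard so unrelated keys do not
// contend on a single global lock.
struct ShardedMutex<T> {
    partitions: Vec<Mutex<T>>,
}

impl<T> ShardedMutex<T> {
    fn new(init: impl Fn() -> T, partition_num: usize) -> Self {
        Self {
            partitions: (0..partition_num).map(|_| Mutex::new(init())).collect(),
        }
    }

    fn lock<K: Hash>(&self, key: &K) -> MutexGuard<'_, T> {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let idx = hasher.finish() as usize % self.partitions.len();
        // unwrap(): treat a poisoned shard as a bug, the same policy as the diff.
        self.partitions[idx].lock().unwrap()
    }
}

fn main() {
    let map = ShardedMutex::new(HashMap::<String, String>::new, 10);
    map.lock(&"test_key").insert("test_key".into(), "test_value".into());
    assert_eq!(
        map.lock(&"test_key").get("test_key").map(String::as_str),
        Some("test_value")
    );
}
```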

components/object_store/src/mem_cache.rs

+32-43
```diff
@@ -10,15 +10,15 @@ use std::{
     hash::{Hash, Hasher},
     num::NonZeroUsize,
     ops::Range,
-    sync::Arc,
+    sync::{Arc, Mutex},
 };
 
 use async_trait::async_trait;
 use bytes::Bytes;
 use clru::{CLruCache, CLruCacheConfig, WeightScale};
 use futures::stream::BoxStream;
 use snafu::{OptionExt, Snafu};
-use tokio::{io::AsyncWrite, sync::Mutex};
+use tokio::io::AsyncWrite;
 use upstream::{path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};
 
 use crate::ObjectStoreRef;
@@ -52,26 +52,26 @@ impl Partition {
 }
 
 impl Partition {
-    async fn get(&self, key: &str) -> Option<Bytes> {
-        let mut guard = self.inner.lock().await;
+    fn get(&self, key: &str) -> Option<Bytes> {
+        let mut guard = self.inner.lock().unwrap();
         guard.get(key).cloned()
     }
 
-    async fn peek(&self, key: &str) -> Option<Bytes> {
+    fn peek(&self, key: &str) -> Option<Bytes> {
         // FIXME: actually, here write lock is not necessary.
-        let guard = self.inner.lock().await;
+        let guard = self.inner.lock().unwrap();
         guard.peek(key).cloned()
     }
 
-    async fn insert(&self, key: String, value: Bytes) {
-        let mut guard = self.inner.lock().await;
+    fn insert(&self, key: String, value: Bytes) {
+        let mut guard = self.inner.lock().unwrap();
         // don't care error now.
         _ = guard.put_with_weight(key, value);
     }
 
     #[cfg(test)]
-    async fn keys(&self) -> Vec<String> {
-        let guard = self.inner.lock().await;
+    fn keys(&self) -> Vec<String> {
+        let guard = self.inner.lock().unwrap();
         guard
             .iter()
             .map(|(key, _)| key)
@@ -115,34 +115,31 @@ impl MemCache {
         self.partitions[hasher.finish() as usize & self.partition_mask].clone()
     }
 
-    async fn get(&self, key: &str) -> Option<Bytes> {
+    fn get(&self, key: &str) -> Option<Bytes> {
         let partition = self.locate_partition(key);
-        partition.get(key).await
+        partition.get(key)
     }
 
-    async fn peek(&self, key: &str) -> Option<Bytes> {
+    fn peek(&self, key: &str) -> Option<Bytes> {
         let partition = self.locate_partition(key);
-        partition.peek(key).await
+        partition.peek(key)
     }
 
-    async fn insert(&self, key: String, value: Bytes) {
+    fn insert(&self, key: String, value: Bytes) {
         let partition = self.locate_partition(&key);
-        partition.insert(key, value).await;
+        partition.insert(key, value);
     }
 
+    /// Give a description of the cache state.
     #[cfg(test)]
-    async fn to_string(&self) -> String {
-        futures::future::join_all(
-            self.partitions
-                .iter()
-                .map(|part| async { part.keys().await.join(",") }),
-        )
-        .await
-        .into_iter()
-        .enumerate()
-        .map(|(part_no, keys)| format!("{part_no}: [{keys}]"))
-        .collect::<Vec<_>>()
-        .join("\n")
+    fn state_desc(&self) -> String {
+        self.partitions
+            .iter()
+            .map(|part| part.keys().join(","))
+            .enumerate()
+            .map(|(part_no, keys)| format!("{part_no}: [{keys}]"))
+            .collect::<Vec<_>>()
+            .join("\n")
     }
 }
 
@@ -195,21 +192,21 @@ impl MemCacheStore {
         // TODO(chenxiang): What if there are some overlapping range in cache?
         // A request with range [5, 10) can also use [0, 20) cache
         let cache_key = Self::cache_key(location, &range);
-        if let Some(bytes) = self.cache.get(&cache_key).await {
+        if let Some(bytes) = self.cache.get(&cache_key) {
            return Ok(bytes);
         }
 
         // TODO(chenxiang): What if two threads reach here? It's better to
         // pend one thread, and only let one to fetch data from underlying store.
         let bytes = self.underlying_store.get_range(location, range).await?;
-        self.cache.insert(cache_key, bytes.clone()).await;
+        self.cache.insert(cache_key, bytes.clone());
 
         Ok(bytes)
     }
 
     async fn get_range_with_ro_cache(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
         let cache_key = Self::cache_key(location, &range);
-        if let Some(bytes) = self.cache.peek(&cache_key).await {
+        if let Some(bytes) = self.cache.peek(&cache_key) {
            return Ok(bytes);
         }
 
@@ -297,7 +294,7 @@ mod test {
 
     use super::*;
 
-    async fn prepare_store(bits: usize, mem_cap: usize) -> MemCacheStore {
+    fn prepare_store(bits: usize, mem_cap: usize) -> MemCacheStore {
         let local_path = tempdir().unwrap();
         let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
 
@@ -309,7 +306,7 @@
     #[tokio::test]
     async fn test_mem_cache_evict() {
         // single partition
-        let store = prepare_store(0, 13).await;
+        let store = prepare_store(0, 13);
 
         // write date
         let location = Path::from("1.sst");
@@ -324,7 +321,6 @@
         assert!(store
             .cache
             .get(&MemCacheStore::cache_key(&location, &range0_5))
-            .await
             .is_some());
 
         // get bytes from [5, 10), insert to cache
@@ -333,12 +329,10 @@
         assert!(store
             .cache
             .get(&MemCacheStore::cache_key(&location, &range0_5))
-            .await
             .is_some());
         assert!(store
             .cache
             .get(&MemCacheStore::cache_key(&location, &range5_10))
-            .await
             .is_some());
 
         // get bytes from [10, 15), insert to cache
@@ -351,24 +345,21 @@
         assert!(store
             .cache
             .get(&MemCacheStore::cache_key(&location, &range0_5))
-            .await
             .is_none());
         assert!(store
             .cache
             .get(&MemCacheStore::cache_key(&location, &range5_10))
-            .await
             .is_some());
         assert!(store
             .cache
             .get(&MemCacheStore::cache_key(&location, &range10_15))
-            .await
             .is_some());
     }
 
     #[tokio::test]
     async fn test_mem_cache_partition() {
         // 4 partitions
-        let store = prepare_store(2, 100).await;
+        let store = prepare_store(2, 100);
         let location = Path::from("partition.sst");
         store
             .put(&location, Bytes::from_static(&[1; 1024]))
@@ -388,18 +379,16 @@
 1: [partition.sst-100-105]
 2: []
 3: [partition.sst-0-5]"#,
-            store.cache.as_ref().to_string().await
+            store.cache.as_ref().state_desc()
         );
 
         assert!(store
             .cache
             .get(&MemCacheStore::cache_key(&location, &range0_5))
-            .await
             .is_some());
         assert!(store
             .cache
             .get(&MemCacheStore::cache_key(&location, &range100_105))
-            .await
             .is_some());
     }
 }
```
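The point of the mem_cache change is that every lock acquisition now happens inside a small synchronous helper, while the only remaining `.await` in the read path is the fetch from the underlying store, executed with no guard held. A rough self-contained sketch of that ordering, using hypothetical stand-in types rather than the real `MemCache`/`CLruCache`/`ObjectStore` (and assuming the project's tokio dependency for the runtime):

```rust
use std::{collections::HashMap, sync::Mutex};

// Stand-in cache: the std Mutex is taken and released inside these synchronous
// methods only, never across an await point.
struct Cache {
    inner: Mutex<HashMap<String, Vec<u8>>>,
}

impl Cache {
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        // the guard is a temporary dropped at the end of this expression
        self.inner.lock().unwrap().get(key).cloned()
    }

    fn insert(&self, key: String, value: Vec<u8>) {
        self.inner.lock().unwrap().insert(key, value);
    }
}

// Shape of MemCacheStore::get_range after the change: check the cache
// synchronously, await the slow fetch with no lock held, then fill the cache.
async fn get_range(cache: &Cache, key: &str) -> Vec<u8> {
    if let Some(bytes) = cache.get(key) {
        return bytes;
    }
    let bytes = fetch_from_store(key).await; // hypothetical underlying store read
    cache.insert(key.to_string(), bytes.clone());
    bytes
}

async fn fetch_from_store(key: &str) -> Vec<u8> {
    key.as_bytes().to_vec()
}

#[tokio::main]
async fn main() {
    let cache = Cache { inner: Mutex::new(HashMap::new()) };
    assert_eq!(get_range(&cache, "1.sst-0-5").await, b"1.sst-0-5".to_vec());
    // second call is served from the cache without touching the store
    assert_eq!(get_range(&cache, "1.sst-0-5").await, b"1.sst-0-5".to_vec());
}
```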

remote_engine_client/src/cached_router.rs

+4-5
```diff
@@ -2,14 +2,13 @@
 
 //! Cached router
 
-use std::collections::HashMap;
+use std::{collections::HashMap, sync::RwLock};
 
 use ceresdbproto::storage::{self, RequestContext};
 use log::debug;
 use router::RouterRef;
 use snafu::{OptionExt, ResultExt};
 use table_engine::remote::model::TableIdentifier;
-use tokio::sync::RwLock;
 use tonic::transport::Channel;
 
 use crate::{channel::ChannelPool, config::Config, error::*};
@@ -40,7 +39,7 @@ impl CachedRouter {
     pub async fn route(&self, table_ident: &TableIdentifier) -> Result<Channel> {
         // Find in cache first.
         let channel_opt = {
-            let cache = self.cache.read().await;
+            let cache = self.cache.read().unwrap();
             cache.get(table_ident).cloned()
         };
 
@@ -62,7 +61,7 @@
         let channel = self.do_route(table_ident).await?;
 
         {
-            let mut cache = self.cache.write().await;
+            let mut cache = self.cache.write().unwrap();
             // Double check here, if still not found, we put it.
             let channel_opt = cache.get(table_ident).cloned();
             if channel_opt.is_none() {
@@ -81,7 +80,7 @@
     }
 
     pub async fn evict(&self, table_ident: &TableIdentifier) {
-        let mut cache = self.cache.write().await;
+        let mut cache = self.cache.write().unwrap();
         let _ = cache.remove(table_ident);
     }
 
```
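In `route` the method itself stays `async` because a cache miss still awaits `do_route`, so the important detail is scoping: each std guard lives in its own block and is dropped before any await point (a `std::sync` guard is not meant to live across `.await`; it typically makes the future non-`Send` on the multi-threaded runtime, and any contention then stalls a worker thread instead of yielding). A simplified sketch of that read / await / write-with-double-check shape, using `String` as a stand-in for `TableIdentifier` and `Channel`:

```rust
use std::{collections::HashMap, sync::RwLock};

// Simplified sketch of the scoped-guard + double-check shape of
// CachedRouter::route; not the real types from the crate.
async fn route(cache: &RwLock<HashMap<String, String>>, key: &str) -> String {
    // Fast path: take the read lock only inside this block, then drop it.
    let cached = {
        let guard = cache.read().unwrap();
        guard.get(key).cloned()
    }; // read guard dropped here, before any await
    if let Some(found) = cached {
        return found;
    }

    // Miss: do the slow async work while holding no lock at all.
    let fresh = do_route(key).await;

    // Write lock plus double check: another task may have filled the entry
    // while we were awaiting, so keep whatever is already there.
    let mut guard = cache.write().unwrap();
    guard.entry(key.to_string()).or_insert(fresh).clone()
}

// Stand-in for the real routing call.
async fn do_route(key: &str) -> String {
    format!("channel-for-{key}")
}

#[tokio::main]
async fn main() {
    let cache = RwLock::new(HashMap::new());
    assert_eq!(route(&cache, "t1").await, "channel-for-t1");
    assert_eq!(route(&cache, "t1").await, "channel-for-t1"); // served from cache
}
```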

remote_engine_client/src/channel.rs

+3-4
```diff
@@ -2,11 +2,10 @@
 
 //! Channel pool
 
-use std::collections::HashMap;
+use std::{collections::HashMap, sync::RwLock};
 
 use router::endpoint::Endpoint;
 use snafu::ResultExt;
-use tokio::sync::RwLock;
 use tonic::transport::{Channel, Endpoint as TonicEndpoint};
 
 use crate::{config::Config, error::*};
@@ -30,7 +29,7 @@ impl ChannelPool {
 
     pub async fn get(&self, endpoint: &Endpoint) -> Result<Channel> {
         {
-            let inner = self.channels.read().await;
+            let inner = self.channels.read().unwrap();
             if let Some(channel) = inner.get(endpoint) {
                 return Ok(channel.clone());
             }
@@ -40,7 +39,7 @@
             .builder
             .build(endpoint.clone().to_string().as_str())
             .await?;
-        let mut inner = self.channels.write().await;
+        let mut inner = self.channels.write().unwrap();
         // Double check here.
         if let Some(channel) = inner.get(endpoint) {
             return Ok(channel.clone());
```
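One caveat worth keeping in mind with this kind of change (general guidance, not something stated in the commit): a `std::sync` lock blocks the runtime worker thread while it is contended, so it is only appropriate when, as here, every critical section is a short in-memory operation and no `.await` happens while a guard is held; tokio's own documentation recommends preferring std locks in exactly that situation. The `.unwrap()` calls also change failure behavior slightly: std locks can be poisoned by a panic inside a critical section, so a poisoned lock now surfaces as a panic for later callers, whereas the previous tokio locks had no poisoning at all.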
