-
Notifications
You must be signed in to change notification settings - Fork 214
/
Copy pathdedup_requests.rs
123 lines (108 loc) · 3.34 KB
/
dedup_requests.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{collections::HashMap, hash::Hash, sync::RwLock};
#[derive(Debug)]
struct Notifiers<T> {
notifiers: RwLock<Vec<T>>,
}
impl<T> Notifiers<T> {
pub fn new(notifier: T) -> Self {
let notifiers = vec![notifier];
Self {
notifiers: RwLock::new(notifiers),
}
}
pub fn add_notifier(&self, notifier: T) {
self.notifiers.write().unwrap().push(notifier);
}
}
#[derive(Debug)]
pub struct RequestNotifiers<K, T>
where
K: PartialEq + Eq + Hash,
{
notifiers_by_key: RwLock<HashMap<K, Notifiers<T>>>,
}
impl<K, T> Default for RequestNotifiers<K, T>
where
K: PartialEq + Eq + Hash,
{
fn default() -> Self {
Self {
notifiers_by_key: RwLock::new(HashMap::new()),
}
}
}
impl<K, T> RequestNotifiers<K, T>
where
K: PartialEq + Eq + Hash,
{
/// Insert a notifier for the given key.
pub fn insert_notifier(&self, key: K, notifier: T) -> RequestResult {
// First try to read the notifiers, if the key exists, add the notifier to the
// notifiers.
let notifiers_by_key = self.notifiers_by_key.read().unwrap();
if let Some(notifiers) = notifiers_by_key.get(&key) {
notifiers.add_notifier(notifier);
return RequestResult::Wait;
}
drop(notifiers_by_key);
// If the key does not exist, try to write the notifiers.
let mut notifiers_by_key = self.notifiers_by_key.write().unwrap();
// double check, if the key exists, add the notifier to the notifiers.
if let Some(notifiers) = notifiers_by_key.get(&key) {
notifiers.add_notifier(notifier);
return RequestResult::Wait;
}
//the key is not existed, insert the key and the notifier.
notifiers_by_key.insert(key, Notifiers::new(notifier));
RequestResult::First
}
/// Take the notifiers for the given key, and remove the key from the map.
pub fn take_notifiers(&self, key: &K) -> Option<Vec<T>> {
self.notifiers_by_key
.write()
.unwrap()
.remove(key)
.map(|notifiers| notifiers.notifiers.into_inner().unwrap())
}
}
pub enum RequestResult {
// The first request for this key, need to handle this request.
First,
// There are other requests for this key, just wait for the result.
Wait,
}
pub struct ExecutionGuard<F: FnMut()> {
f: F,
cancelled: bool,
}
impl<F: FnMut()> ExecutionGuard<F> {
pub fn new(f: F) -> Self {
Self {
f,
cancelled: false,
}
}
pub fn cancel(&mut self) {
self.cancelled = true;
}
}
impl<F: FnMut()> Drop for ExecutionGuard<F> {
fn drop(&mut self) {
if !self.cancelled {
(self.f)()
}
}
}