refactor: read prev_kv before notify
themanforfree authored and mergify[bot] committed Jun 5, 2023
1 parent f093fe6 commit 6c768da
Showing 4 changed files with 196 additions and 51 deletions.
10 changes: 10 additions & 0 deletions xline/src/rpc/mod.rs
@@ -475,6 +475,16 @@ impl RequestWithToken {
}
}

impl Event {
pub(crate) fn is_create(&self) -> bool {
let kv = self
.kv
.as_ref()
.unwrap_or_else(|| panic!("kv must be Some"));
matches!(self.r#type(), EventType::Put) && kv.create_revision == kv.mod_revision
}
}

impl TxnRequest {
/// Checks whether a given `TxnRequest` is read-only or not.
fn is_read_only(&self) -> bool {
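For illustration, the invariant `is_create` relies on: the put that creates a key is the one whose `create_revision` equals its `mod_revision`; any later put only advances `mod_revision`. A minimal, runnable sketch with stand-in types (not the generated protobuf structs used above):

    // Stand-in types for illustration; the real Event/KeyValue come from the proto definitions.
    #[derive(PartialEq)]
    #[allow(dead_code)]
    enum EventType {
        Put,
        Delete,
    }

    struct KeyValue {
        create_revision: i64,
        mod_revision: i64,
    }

    struct Event {
        r#type: EventType,
        kv: Option<KeyValue>,
    }

    impl Event {
        fn is_create(&self) -> bool {
            let kv = self.kv.as_ref().expect("kv must be Some");
            self.r#type == EventType::Put && kv.create_revision == kv.mod_revision
        }
    }

    fn main() {
        // First put of "foo" lands at revision 2, so create_revision == mod_revision.
        let created = Event {
            r#type: EventType::Put,
            kv: Some(KeyValue { create_revision: 2, mod_revision: 2 }),
        };
        // Overwriting "foo" at revision 3 keeps create_revision at 2.
        let modified = Event {
            r#type: EventType::Put,
            kv: Some(KeyValue { create_revision: 2, mod_revision: 3 }),
        };
        assert!(created.is_create());
        assert!(!modified.is_create());
    }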
128 changes: 119 additions & 9 deletions xline/src/server/watch_server.rs
@@ -123,6 +123,8 @@
stop_notify: Arc<Event>,
/// Header Generator
header_gen: Arc<HeaderGenerator>,
/// Previous KV status
prev_kv: HashSet<WatchId>,
}

impl<W> WatchHandle<W>
@@ -146,6 +148,7 @@
next_id_gen,
stop_notify,
header_gen,
prev_kv: HashSet::new(),
}
}

@@ -188,9 +191,15 @@
Arc::clone(&self.stop_notify),
self.event_tx.clone(),
);
if req.prev_kv {
assert!(
self.prev_kv.insert(watch_id),
"WatchId {watch_id} already exists in prev_kv",
);
}
assert!(
self.active_watch_ids.insert(watch_id),
"WatchId {watch_id} already exists in watcher_map",
"WatchId {watch_id} already exists in active_watch_ids",
);

let response = WatchResponse {
@@ -246,15 +255,26 @@
}

/// Handle watch event
- async fn handle_watch_event(&mut self, mut event: WatchEvent) {
- let watch_id = event.watch_id();
- let events = event.take_events();
async fn handle_watch_event(&mut self, mut watch_event: WatchEvent) {
let mut events = watch_event.take_events();
if events.is_empty() {
return;
}
let watch_id = watch_event.watch_id();
if self.prev_kv.contains(&watch_id) {
for ev in &mut events {
if !ev.is_create() {
let kv = ev
.kv
.as_ref()
.unwrap_or_else(|| panic!("event.kv can't be None"));
ev.prev_kv = self.kv_watcher.get_prev_kv(kv);
}
}
}
let response = WatchResponse {
header: Some(ResponseHeader {
- revision: event.revision(),
revision: watch_event.revision(),
..ResponseHeader::default()
}),
watch_id,
@@ -311,13 +331,20 @@

#[cfg(test)]
mod test {

- use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};

use parking_lot::Mutex;
use tokio::time::sleep;
use utils::config::StorageConfig;

use super::*;
- use crate::storage::{db::DB, kvwatcher::MockKvWatcherOps};
use crate::{
rpc::{PutRequest, RequestWithToken},
storage::{
db::DB, index::Index, kvwatcher::MockKvWatcherOps, lease_store::LeaseCollection,
KvStore,
},
};

#[tokio::test]
async fn test_watch_client_closes_connection() -> Result<(), Box<dyn std::error::Error>> {
@@ -406,12 +433,95 @@ mod test {
};
req_tx1.send(Ok(w_req.clone())).await?;
req_tx2.send(Ok(w_req.clone())).await?;
- tokio::time::sleep(std::time::Duration::from_secs(1)).await;
sleep(Duration::from_secs(1)).await;
for count in collection.lock().values() {
assert_eq!(*count, 1);
}
handle1.abort();
handle2.abort();
Ok(())
}

#[tokio::test]
async fn test_watch_prev_kv() {
let index = Arc::new(Index::new());
let db = DB::open(&StorageConfig::Memory).unwrap();
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
let lease_collection = Arc::new(LeaseCollection::new(0));
let next_id_gen = Arc::new(WatchIdGenerator::new(1));
let (kv_update_tx, kv_update_rx) = mpsc::channel(CHANNEL_SIZE);
let kv_store = Arc::new(KvStore::new(
kv_update_tx,
lease_collection,
Arc::clone(&header_gen),
Arc::clone(&db),
index,
));
let shutdown_trigger = Arc::new(event_listener::Event::new());
let kv_watcher = KvWatcher::new_arc(
Arc::clone(&kv_store),
kv_update_rx,
shutdown_trigger,
Duration::from_millis(10),
);
put(&kv_store, &db, "foo", "old_bar", 2).await;
put(&kv_store, &db, "foo", "bar", 3).await;

let (req_tx, req_rx) = mpsc::channel(CHANNEL_SIZE);
let req_stream = ReceiverStream::new(req_rx);
let create_watch_req = move |watch_id: WatchId, prev_kv: bool| WatchRequest {
request_union: Some(RequestUnion::CreateRequest(WatchCreateRequest {
watch_id,
key: "foo".into(),
start_revision: 3,
prev_kv,
..Default::default()
})),
};
req_tx.send(Ok(create_watch_req(1, false))).await.unwrap();
req_tx.send(Ok(create_watch_req(2, true))).await.unwrap();
let (res_tx, mut res_rx) = mpsc::channel(CHANNEL_SIZE);
let _hd = tokio::spawn(WatchServer::<DB>::task(
Arc::clone(&next_id_gen),
Arc::clone(&kv_watcher),
res_tx,
req_stream,
Arc::clone(&header_gen),
));

for _ in 0..4 {
let watch_res = res_rx.recv().await.unwrap().unwrap();
if watch_res.events.is_empty() {
// WatchCreateResponse
continue;
}
let watch_id = watch_res.watch_id;
let has_prev = watch_res.events.first().unwrap().prev_kv.is_some();
if watch_id == 1 {
assert!(!has_prev);
} else {
assert!(has_prev);
}
}
}

async fn put(
store: &KvStore<DB>,
db: &DB,
key: impl Into<Vec<u8>>,
value: impl Into<Vec<u8>>,
revision: i64,
) {
let req = RequestWithToken::new(
PutRequest {
key: key.into(),
value: value.into(),
..Default::default()
}
.into(),
);
let (sync_res, ops) = store.after_sync(&req, revision).await.unwrap();
db.flush_ops(ops).unwrap();
store.mark_index_available(sync_res.revision());
}
}
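To make the new `handle_watch_event` flow concrete: a watch id goes into the `prev_kv` set only when the client registered with `prev_kv = true`, and for those watchers every non-create event gets its previous `KeyValue` attached before the response is built. The sketch below uses stand-in types, and the `lookup` closure stands in for `KvWatcherOps::get_prev_kv` (a read of the key one revision before the reported write); the watch ids and revisions mirror the test above:

    use std::collections::HashSet;

    type WatchId = i64;

    #[derive(Clone)]
    struct KeyValue {
        create_revision: i64,
        mod_revision: i64,
    }

    struct Event {
        kv: Option<KeyValue>,
        prev_kv: Option<KeyValue>,
    }

    impl Event {
        fn is_create(&self) -> bool {
            let kv = self.kv.as_ref().expect("kv must be Some");
            kv.create_revision == kv.mod_revision
        }
    }

    /// Attach prev_kv to non-create events, but only for watchers that asked for it.
    fn attach_prev_kv(
        prev_kv_watchers: &HashSet<WatchId>,
        watch_id: WatchId,
        events: &mut [Event],
        lookup: impl Fn(&KeyValue) -> Option<KeyValue>,
    ) {
        if !prev_kv_watchers.contains(&watch_id) {
            return;
        }
        for ev in events.iter_mut() {
            if !ev.is_create() {
                let kv = ev.kv.as_ref().expect("event.kv can't be None");
                ev.prev_kv = lookup(kv);
            }
        }
    }

    fn main() {
        // Watcher 2 registered with prev_kv = true; watcher 1 did not.
        let prev_kv_watchers: HashSet<WatchId> = [2].into_iter().collect();
        // "foo" was created at revision 2 and overwritten at revision 3.
        let old = KeyValue { create_revision: 2, mod_revision: 2 };
        let put_at_3 = || Event {
            kv: Some(KeyValue { create_revision: 2, mod_revision: 3 }),
            prev_kv: None,
        };
        let lookup = |_kv: &KeyValue| Some(old.clone());

        let mut for_watcher_1 = vec![put_at_3()];
        attach_prev_kv(&prev_kv_watchers, 1, &mut for_watcher_1, lookup);
        assert!(for_watcher_1[0].prev_kv.is_none());

        let mut for_watcher_2 = vec![put_at_3()];
        attach_prev_kv(&prev_kv_watchers, 2, &mut for_watcher_2, lookup);
        assert!(for_watcher_2[0].prev_kv.is_some());
    }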
7 changes: 7 additions & 0 deletions xline/src/storage/kv_store.rs
@@ -376,6 +376,13 @@ where
Ok((kvs, total))
}

/// Get previous `KeyValue` of a `KeyValue`
pub(crate) fn get_prev_kv(&self, kv: &KeyValue) -> Option<KeyValue> {
self.get_range(&kv.key, &[], kv.mod_revision.overflow_sub(1))
.ok()?
.pop()
}

/// Get `KeyValue` start from a revision and convert to `Event`
pub(crate) fn get_event_from_revision(
&self,
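Put differently, `get_prev_kv` is a single-key range read one revision before the write being reported, yielding `None` when that write is the one that created the key. A toy revisioned store (not the real `KvStore` and index, only an illustration of the lookup):

    use std::collections::BTreeMap;

    /// (key, revision) -> value; a minimal stand-in for the real index + DB.
    struct ToyStore {
        history: BTreeMap<(Vec<u8>, i64), Vec<u8>>,
    }

    impl ToyStore {
        /// Latest value of `key` visible at `revision`, if any.
        fn get_at(&self, key: &[u8], revision: i64) -> Option<(i64, Vec<u8>)> {
            self.history
                .range((key.to_vec(), i64::MIN)..=(key.to_vec(), revision))
                .next_back()
                .map(|((_, rev), val)| (*rev, val.clone()))
        }

        /// Previous version of the value written at `mod_revision` (None if that write created the key).
        fn get_prev(&self, key: &[u8], mod_revision: i64) -> Option<(i64, Vec<u8>)> {
            self.get_at(key, mod_revision - 1)
        }
    }

    fn main() {
        let mut history = BTreeMap::new();
        history.insert((b"foo".to_vec(), 2), b"old_bar".to_vec());
        history.insert((b"foo".to_vec(), 3), b"bar".to_vec());
        let store = ToyStore { history };
        // The put at revision 3 has the revision-2 value as its prev_kv ...
        assert_eq!(store.get_prev(b"foo", 3), Some((2, b"old_bar".to_vec())));
        // ... while the put at revision 2 created the key, so there is nothing before it.
        assert_eq!(store.get_prev(b"foo", 2), None);
    }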
102 changes: 60 additions & 42 deletions xline/src/storage/kvwatcher.rs
@@ -20,7 +20,11 @@ use tracing::debug;
use utils::parking_lot_lock::RwLockMap;

use super::storage_api::StorageApi;
- use crate::{rpc::Event, server::command::KeyRange, storage::kv_store::KvStore};
use crate::{
rpc::{Event, KeyValue},
server::command::KeyRange,
storage::kv_store::KvStore,
};

/// Watch ID
pub(crate) type WatchId = i64;
@@ -102,30 +106,30 @@ impl Watcher {
&self.key_range
}

- /// Get start revision
- fn start_rev(&self) -> i64 {
- self.start_rev
- }
-
- /// Notify events
- fn notify(
- &mut self,
- (revision, mut events): (i64, Vec<Event>),
- ) -> Result<(), TrySendError<WatchEvent>> {
- if revision < self.start_rev() {
- return Ok(());
- }
/// filter out events
fn filter_events(&self, mut events: Vec<Event>) -> Vec<Event> {
events.retain(|event| {
self.filters.iter().all(|filter| filter != &event.r#type)
&& (event
.kv
.as_ref()
.map_or(false, |kv| kv.mod_revision >= self.start_rev))
});
events
}

/// Notify all passed events; callers must filter the events before calling this method
fn notify(
&mut self,
(revision, events): (i64, Vec<Event>),
) -> Result<(), TrySendError<WatchEvent>> {
if revision < self.start_rev {
return Ok(());
}
let events = self.filter_events(events);
if events.is_empty() {
return Ok(());
}

let watch_id = self.watch_id();
debug!(
watch_id,
@@ -288,6 +292,9 @@ pub(crate) trait KvWatcherOps {

/// Cancel a watch from KV store
fn cancel(&self, id: WatchId);

/// Get previous `KeyValue` of a `KeyValue`
fn get_prev_kv(&self, kv: &KeyValue) -> Option<KeyValue>;
}

#[async_trait::async_trait]
@@ -348,6 +355,10 @@ where
fn cancel(&self, watch_id: WatchId) {
self.watcher_map.write().remove(watch_id);
}

fn get_prev_kv(&self, kv: &KeyValue) -> Option<KeyValue> {
self.storage.get_prev_kv(kv)
}
}

impl<S> KvWatcher<S>
@@ -392,6 +403,7 @@ where
.map_write(|mut m| m.victims.drain().collect::<Vec<_>>());
let mut new_victims = HashMap::new();
for (mut watcher, res) in victims {
// No need to filter the events or fetch prev_kv here: the watcher's events were already filtered before being inserted into victims
if let Err(TrySendError::Full(watch_event)) = watcher.notify(res) {
assert!(
new_victims
@@ -564,20 +576,16 @@ mod test {
let handle = tokio::spawn({
let store = Arc::clone(&store);
async move {
- let mut revision = 2;
// let mut revision = 2;
for i in 0..100_u8 {
- let req = RequestWithToken::new(
- PutRequest {
- key: "foo".into(),
- value: vec![i],
- ..Default::default()
- }
- .into(),
- );
- let (sync_res, ops) = store.after_sync(&req, revision).await.unwrap();
- db.flush_ops(ops).unwrap();
- store.mark_index_available(sync_res.revision());
- revision += 1;
put(
store.as_ref(),
db.as_ref(),
"foo",
vec![i],
i.overflow_add(2).cast(),
)
.await;
}
}
});
@@ -633,8 +641,8 @@

let mut expect = 0;
let handle = tokio::spawn(async move {
- 'outer: while let Some(watch_event) = event_rx.recv().await {
- for event in watch_event.events {
'outer: while let Some(watch_events) = event_rx.recv().await {
for event in watch_events.events {
let val = event.kv.as_ref().unwrap().value[0];
assert_eq!(val, expect);
expect += 1;
Expand All @@ -645,18 +653,8 @@ mod test {
}
});

- for i in 0..100u8 {
- let req = RequestWithToken::new(
- PutRequest {
- key: "foo".into(),
- value: vec![i],
- ..Default::default()
- }
- .into(),
- );
- let (sync_res, ops) = store.after_sync(&req, i.cast()).await.unwrap();
- db.flush_ops(ops).unwrap();
- store.mark_index_available(sync_res.revision());
for i in 0..100_u8 {
put(store.as_ref(), db.as_ref(), "foo", vec![i], i.cast()).await;
}
handle.await.unwrap();
}
Expand All @@ -680,4 +678,24 @@ mod test {
assert!(kv_watcher.watcher_map.read().index.is_empty());
assert!(kv_watcher.watcher_map.read().watchers.is_empty());
}

async fn put(
store: &KvStore<DB>,
db: &DB,
key: impl Into<Vec<u8>>,
value: impl Into<Vec<u8>>,
revision: i64,
) {
let req = RequestWithToken::new(
PutRequest {
key: key.into(),
value: value.into(),
..Default::default()
}
.into(),
);
let (sync_res, ops) = store.after_sync(&req, revision).await.unwrap();
db.flush_ops(ops).unwrap();
store.mark_index_available(sync_res.revision());
}
}
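One more note on the `Watcher` refactor: the filtering that `notify` used to do inline now lives in `filter_events`, so events queued for a victim watcher are already filtered and only need to be retried. An event survives the filter when its type is not in the watcher's filter list and its `mod_revision` is at or past the watcher's start revision. A small sketch with simplified types (not the real `Watcher`, which keeps the revision inside the event's `kv`):

    #[derive(Clone, Copy, PartialEq)]
    enum EventType {
        Put,
        Delete,
    }

    struct Event {
        r#type: EventType,
        mod_revision: i64,
    }

    struct Watcher {
        start_rev: i64,
        filters: Vec<EventType>,
    }

    impl Watcher {
        /// Keep only events the watcher did not filter out and that are not older than start_rev.
        fn filter_events(&self, mut events: Vec<Event>) -> Vec<Event> {
            events.retain(|ev| {
                self.filters.iter().all(|f| *f != ev.r#type) && ev.mod_revision >= self.start_rev
            });
            events
        }
    }

    fn main() {
        let watcher = Watcher { start_rev: 3, filters: vec![EventType::Delete] };
        let kept = watcher.filter_events(vec![
            Event { r#type: EventType::Put, mod_revision: 2 },    // older than start_rev: dropped
            Event { r#type: EventType::Delete, mod_revision: 4 }, // filtered type: dropped
            Event { r#type: EventType::Put, mod_revision: 4 },    // kept
        ]);
        assert_eq!(kept.len(), 1);
        assert_eq!(kept[0].mod_revision, 4);
    }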
