Skip to content

Commit

Permalink
Improve watch example
Browse files Browse the repository at this point in the history
  • Loading branch information
davidli2010 committed Apr 23, 2022
1 parent ccf6973 commit 2e8be33
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
6 changes: 4 additions & 2 deletions examples/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ async fn main() -> Result<(), Error> {
println!();

client.put("foo", "bar2", None).await?;
watcher.request_progress().await?;
client.delete("foo", None).await?;

while let Some(resp) = stream.message().await? {
println!("receive watch response");
println!("[{}] receive watch response", resp.watch_id());
println!("compact revision: {}", resp.compact_revision());

if resp.canceled() {
println!("watch canceled!");
println!("watch canceled: {}", resp.cancel_reason());
break;
}

Expand Down
22 changes: 11 additions & 11 deletions src/rpc/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,28 @@ impl WatchClient {
key: impl Into<Vec<u8>>,
options: Option<WatchOptions>,
) -> Result<(Watcher, WatchStream)> {
let (sender, receiver) = channel::<WatchRequest>(100);
sender
let (request_sender, request_receiver) = channel::<WatchRequest>(100);
let request_stream = ReceiverStream::new(request_receiver);

request_sender
.send(options.unwrap_or_default().with_key(key).into())
.await
.map_err(|e| Error::WatchError(e.to_string()))?;

let receiver = ReceiverStream::new(receiver);

let mut stream = self.inner.watch(receiver).await?.into_inner();
let response_stream = self.inner.watch(request_stream).await?.into_inner();
let mut watch_stream = WatchStream::new(response_stream);

let watch_id = match stream.message().await? {
let watch_id = match watch_stream.message().await? {
Some(resp) => {
assert!(resp.created, "not a create watch response");
resp.watch_id
assert!(resp.created(), "not a create watch response");
resp.watch_id()
}
None => {
return Err(Error::WatchError("failed to create watch".to_string()));
}
};

Ok((Watcher::new(watch_id, sender), WatchStream::new(stream)))
Ok((Watcher::new(watch_id, request_sender), watch_stream))
}
}

Expand Down Expand Up @@ -266,9 +267,8 @@ impl WatchResponse {
/// The client should record the watch_id and expect to receive events for
/// the created watcher from the same stream.
/// All events sent to the created watcher will attach with the same watch_id.
#[allow(dead_code)]
#[inline]
const fn created(&self) -> bool {
pub const fn created(&self) -> bool {
self.0.created
}

Expand Down

0 comments on commit 2e8be33

Please sign in to comment.