Skip to content

Commit

Permalink
client: rename max_notifs_per_subscription to `max_buffer_capacity_…
Browse files Browse the repository at this point in the history
…per_subscription` (#1012)

* client: rename `max_buffer_capacity_per_sub`

rename `max_notifs_per_subscription` to `max_buffer_capacity_per_sub`

* Update core/src/client/mod.rs
  • Loading branch information
niklasad1 authored Feb 13, 2023
1 parent f542a00 commit 18ee212
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 42 deletions.
21 changes: 13 additions & 8 deletions client/wasm-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use jsonrpsee_core::Error;
pub struct WasmClientBuilder {
id_kind: IdKind,
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
max_buffer_capacity_per_subscription: usize,
max_log_length: u32,
request_timeout: Duration,
}
Expand All @@ -74,7 +74,7 @@ impl Default for WasmClientBuilder {
id_kind: IdKind::Number,
max_log_length: 4096,
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
max_buffer_capacity_per_subscription: 1024,
request_timeout: Duration::from_secs(60),
}
}
Expand All @@ -93,9 +93,9 @@ impl WasmClientBuilder {
self
}

/// See documentation [`ClientBuilder::max_notifs_per_subscription`] (default is 1024).
pub fn max_notifs_per_subscription(mut self, max: usize) -> Self {
self.max_notifs_per_subscription = max;
/// See documentation [`ClientBuilder::max_buffer_capacity_per_subscription`] (default is 1024).
pub fn max_buffer_capacity_per_subscription(mut self, max: usize) -> Self {
self.max_buffer_capacity_per_subscription = max;
self
}

Expand All @@ -115,15 +115,20 @@ impl WasmClientBuilder {

/// Build the client with specified URL to connect to.
pub async fn build(self, url: impl AsRef<str>) -> Result<Client, Error> {
let Self { max_log_length, id_kind, request_timeout, max_concurrent_requests, max_notifs_per_subscription } =
self;
let Self {
max_log_length,
id_kind,
request_timeout,
max_concurrent_requests,
max_buffer_capacity_per_subscription,
} = self;
let (sender, receiver) = web::connect(url).await.map_err(|e| Error::Transport(e.into()))?;

let builder = ClientBuilder::default()
.set_max_logging_length(max_log_length)
.request_timeout(request_timeout)
.id_format(id_kind)
.max_notifs_per_subscription(max_notifs_per_subscription)
.max_buffer_capacity_per_subscription(max_buffer_capacity_per_subscription)
.max_concurrent_requests(max_concurrent_requests);

Ok(builder.build_with_wasm(sender, receiver))
Expand Down
14 changes: 7 additions & 7 deletions client/ws-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub struct WsClientBuilder {
ping_interval: Option<Duration>,
headers: http::HeaderMap,
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
max_buffer_capacity_per_subscription: usize,
max_redirections: usize,
id_kind: IdKind,
max_log_length: u32,
Expand All @@ -100,7 +100,7 @@ impl Default for WsClientBuilder {
ping_interval: None,
headers: HeaderMap::new(),
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
max_buffer_capacity_per_subscription: 1024,
max_redirections: 5,
id_kind: IdKind::Number,
max_log_length: 4096,
Expand Down Expand Up @@ -181,9 +181,9 @@ impl WsClientBuilder {
self
}

/// See documentation [`ClientBuilder::max_notifs_per_subscription`] (default is 1024).
pub fn max_notifs_per_subscription(mut self, max: usize) -> Self {
self.max_notifs_per_subscription = max;
/// See documentation [`ClientBuilder::max_buffer_capacity_per_subscription`] (default is 1024).
pub fn max_buffer_capacity_per_subscription(mut self, max: usize) -> Self {
self.max_buffer_capacity_per_subscription = max;
self
}

Expand Down Expand Up @@ -224,7 +224,7 @@ impl WsClientBuilder {
ping_interval,
headers,
max_redirections,
max_notifs_per_subscription,
max_buffer_capacity_per_subscription,
id_kind,
max_log_length,
} = self;
Expand All @@ -242,7 +242,7 @@ impl WsClientBuilder {
let (sender, receiver) = transport_builder.build(uri).await.map_err(|e| Error::Transport(e.into()))?;

let mut client = ClientBuilder::default()
.max_notifs_per_subscription(max_notifs_per_subscription)
.max_buffer_capacity_per_subscription(max_buffer_capacity_per_subscription)
.request_timeout(request_timeout)
.max_concurrent_requests(max_concurrent_requests)
.id_format(id_kind)
Expand Down
2 changes: 1 addition & 1 deletion client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ async fn notification_without_polling_doesnt_make_client_unuseable() {

let uri = to_ws_uri_string(server.local_addr());
let client = WsClientBuilder::default()
.max_notifs_per_subscription(4)
.max_buffer_capacity_per_subscription(4)
.build(&uri)
.with_default_timeout()
.await
Expand Down
53 changes: 34 additions & 19 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl ErrorFromBack {
pub struct ClientBuilder {
request_timeout: Duration,
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
max_buffer_capacity_per_subscription: usize,
id_kind: IdKind,
max_log_length: u32,
ping_interval: Option<Duration>,
Expand All @@ -86,7 +86,7 @@ impl Default for ClientBuilder {
Self {
request_timeout: Duration::from_secs(60),
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
max_buffer_capacity_per_subscription: 1024,
id_kind: IdKind::Number,
max_log_length: 4096,
ping_interval: None,
Expand All @@ -107,7 +107,7 @@ impl ClientBuilder {
self
}

/// Set max concurrent notification capacity for each subscription; when the capacity is exceeded the subscription
/// Set max buffer capacity for each subscription; when the capacity is exceeded the subscription
/// will be dropped (default is 1024).
///
/// You may prevent the subscription from being dropped by polling often enough
Expand All @@ -118,8 +118,8 @@ impl ClientBuilder {
/// # Panics
///
/// This function panics if `max` is 0.
pub fn max_notifs_per_subscription(mut self, max: usize) -> Self {
self.max_notifs_per_subscription = max;
pub fn max_buffer_capacity_per_subscription(mut self, max: usize) -> Self {
self.max_buffer_capacity_per_subscription = max;
self
}

Expand Down Expand Up @@ -168,7 +168,7 @@ impl ClientBuilder {
{
let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
let (err_tx, err_rx) = oneshot::channel();
let max_notifs_per_subscription = self.max_notifs_per_subscription;
let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription;
let ping_interval = self.ping_interval;
let (on_exit_tx, on_exit_rx) = oneshot::channel();

Expand All @@ -178,7 +178,7 @@ impl ClientBuilder {
receiver,
from_front,
err_tx,
max_notifs_per_subscription,
max_buffer_capacity_per_subscription,
ping_interval,
on_exit_rx,
)
Expand All @@ -204,11 +204,20 @@ impl ClientBuilder {
{
let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
let (err_tx, err_rx) = oneshot::channel();
let max_notifs_per_subscription = self.max_notifs_per_subscription;
let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription;
let (on_exit_tx, on_exit_rx) = oneshot::channel();

wasm_bindgen_futures::spawn_local(async move {
background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, None, on_exit_rx).await;
background_task(
sender,
receiver,
from_front,
err_tx,
max_buffer_capacity_per_subscription,
None,
on_exit_rx,
)
.await;
});
Client {
to_back,
Expand Down Expand Up @@ -500,22 +509,22 @@ async fn handle_backend_messages<S: TransportSenderT, R: TransportReceiverT>(
message: Option<Result<ReceivedMessage, R::Error>>,
manager: &mut RequestManager,
sender: &mut S,
max_notifs_per_subscription: usize,
max_buffer_capacity_per_subscription: usize,
) -> Result<(), Error> {
// Handle raw messages of form `ReceivedMessage::Bytes` (Vec<u8>) or ReceivedMessage::Data` (String).
async fn handle_recv_message<S: TransportSenderT>(
raw: &[u8],
manager: &mut RequestManager,
sender: &mut S,
max_notifs_per_subscription: usize,
max_buffer_capacity_per_subscription: usize,
) -> Result<(), Error> {
let first_non_whitespace = raw.iter().find(|byte| !byte.is_ascii_whitespace());

match first_non_whitespace {
Some(b'{') => {
// Single response to a request.
if let Ok(single) = serde_json::from_slice::<Response<_>>(raw) {
match process_single_response(manager, single, max_notifs_per_subscription) {
match process_single_response(manager, single, max_buffer_capacity_per_subscription) {
Ok(Some(unsub)) => {
stop_subscription(sender, manager, unsub).await;
}
Expand Down Expand Up @@ -599,10 +608,10 @@ async fn handle_backend_messages<S: TransportSenderT, R: TransportReceiverT>(
tracing::debug!("Received pong");
}
Some(Ok(ReceivedMessage::Bytes(raw))) => {
handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?;
handle_recv_message(raw.as_ref(), manager, sender, max_buffer_capacity_per_subscription).await?;
}
Some(Ok(ReceivedMessage::Text(raw))) => {
handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?;
handle_recv_message(raw.as_ref(), manager, sender, max_buffer_capacity_per_subscription).await?;
}
Some(Err(e)) => {
return Err(Error::Transport(e.into()));
Expand All @@ -620,7 +629,7 @@ async fn handle_frontend_messages<S: TransportSenderT>(
message: FrontToBack,
manager: &mut RequestManager,
sender: &mut S,
max_notifs_per_subscription: usize,
max_buffer_capacity_per_subscription: usize,
) {
match message {
FrontToBack::Batch(batch) => {
Expand Down Expand Up @@ -678,7 +687,7 @@ async fn handle_frontend_messages<S: TransportSenderT>(
}
// User called `register_notification` on the front-end.
FrontToBack::RegisterNotification(reg) => {
let (subscribe_tx, subscribe_rx) = mpsc::channel(max_notifs_per_subscription);
let (subscribe_tx, subscribe_rx) = mpsc::channel(max_buffer_capacity_per_subscription);

if manager.insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
Expand All @@ -699,7 +708,7 @@ async fn background_task<S, R>(
receiver: R,
frontend: mpsc::Receiver<FrontToBack>,
front_error: oneshot::Sender<Error>,
max_notifs_per_subscription: usize,
max_buffer_capacity_per_subscription: usize,
ping_interval: Option<Duration>,
on_exit: oneshot::Receiver<()>,
) where
Expand Down Expand Up @@ -746,7 +755,13 @@ async fn background_task<S, R>(
break;
};

handle_frontend_messages(frontend_value, &mut manager, &mut sender, max_notifs_per_subscription).await;
handle_frontend_messages(
frontend_value,
&mut manager,
&mut sender,
max_buffer_capacity_per_subscription,
)
.await;

// Advance frontend, save backend.
message_fut = future::select(frontend.next(), backend);
Expand All @@ -758,7 +773,7 @@ async fn background_task<S, R>(
backend_value,
&mut manager,
&mut sender,
max_notifs_per_subscription,
max_buffer_capacity_per_subscription,
)
.await
{
Expand Down
9 changes: 5 additions & 4 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use crate::params::BatchRequestBuilder;
use crate::traits::ToRpcParams;
use async_trait::async_trait;
use core::marker::PhantomData;
use futures_util::future::FutureExt;
use futures_util::stream::{Stream, StreamExt};
use jsonrpsee_types::{ErrorObject, Id, SubscriptionId};
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -216,8 +215,10 @@ pub enum SubscriptionKind {

/// Active subscription on the client.
///
/// It will automatically unsubscribe in the [`Subscription::drop`] so no need to explicitly call
/// the `unsubscribe method` if it is an an subscription based on [`SubscriptionId`].
/// It will try to `unsubscribe` in the drop implementation
/// but it may fail if the underlying buffer is full.
/// Thus, if you want to ensure it's actually unsubscribed then
/// [`Subscription::unsubscribe`] is recommended to use.
#[derive(Debug)]
pub struct Subscription<Notif> {
/// Channel to send requests to the background task.
Expand Down Expand Up @@ -381,7 +382,7 @@ impl<Notif> Drop for Subscription<Notif> {
Some(SubscriptionKind::Subscription(sub_id)) => FrontToBack::SubscriptionClosed(sub_id),
None => return,
};
let _ = self.to_back.send(msg).now_or_never();
let _ = self.to_back.try_send(msg);
}
}

Expand Down
9 changes: 6 additions & 3 deletions tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,11 @@ async fn ws_subscription_several_clients_with_drop() {

let mut clients = Vec::with_capacity(10);
for _ in 0..10 {
let client =
WsClientBuilder::default().max_notifs_per_subscription(u32::MAX as usize).build(&server_url).await.unwrap();
let client = WsClientBuilder::default()
.max_buffer_capacity_per_subscription(u32::MAX as usize)
.build(&server_url)
.await
.unwrap();
let hello_sub: Subscription<String> =
client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();
let foo_sub: Subscription<u64> =
Expand Down Expand Up @@ -254,7 +257,7 @@ async fn ws_subscription_without_polling_doesnt_make_client_unuseable() {
let server_addr = server_with_subscription().await;
let server_url = format!("ws://{}", server_addr);

let client = WsClientBuilder::default().max_notifs_per_subscription(4).build(&server_url).await.unwrap();
let client = WsClientBuilder::default().max_buffer_capacity_per_subscription(4).build(&server_url).await.unwrap();
let mut hello_sub: Subscription<JsonValue> =
client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();

Expand Down

0 comments on commit 18ee212

Please sign in to comment.