From c80ed8c11b588ea72c65f358962cc4f2c90a899d Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 17 Jan 2024 12:56:21 +0100 Subject: [PATCH] use broadcast sender --- lite-rpc/src/block_priofees.rs | 22 +++++++++++++++++----- lite-rpc/src/bridge.rs | 2 +- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/lite-rpc/src/block_priofees.rs b/lite-rpc/src/block_priofees.rs index a68fcbc1..9b086361 100644 --- a/lite-rpc/src/block_priofees.rs +++ b/lite-rpc/src/block_priofees.rs @@ -7,7 +7,9 @@ use jsonrpsee::tracing::field::debug; use log::{debug, error, info, trace, warn}; use solana_rpc_client_api::response::Fees; use solana_sdk::clock::Slot; +use tokio::sync::broadcast::Sender; use tokio::sync::broadcast::error::RecvError::{Closed, Lagged}; +use tokio::sync::broadcast::error::SendError; use tokio::sync::broadcast::Receiver; use tokio::task::JoinHandle; use solana_lite_rpc_cluster_endpoints::CommitmentLevel; @@ -25,7 +27,8 @@ pub struct PrioFeeStore { pub struct PrioFeesService { pub block_fees_store: PrioFeeStore, - pub block_fees_stream: Receiver, + // use .subscribe() to get a receiver + pub block_fees_stream: Sender, } impl PrioFeesService { @@ -50,10 +53,11 @@ pub async fn start_priofees_task(data_cache: DataCache, mut block_stream: BlockS recent: recent_data.clone(), slot_cache: data_cache.slot_cache, }; - let (priofees_update_sender, priofees_update_receiver) = tokio::sync::broadcast::channel(100); - drop(priofees_update_receiver); + let (priofees_update_sender, _priofees_update_receiver) = tokio::sync::broadcast::channel(100); + let senderfoooo = priofees_update_sender.clone(); let jh_priofees_task = tokio::spawn(async move { + let sender = priofees_update_sender.clone(); 'recv_loop: loop { let block = block_stream.recv().await; match block { @@ -80,7 +84,15 @@ pub async fn start_priofees_task(data_cache: DataCache, mut block_stream: BlockS slot, fees_info: prioritization_fees_info, }; - priofees_update_sender.send(msg).unwrap(); + let send_result = sender.send(msg); + match send_result { + Ok(n_subscribers) => { + trace!("sent priofees update message to {} subscribers", n_subscribers); + } + Err(_) => { + trace!("no subscribers for priofees update message"); + } + } } Err(Lagged(_lagged)) => { warn!("channel error receiving block for priofees calculation - continue"); @@ -99,7 +111,7 @@ pub async fn start_priofees_task(data_cache: DataCache, mut block_stream: BlockS jh_priofees_task, PrioFeesService { block_fees_store: store, - block_fees_stream: priofees_update_receiver, + block_fees_stream: senderfoooo, } ) } diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs index 4a00897f..301458ef 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -527,7 +527,7 @@ impl LiteRpcServer for LiteBridge { ) -> SubscriptionResult { let sink = pending.accept().await?; - let mut block_fees_stream = self.prio_fees_service.block_fees_stream.resubscribe(); + let mut block_fees_stream = self.prio_fees_service.block_fees_stream.subscribe(); tokio::spawn(async move { RPC_BLOCK_PRIOFEES_SUBSCRIBE.inc(); while let Ok(PrioFeesUpdateMessage { slot: confirmation_slot, fees_info}) = block_fees_stream.recv().await {