Skip to content

Commit f7ef132

Browse files
chore(bridge-withdrawer): pass GRPC and CometBFT clients to consumers directly (#1510)
## Summary Changed `sequencer_cometbft_client` and `sequencer_grpc_client` construction so that each can be passed to consumers via cloning. ## Background Both clients were previously constructed ad-hoc from endpoints, which was unnecessary. ## Changes - Moved construction of `sequencer_cometbft_client` and `sequencer_grpc_client` up so that each can be passed to its consumers (`Startup` and `Submitter`) via cloning. ## Testing Passing all tests ## Related Issues closes #1315
1 parent 1be156e commit f7ef132

File tree

4 files changed

+36
-44
lines changed

4 files changed

+36
-44
lines changed

crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs

+14-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::{
44
time::Duration,
55
};
66

7+
use astria_core::generated::sequencerblock::v1alpha1::sequencer_service_client;
78
use astria_eyre::eyre::{
89
self,
910
WrapErr as _,
@@ -92,28 +93,35 @@ impl BridgeWithdrawer {
9293
.parse()
9394
.wrap_err("failed to parse sequencer bridge address")?;
9495

96+
let sequencer_grpc_connection =
97+
tonic::transport::Endpoint::new(sequencer_grpc_endpoint)?.connect_lazy();
98+
let sequencer_grpc_client =
99+
sequencer_service_client::SequencerServiceClient::new(sequencer_grpc_connection);
100+
let sequencer_cometbft_client =
101+
sequencer_client::HttpClient::new(&*sequencer_cometbft_endpoint)
102+
.wrap_err("failed constructing cometbft http client")?;
103+
95104
// make startup object
96105
let startup = startup::Builder {
97106
shutdown_token: shutdown_handle.token(),
98107
state: state.clone(),
99108
sequencer_chain_id,
100-
sequencer_cometbft_endpoint: sequencer_cometbft_endpoint.clone(),
109+
sequencer_cometbft_client: sequencer_cometbft_client.clone(),
101110
sequencer_bridge_address,
102-
sequencer_grpc_endpoint: sequencer_grpc_endpoint.clone(),
111+
sequencer_grpc_client: sequencer_grpc_client.clone(),
103112
expected_fee_asset: fee_asset_denomination,
104113
metrics,
105114
}
106-
.build()
107-
.wrap_err("failed to initialize startup")?;
115+
.build();
108116

109117
let startup_handle = startup::InfoHandle::new(state.subscribe());
110118

111119
// make submitter object
112120
let (submitter, submitter_handle) = submitter::Builder {
113121
shutdown_token: shutdown_handle.token(),
114122
startup_handle: startup_handle.clone(),
115-
sequencer_cometbft_endpoint,
116-
sequencer_grpc_endpoint,
123+
sequencer_cometbft_client,
124+
sequencer_grpc_client,
117125
sequencer_key_path,
118126
sequencer_address_prefix: sequencer_address_prefix.clone(),
119127
state: state.clone(),

crates/astria-bridge-withdrawer/src/bridge_withdrawer/startup.rs

+12-21
Original file line numberDiff line numberDiff line change
@@ -65,40 +65,36 @@ pub(super) struct Builder {
6565
pub(super) shutdown_token: CancellationToken,
6666
pub(super) state: Arc<State>,
6767
pub(super) sequencer_chain_id: String,
68-
pub(super) sequencer_cometbft_endpoint: String,
69-
pub(super) sequencer_grpc_endpoint: String,
68+
pub(super) sequencer_cometbft_client: sequencer_client::HttpClient,
69+
pub(super) sequencer_grpc_client: SequencerServiceClient<Channel>,
7070
pub(super) sequencer_bridge_address: Address,
7171
pub(super) expected_fee_asset: asset::Denom,
7272
pub(super) metrics: &'static Metrics,
7373
}
7474

7575
impl Builder {
76-
pub(super) fn build(self) -> eyre::Result<Startup> {
76+
pub(super) fn build(self) -> Startup {
7777
let Self {
7878
shutdown_token,
7979
state,
8080
sequencer_chain_id,
81-
sequencer_cometbft_endpoint,
81+
sequencer_cometbft_client,
8282
sequencer_bridge_address,
83-
sequencer_grpc_endpoint,
83+
sequencer_grpc_client,
8484
expected_fee_asset,
8585
metrics,
8686
} = self;
8787

88-
let sequencer_cometbft_client =
89-
sequencer_client::HttpClient::new(&*sequencer_cometbft_endpoint)
90-
.wrap_err("failed constructing cometbft http client")?;
91-
92-
Ok(Startup {
88+
Startup {
9389
shutdown_token,
9490
state,
9591
sequencer_chain_id,
9692
sequencer_cometbft_client,
97-
sequencer_grpc_endpoint,
93+
sequencer_grpc_client,
9894
sequencer_bridge_address,
9995
expected_fee_asset,
10096
metrics,
101-
})
97+
}
10298
}
10399
}
104100

@@ -141,7 +137,7 @@ pub(super) struct Startup {
141137
state: Arc<State>,
142138
sequencer_chain_id: String,
143139
sequencer_cometbft_client: sequencer_client::HttpClient,
144-
sequencer_grpc_endpoint: String,
140+
sequencer_grpc_client: SequencerServiceClient<Channel>,
145141
sequencer_bridge_address: Address,
146142
expected_fee_asset: asset::Denom,
147143
metrics: &'static Metrics,
@@ -159,7 +155,7 @@ impl Startup {
159155

160156
wait_for_empty_mempool(
161157
self.sequencer_cometbft_client.clone(),
162-
self.sequencer_grpc_endpoint.clone(),
158+
self.sequencer_grpc_client.clone(),
163159
self.sequencer_bridge_address,
164160
self.state.clone(),
165161
self.metrics,
@@ -400,7 +396,7 @@ async fn ensure_mempool_empty(
400396
#[instrument(skip_all, err)]
401397
async fn wait_for_empty_mempool(
402398
cometbft_client: sequencer_client::HttpClient,
403-
sequencer_grpc_endpoint: String,
399+
sequencer_grpc_client: SequencerServiceClient<Channel>,
404400
address: Address,
405401
state: Arc<State>,
406402
metrics: &'static Metrics,
@@ -424,14 +420,9 @@ async fn wait_for_empty_mempool(
424420
futures::future::ready(())
425421
},
426422
);
427-
let sequencer_client = SequencerServiceClient::connect(sequencer_grpc_endpoint.clone())
428-
.await
429-
.wrap_err_with(|| {
430-
format!("failed to connect to sequencer at `{sequencer_grpc_endpoint}`")
431-
})?;
432423

433424
tryhard::retry_fn(|| {
434-
let sequencer_client = sequencer_client.clone();
425+
let sequencer_client = sequencer_grpc_client.clone();
435426
let cometbft_client = cometbft_client.clone();
436427
let state = state.clone();
437428
ensure_mempool_empty(cometbft_client, sequencer_client, address, state, metrics)

crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/builder.rs

+6-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::sync::Arc;
22

3+
use astria_core::generated::sequencerblock::v1alpha1::sequencer_service_client::SequencerServiceClient;
34
use astria_eyre::eyre::{
45
self,
56
Context as _,
@@ -47,8 +48,8 @@ pub(crate) struct Builder {
4748
pub(crate) startup_handle: startup::InfoHandle,
4849
pub(crate) sequencer_key_path: String,
4950
pub(crate) sequencer_address_prefix: String,
50-
pub(crate) sequencer_cometbft_endpoint: String,
51-
pub(crate) sequencer_grpc_endpoint: String,
51+
pub(crate) sequencer_cometbft_client: sequencer_client::HttpClient,
52+
pub(crate) sequencer_grpc_client: SequencerServiceClient<tonic::transport::Channel>,
5253
pub(crate) state: Arc<State>,
5354
pub(crate) metrics: &'static Metrics,
5455
}
@@ -61,8 +62,8 @@ impl Builder {
6162
startup_handle,
6263
sequencer_key_path,
6364
sequencer_address_prefix,
64-
sequencer_cometbft_endpoint,
65-
sequencer_grpc_endpoint,
65+
sequencer_cometbft_client,
66+
sequencer_grpc_client,
6667
state,
6768
metrics,
6869
} = self;
@@ -74,10 +75,6 @@ impl Builder {
7475
.wrap_err("failed to load sequencer private key")?;
7576
info!(address = %signer.address(), "loaded sequencer signer");
7677

77-
let sequencer_cometbft_client =
78-
sequencer_client::HttpClient::new(&*sequencer_cometbft_endpoint)
79-
.wrap_err("failed constructing cometbft http client")?;
80-
8178
let (batches_tx, batches_rx) = tokio::sync::mpsc::channel(BATCH_QUEUE_SIZE);
8279
let handle = Handle::new(batches_tx);
8380

@@ -88,7 +85,7 @@ impl Builder {
8885
state,
8986
batches_rx,
9087
sequencer_cometbft_client,
91-
sequencer_grpc_endpoint,
88+
sequencer_grpc_client,
9289
signer,
9390
metrics,
9491
},

crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/mod.rs

+4-8
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,14 @@ pub(super) struct Submitter {
6969
state: Arc<State>,
7070
batches_rx: mpsc::Receiver<Batch>,
7171
sequencer_cometbft_client: sequencer_client::HttpClient,
72-
sequencer_grpc_endpoint: String,
72+
sequencer_grpc_client: SequencerServiceClient<Channel>,
7373
signer: SequencerKey,
7474
metrics: &'static Metrics,
7575
}
7676

7777
impl Submitter {
7878
pub(super) async fn run(mut self) -> eyre::Result<()> {
79-
let (sequencer_chain_id, sequencer_grpc_client) = select! {
79+
let sequencer_chain_id = select! {
8080
() = self.shutdown_token.cancelled() => {
8181
report_exit(Ok("submitter received shutdown signal while waiting for startup"));
8282
return Ok(());
@@ -85,12 +85,8 @@ impl Submitter {
8585
startup_info = self.startup_handle.get_info() => {
8686
let startup::Info { chain_id, .. } = startup_info.wrap_err("submitter failed to get startup info")?;
8787

88-
let sequencer_grpc_client = sequencer_service_client::SequencerServiceClient::connect(
89-
self.sequencer_grpc_endpoint.clone(),
90-
).await.wrap_err("failed to connect to sequencer gRPC endpoint")?;
91-
9288
self.state.set_submitter_ready();
93-
(chain_id, sequencer_grpc_client)
89+
chain_id
9490
}
9591
};
9692
self.state.set_submitter_ready();
@@ -110,7 +106,7 @@ impl Submitter {
110106

111107
// if batch submission fails, halt the submitter
112108
if let Err(e) = self.process_batch(
113-
sequencer_grpc_client.clone(),
109+
self.sequencer_grpc_client.clone(),
114110
&sequencer_chain_id,
115111
actions,
116112
rollup_height,

0 commit comments

Comments
 (0)