Skip to content

Commit f4618cf

Browse files
authored
feat(code/network): Only enable the network behaviors which are actually required (#928)
* Only enable the network behaviours which are actually required * Enable the sync channel in the Starknet app * Enable the sync channel in the test app
1 parent ef97433 commit f4618cf

File tree

7 files changed

+114
-58
lines changed

7 files changed

+114
-58
lines changed

code/crates/app/src/spawn.rs

+1
Original file line numberDiff line numberDiff line change
@@ -200,5 +200,6 @@ fn make_gossip_config(cfg: &ConsensusConfig) -> NetworkConfig {
200200
},
201201
rpc_max_size: cfg.p2p.rpc_max_size.as_u64() as usize,
202202
pubsub_max_size: cfg.p2p.pubsub_max_size.as_u64() as usize,
203+
enable_sync: true,
203204
}
204205
}

code/crates/config/src/lib.rs

+11
Original file line numberDiff line numberDiff line change
@@ -379,10 +379,21 @@ pub enum VoteSyncMode {
379379
/// The lagging node sends a request to a peer for the missing votes
380380
#[default]
381381
RequestResponse,
382+
382383
/// Nodes rebroadcast their last vote to all peers
383384
Rebroadcast,
384385
}
385386

387+
impl VoteSyncMode {
388+
pub fn is_request_response(&self) -> bool {
389+
matches!(self, Self::RequestResponse)
390+
}
391+
392+
pub fn is_rebroadcast(&self) -> bool {
393+
matches!(self, Self::Rebroadcast)
394+
}
395+
}
396+
386397
/// Message types required by consensus to deliver the value being proposed
387398
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
388399
#[serde(rename_all = "kebab-case")]

code/crates/network/src/behaviour.rs

+54-31
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::time::Duration;
22

33
use libp2p::kad::{Addresses, KBucketKey, KBucketRef};
44
use libp2p::request_response::{OutboundRequestId, ResponseChannel};
5+
use libp2p::swarm::behaviour::toggle::Toggle;
56
use libp2p::swarm::NetworkBehaviour;
67
use libp2p::{gossipsub, identify, ping};
78
use libp2p_broadcast as broadcast;
@@ -66,10 +67,10 @@ impl From<discovery::NetworkEvent> for NetworkEvent {
6667
pub struct Behaviour {
6768
pub identify: identify::Behaviour,
6869
pub ping: ping::Behaviour,
69-
pub gossipsub: gossipsub::Behaviour,
70-
pub broadcast: broadcast::Behaviour,
71-
pub sync: sync::Behaviour,
72-
pub discovery: discovery::Behaviour,
70+
pub gossipsub: Toggle<gossipsub::Behaviour>,
71+
pub broadcast: Toggle<broadcast::Behaviour>,
72+
pub sync: Toggle<sync::Behaviour>,
73+
pub discovery: Toggle<discovery::Behaviour>,
7374
}
7475

7576
/// Dummy implementation of Debug for Behaviour.
@@ -82,6 +83,8 @@ impl std::fmt::Debug for Behaviour {
8283
impl discovery::DiscoveryClient for Behaviour {
8384
fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> libp2p::kad::RoutingUpdate {
8485
self.discovery
86+
.as_mut()
87+
.expect("Discovery behaviour should be available")
8588
.kademlia
8689
.as_mut()
8790
.expect("Kademlia behaviour should be available")
@@ -90,22 +93,32 @@ impl discovery::DiscoveryClient for Behaviour {
9093

9194
fn kbuckets(&mut self) -> impl Iterator<Item = KBucketRef<'_, KBucketKey<PeerId>, Addresses>> {
9295
self.discovery
96+
.as_mut()
97+
.expect("Discovery behaviour should be available")
9398
.kademlia
9499
.as_mut()
95100
.expect("Kademlia behaviour should be available")
96101
.kbuckets()
97102
}
98103

99104
fn send_request(&mut self, peer_id: &PeerId, req: discovery::Request) -> OutboundRequestId {
100-
self.discovery.request_response.send_request(peer_id, req)
105+
self.discovery
106+
.as_mut()
107+
.expect("Discovery behaviour should be available")
108+
.request_response
109+
.send_request(peer_id, req)
101110
}
102111

103112
fn send_response(
104113
&mut self,
105114
ch: ResponseChannel<discovery::Response>,
106115
rs: discovery::Response,
107116
) -> Result<(), discovery::Response> {
108-
self.discovery.request_response.send_response(ch, rs)
117+
self.discovery
118+
.as_mut()
119+
.expect("Discovery behaviour should be available")
120+
.request_response
121+
.send_response(ch, rs)
109122
}
110123
}
111124

@@ -144,35 +157,45 @@ impl Behaviour {
144157

145158
let ping = ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(5)));
146159

147-
let gossipsub = gossipsub::Behaviour::new_with_metrics(
148-
gossipsub::MessageAuthenticity::Signed(keypair.clone()),
149-
gossipsub_config(config.gossipsub, config.pubsub_max_size),
150-
registry.sub_registry_with_prefix("gossipsub"),
151-
Default::default(),
152-
)
153-
.unwrap();
154-
155-
let broadcast = broadcast::Behaviour::new_with_metrics(
156-
broadcast::Config {
157-
max_buf_size: config.pubsub_max_size,
158-
},
159-
registry.sub_registry_with_prefix("broadcast"),
160-
);
161-
162-
let sync = sync::Behaviour::new_with_metrics(
163-
sync::Config::default().with_max_response_size(config.rpc_max_size),
164-
registry.sub_registry_with_prefix("sync"),
165-
);
166-
167-
let discovery = discovery::Behaviour::new(keypair, config.discovery);
160+
let gossipsub = config.pubsub_protocol.is_gossipsub().then(|| {
161+
gossipsub::Behaviour::new_with_metrics(
162+
gossipsub::MessageAuthenticity::Signed(keypair.clone()),
163+
gossipsub_config(config.gossipsub, config.pubsub_max_size),
164+
registry.sub_registry_with_prefix("gossipsub"),
165+
Default::default(),
166+
)
167+
.unwrap()
168+
});
169+
170+
let enable_broadcast = config.pubsub_protocol.is_broadcast() || config.enable_sync;
171+
let broadcast = enable_broadcast.then(|| {
172+
broadcast::Behaviour::new_with_metrics(
173+
broadcast::Config {
174+
max_buf_size: config.pubsub_max_size,
175+
},
176+
registry.sub_registry_with_prefix("broadcast"),
177+
)
178+
});
179+
180+
let sync = config.enable_sync.then(|| {
181+
sync::Behaviour::new_with_metrics(
182+
sync::Config::default().with_max_response_size(config.rpc_max_size),
183+
registry.sub_registry_with_prefix("sync"),
184+
)
185+
});
186+
187+
let discovery = config
188+
.discovery
189+
.enabled
190+
.then(|| discovery::Behaviour::new(keypair, config.discovery));
168191

169192
Self {
170193
identify,
171194
ping,
172-
gossipsub,
173-
broadcast,
174-
sync,
175-
discovery,
195+
sync: Toggle::from(sync),
196+
gossipsub: Toggle::from(gossipsub),
197+
broadcast: Toggle::from(broadcast),
198+
discovery: Toggle::from(discovery),
176199
}
177200
}
178201
}

code/crates/network/src/lib.rs

+24-9
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ pub struct Config {
9494
pub pubsub_protocol: PubSubProtocol,
9595
pub rpc_max_size: usize,
9696
pub pubsub_max_size: usize,
97+
pub enable_sync: bool,
9798
}
9899

99100
impl Config {
@@ -232,10 +233,12 @@ async fn run(
232233
return;
233234
};
234235

235-
if let Err(e) = pubsub::subscribe(&mut swarm, PubSubProtocol::Broadcast, &[Channel::Sync]) {
236-
error!("Error subscribing to Sync channel: {e}");
237-
return;
238-
};
236+
if config.enable_sync {
237+
if let Err(e) = pubsub::subscribe(&mut swarm, PubSubProtocol::Broadcast, &[Channel::Sync]) {
238+
error!("Error subscribing to Sync channel: {e}");
239+
return;
240+
};
241+
}
239242

240243
loop {
241244
let result = tokio::select! {
@@ -295,6 +298,11 @@ async fn handle_ctrl_msg(
295298
}
296299

297300
CtrlMsg::Broadcast(channel, data) => {
301+
if channel == Channel::Sync && !config.enable_sync {
302+
trace!("Ignoring broadcast message to Sync channel: Sync not enabled");
303+
return ControlFlow::Continue(());
304+
}
305+
298306
let msg_size = data.len();
299307
let result = pubsub::publish(swarm, PubSubProtocol::Broadcast, channel, data);
300308

@@ -307,10 +315,12 @@ async fn handle_ctrl_msg(
307315
}
308316

309317
CtrlMsg::SyncRequest(peer_id, request, reply_to) => {
310-
let request_id = swarm
311-
.behaviour_mut()
312-
.sync
313-
.send_request(peer_id.to_libp2p(), request);
318+
let Some(sync) = swarm.behaviour_mut().sync.as_mut() else {
319+
error!("Cannot request Sync from peer: Sync not enabled");
320+
return ControlFlow::Continue(());
321+
};
322+
323+
let request_id = sync.send_request(peer_id.to_libp2p(), request);
314324

315325
if let Err(e) = reply_to.send(request_id) {
316326
error!(%peer_id, "Error sending Sync request: {e}");
@@ -320,12 +330,17 @@ async fn handle_ctrl_msg(
320330
}
321331

322332
CtrlMsg::SyncReply(request_id, data) => {
333+
let Some(sync) = swarm.behaviour_mut().sync.as_mut() else {
334+
error!("Cannot send Sync response to peer: Sync not enabled");
335+
return ControlFlow::Continue(());
336+
};
337+
323338
let Some(channel) = state.sync_channels.remove(&request_id) else {
324339
error!(%request_id, "Received Sync reply for unknown request ID");
325340
return ControlFlow::Continue(());
326341
};
327342

328-
let result = swarm.behaviour_mut().sync.send_response(channel, data);
343+
let result = sync.send_response(channel, data);
329344

330345
match result {
331346
Ok(()) => debug!(%request_id, "Replied to Sync request"),

code/crates/network/src/pubsub.rs

+22-18
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,21 @@ pub fn subscribe(
1111
) -> Result<(), eyre::Report> {
1212
match protocol {
1313
PubSubProtocol::GossipSub => {
14-
for channel in channels {
15-
swarm
16-
.behaviour_mut()
17-
.gossipsub
18-
.subscribe(&channel.to_gossipsub_topic())?;
14+
if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
15+
for channel in channels {
16+
gossipsub.subscribe(&channel.to_gossipsub_topic())?;
17+
}
18+
} else {
19+
return Err(eyre::eyre!("GossipSub not enabled"));
1920
}
2021
}
2122
PubSubProtocol::Broadcast => {
22-
for channel in channels {
23-
swarm
24-
.behaviour_mut()
25-
.broadcast
26-
.subscribe(channel.to_broadcast_topic());
23+
if let Some(broadcast) = swarm.behaviour_mut().broadcast.as_mut() {
24+
for channel in channels {
25+
broadcast.subscribe(channel.to_broadcast_topic());
26+
}
27+
} else {
28+
return Err(eyre::eyre!("Broadcast not enabled"));
2729
}
2830
}
2931
}
@@ -39,16 +41,18 @@ pub fn publish(
3941
) -> Result<(), eyre::Report> {
4042
match protocol {
4143
PubSubProtocol::GossipSub => {
42-
swarm
43-
.behaviour_mut()
44-
.gossipsub
45-
.publish(channel.to_gossipsub_topic(), data)?;
44+
if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
45+
gossipsub.publish(channel.to_gossipsub_topic(), data)?;
46+
} else {
47+
return Err(eyre::eyre!("GossipSub not enabled"));
48+
}
4649
}
4750
PubSubProtocol::Broadcast => {
48-
swarm
49-
.behaviour_mut()
50-
.broadcast
51-
.broadcast(&channel.to_broadcast_topic(), data);
51+
if let Some(broadcast) = swarm.behaviour_mut().broadcast.as_mut() {
52+
broadcast.broadcast(&channel.to_broadcast_topic(), data);
53+
} else {
54+
return Err(eyre::eyre!("Broadcast not enabled"));
55+
}
5256
}
5357
}
5458

code/crates/network/test/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ impl<const N: usize> Test<N> {
162162
pubsub_protocol: malachitebft_network::PubSubProtocol::default(),
163163
rpc_max_size: 10 * 1024 * 1024, // 10 MiB
164164
pubsub_max_size: 4 * 1024 * 1024, // 4 MiB
165+
enable_sync: false,
165166
})
166167
}
167168

code/crates/starknet/host/src/spawn.rs

+1
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ async fn spawn_network_actor(
247247
},
248248
rpc_max_size: cfg.consensus.p2p.rpc_max_size.as_u64() as usize,
249249
pubsub_max_size: cfg.consensus.p2p.pubsub_max_size.as_u64() as usize,
250+
enable_sync: true,
250251
};
251252

252253
let keypair = make_keypair(private_key);

0 commit comments

Comments
 (0)