Skip to content

Commit a8be717

Browse files
authored
feat(code/engine): Buffer consensus messages during startup and recovery (#860)
* feat(engine): Buffer consensus messages during startup and recovery This commit introduces message buffering for the consensus engine during startup and recovery phases. This is an important improvement to handle messages that arrive before the consensus engine is fully started or while it is recovering state via the WAL. - Add a `VecDeque` buffer to store messages that arrive during `Unstarted`/`Recovering` phases - Implement a message filtering function to determine which messages should be buffered - Add processing of buffered messages once consensus starts - Set a maximum buffer size of 1024 messages to prevent unbounded memory growth Messages that should be buffered include votes, proposals, and proposal parts. Control messages like `StartHeight`, `TimeoutElapsed`, and network connection events are processed immediately. The buffered messages are replayed in order once: 1. The WAL has been checked and replayed 2. The consensus engine transitions to the `Running` phase The buffer size is capped at 1024 messages to prevent memory exhaustion attacks. Messages beyond this limit are dropped with a warning log. * Buffer elapsed timeouts as well This does not really matter since no timeout can actually expire while we are replaying the WAL, but it is less misleading that way
1 parent a85a98a commit a8be717

File tree

5 files changed

+99
-8
lines changed

5 files changed

+99
-8
lines changed

code/Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

code/crates/engine/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ malachitebft-sync.workspace = true
2626
malachitebft-wal.workspace = true
2727

2828
async-trait = { workspace = true }
29+
async-recursion = { workspace = true }
2930
bytes = { workspace = true, features = ["serde"] }
3031
byteorder = { workspace = true }
3132
derive-where = { workspace = true }

code/crates/engine/src/consensus.rs

+53-8
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ use std::collections::BTreeSet;
22
use std::sync::Arc;
33
use std::time::Duration;
44

5+
use async_recursion::async_recursion;
56
use async_trait::async_trait;
7+
use derive_where::derive_where;
68
use eyre::eyre;
79
use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
810
use tokio::time::Instant;
9-
use tracing::{debug, error, info, warn};
11+
use tracing::{debug, error, error_span, info, warn};
1012

1113
use malachitebft_codec as codec;
1214
use malachitebft_config::TimeoutConfig;
@@ -27,6 +29,7 @@ use crate::network::{NetworkEvent, NetworkMsg, NetworkRef, Status};
2729
use crate::sync::Msg as SyncMsg;
2830
use crate::sync::SyncRef;
2931
use crate::util::events::{Event, TxEvent};
32+
use crate::util::msg_buffer::MessageBuffer;
3033
use crate::util::streaming::StreamMessage;
3134
use crate::util::timers::{TimeoutElapsed, TimerScheduler};
3235
use crate::wal::{Msg as WalMsg, WalEntry, WalRef};
@@ -80,6 +83,7 @@ where
8083

8184
pub type ConsensusMsg<Ctx> = Msg<Ctx>;
8285

86+
#[derive_where(Debug)]
8387
pub enum Msg<Ctx: Context> {
8488
/// Start consensus for the given height with the given validator set
8589
StartHeight(Ctx::Height, Ctx::ValidatorSet),
@@ -164,6 +168,10 @@ enum Phase {
164168
Recovering,
165169
}
166170

171+
/// Maximum number of messages to buffer while consensus is
172+
/// in the `Unstarted` or `Recovering` phase
173+
const MAX_BUFFER_SIZE: usize = 1024;
174+
167175
pub struct State<Ctx: Context> {
168176
/// Scheduler for timers
169177
timers: Timers,
@@ -179,6 +187,10 @@ pub struct State<Ctx: Context> {
179187

180188
/// The current phase
181189
phase: Phase,
190+
191+
/// A buffer of messages that were received while
192+
/// consensus was `Unstarted` or in the `Recovering` phase
193+
msg_buffer: MessageBuffer<Ctx>,
182194
}
183195

184196
impl<Ctx> State<Ctx>
@@ -251,6 +263,23 @@ where
251263
)
252264
}
253265

266+
#[async_recursion]
267+
async fn process_buffered_msgs(&self, myself: &ActorRef<Msg<Ctx>>, state: &mut State<Ctx>) {
268+
if state.msg_buffer.is_empty() {
269+
return;
270+
}
271+
272+
info!(count = %state.msg_buffer.len(), "Replaying buffered messages");
273+
274+
while let Some(msg) = state.msg_buffer.pop() {
275+
info!("Replaying buffered message: {msg:?}");
276+
277+
if let Err(e) = self.handle_msg(myself.clone(), state, msg).await {
278+
error!("Error when handling buffered message: {e:?}");
279+
}
280+
}
281+
}
282+
254283
async fn handle_msg(
255284
&self,
256285
myself: ActorRef<Msg<Ctx>>,
@@ -259,8 +288,6 @@ where
259288
) -> Result<(), ActorProcessingErr> {
260289
match msg {
261290
Msg::StartHeight(height, validator_set) => {
262-
state.phase = Phase::Running;
263-
264291
let result = self
265292
.process_input(
266293
&myself,
@@ -286,6 +313,10 @@ where
286313
error!(%height, "Error when checking and replaying WAL: {e}");
287314
}
288315

316+
self.process_buffered_msgs(&myself, state).await;
317+
318+
state.phase = Phase::Running;
319+
289320
Ok(())
290321
}
291322

@@ -607,8 +638,6 @@ where
607638
error!(%height, "Failed to replay WAL entries: {e}");
608639
self.tx_event.send(|| Event::WalReplayError(Arc::new(e)));
609640
}
610-
611-
state.phase = Phase::Running;
612641
}
613642
Err(e) => {
614643
error!(%height, "Error when notifying WAL of started height: {e}");
@@ -1103,6 +1132,7 @@ where
11031132
consensus: ConsensusState::new(self.ctx.clone(), self.params.clone()),
11041133
connected_peers: BTreeSet::new(),
11051134
phase: Phase::Unstarted,
1135+
msg_buffer: MessageBuffer::new(MAX_BUFFER_SIZE),
11061136
})
11071137
}
11081138

@@ -1129,7 +1159,13 @@ where
11291159
msg: Msg<Ctx>,
11301160
state: &mut State<Ctx>,
11311161
) -> Result<(), ActorProcessingErr> {
1132-
if let Err(e) = self.handle_msg(myself, state, msg).await {
1162+
if state.phase != Phase::Running && should_buffer(&msg) {
1163+
let _span = error_span!("buffer", phase = ?state.phase).entered();
1164+
state.msg_buffer.buffer(msg);
1165+
return Ok(());
1166+
}
1167+
1168+
if let Err(e) = self.handle_msg(myself.clone(), state, msg).await {
11331169
error!("Error when handling message: {e:?}");
11341170
}
11351171

@@ -1142,9 +1178,18 @@ where
11421178
state: &mut State<Ctx>,
11431179
) -> Result<(), ActorProcessingErr> {
11441180
info!("Stopping...");
1145-
11461181
state.timers.cancel_all();
1147-
11481182
Ok(())
11491183
}
11501184
}
1185+
1186+
fn should_buffer<Ctx: Context>(msg: &Msg<Ctx>) -> bool {
1187+
!matches!(
1188+
msg,
1189+
Msg::StartHeight(..)
1190+
| Msg::GetStatus(..)
1191+
| Msg::NetworkEvent(NetworkEvent::Listening(..))
1192+
| Msg::NetworkEvent(NetworkEvent::PeerConnected(..))
1193+
| Msg::NetworkEvent(NetworkEvent::PeerDisconnected(..))
1194+
)
1195+
}

code/crates/engine/src/util/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod events;
2+
pub mod msg_buffer;
23
pub mod streaming;
34
pub mod ticker;
45
pub mod timers;
+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use std::collections::VecDeque;
2+
3+
use malachitebft_core_types::Context;
4+
use tracing::{info, warn};
5+
6+
use crate::consensus::ConsensusMsg;
7+
8+
pub struct MessageBuffer<Ctx: Context> {
9+
messages: VecDeque<ConsensusMsg<Ctx>>,
10+
max_size: usize,
11+
}
12+
13+
impl<Ctx: Context> MessageBuffer<Ctx> {
14+
pub fn new(max_size: usize) -> Self {
15+
Self {
16+
messages: VecDeque::new(),
17+
max_size,
18+
}
19+
}
20+
21+
pub fn buffer(&mut self, msg: ConsensusMsg<Ctx>) -> bool {
22+
if self.messages.len() < self.max_size {
23+
info!("Buffering message: {msg:?}");
24+
self.messages.push_back(msg);
25+
true
26+
} else {
27+
warn!("Buffer is full, dropping message: {msg:?}");
28+
false
29+
}
30+
}
31+
32+
pub fn pop(&mut self) -> Option<ConsensusMsg<Ctx>> {
33+
self.messages.pop_front()
34+
}
35+
36+
pub fn is_empty(&self) -> bool {
37+
self.messages.is_empty()
38+
}
39+
40+
pub fn len(&self) -> usize {
41+
self.messages.len()
42+
}
43+
}

0 commit comments

Comments
 (0)