fix(consensus): fix amnesia recovery boot run #4768
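Summary: previously the boot counter was incremented unconditionally after every consensus start, so a node catching up across multiple epochs after a restore looked as though it had participated in each of them, and amnesia recovery was skipped when it was still needed. This change increments the counter only when the previous run actually handled at least one commit, as observed through the (now public) CommitConsumerMonitor watermark.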

Merged · 3 commits · Jan 23, 2025
consensus/core/src/commit_consumer.rs · 2 changes: 1 addition & 1 deletion

@@ -55,7 +55,7 @@ impl CommitConsumerMonitor {
}
}

-pub(crate) fn highest_handled_commit(&self) -> CommitIndex {
+pub fn highest_handled_commit(&self) -> CommitIndex {
*self.highest_handled_commit.borrow()
}

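For context, the only change here is visibility: highest_handled_commit goes from pub(crate) to pub so that MysticetiManager in iota-core (next file) can read the commit watermark across the crate boundary. A minimal sketch of the shape involved, assuming watch-channel internals that may differ from the crate's actual implementation:

use tokio::sync::watch;

type CommitIndex = u32; // assumed alias, for illustration only

pub struct CommitConsumerMonitor {
    // Assumed internals: the receiver side of a watch channel that the
    // commit consumer updates as it processes committed sub-dags.
    highest_handled_commit: watch::Receiver<CommitIndex>,
}

impl CommitConsumerMonitor {
    // Previously pub(crate); now pub so callers outside consensus-core
    // can inspect how far commit handling has progressed.
    pub fn highest_handled_commit(&self) -> CommitIndex {
        *self.highest_handled_commit.borrow()
    }
}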
crates/iota-core/src/consensus_manager/mysticeti_manager.rs · 44 changes: 35 additions & 9 deletions

@@ -7,7 +7,7 @@ use std::{path::PathBuf, sync::Arc, time::Duration};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use consensus_config::{Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
-use consensus_core::{CommitConsumer, CommitIndex, ConsensusAuthority};
+use consensus_core::{CommitConsumer, CommitConsumerMonitor, CommitIndex, ConsensusAuthority};
use fastcrypto::ed25519;
use iota_config::NodeConfig;
use iota_metrics::{RegistryID, RegistryService, monitored_mpsc::unbounded_channel};
@@ -49,6 +49,7 @@ pub struct MysticetiManager {
client: Arc<LazyMysticetiClient>,
// TODO: switch to parking_lot::Mutex.
consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,
+consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
}

impl MysticetiManager {
@@ -74,6 +75,7 @@ impl MysticetiManager {
client,
consensus_handler: Mutex::new(None),
boot_counter: Mutex::new(0),
+consumer_monitor: ArcSwapOption::empty(),
}
}

@@ -158,9 +160,36 @@ impl ConsensusManagerTrait for MysticetiManager {
);
let monitor = consumer.monitor();

-// TODO(mysticeti): Investigate if we need to return potential errors from
-// AuthorityNode and add retries here?
-let boot_counter = *self.boot_counter.lock().await;
+// If there is a previous consumer monitor, the consensus engine has been
+// restarted, e.g. due to an epoch change. On its own, however, that doesn't
+// tell us whether the node participated in an active epoch or an old one.
+// To determine this, we check whether the previous run handled any commits:
+// if it did, we assume the node participated in that run.
+let participated_on_previous_run =
+    if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
+        previous_monitor.highest_handled_commit() > 0
+    } else {
+        false
+    };
+
+// Increment the boot counter only if consensus successfully participated in
+// the previous run. This is the typical case during normal epoch changes,
+// where the node restarts as expected and the counter is incremented to
+// prevent amnesia recovery on the next start. If the node is instead
+// recovering from a restore process and catching up across multiple epochs,
+// it won't handle any commits until it reaches the last active epoch; in
+// that scenario we do not increment the boot counter, as amnesia recovery
+// still needs to run.
+let mut boot_counter = self.boot_counter.lock().await;
+if participated_on_previous_run {
+    *boot_counter += 1;
+} else {
+    info!(
+        "Node did not participate in the previous run; boot counter {} will not be incremented",
+        *boot_counter
+    );
+}

let authority = ConsensusAuthority::start(
network_type,
own_index,
@@ -172,15 +201,11 @@
Arc::new(tx_validator.clone()),
consumer,
registry.clone(),
-boot_counter,
+*boot_counter,
)
.await;
let client = authority.transaction_client();

-// Now increment the boot counter
-let mut boot_counter = self.boot_counter.lock().await;
-*boot_counter += 1;

let registry_id = self.registry_service.add(registry.clone());

let registered_authority = Arc::new((authority, registry_id));
@@ -191,6 +216,7 @@

// spin up the new mysticeti consensus handler to listen for committed sub dags
let handler = MysticetiConsensusHandler::new(consensus_handler, commit_receiver, monitor);

let mut consensus_handler = self.consensus_handler.lock().await;
*consensus_handler = Some(handler);

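The heart of the fix is the gating above: the counter feeding amnesia recovery only advances when the previous run demonstrably made progress. A self-contained sketch of that decision logic, using simplified placeholder types (BootState, Monitor, and the field names here are illustrative, not the crate's actual API):

use std::sync::Arc;
use arc_swap::ArcSwapOption;

struct Monitor {
    highest_handled_commit: u32,
}

struct BootState {
    consumer_monitor: ArcSwapOption<Monitor>,
    boot_counter: u64,
}

impl BootState {
    // Swap in the new run's monitor and inspect the previous one, if any.
    // Returns the boot counter value to hand to the consensus authority.
    fn on_start(&mut self, new_monitor: Arc<Monitor>) -> u64 {
        let participated_on_previous_run = self
            .consumer_monitor
            .swap(Some(new_monitor))
            .is_some_and(|prev| prev.highest_handled_commit > 0);

        if participated_on_previous_run {
            // Normal epoch change: bump the counter so amnesia recovery
            // is skipped on the next start.
            self.boot_counter += 1;
        }
        // Restore/catch-up path: the counter stays unchanged, so amnesia
        // recovery still runs on the next start.
        self.boot_counter
    }
}

On a first start on_start returns 0 (no previous monitor); after a run that handled commits it returns 1; after a run that handled none it returns the same value as before, which is exactly the behavior the test below exercises.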
crates/iota-core/src/unit_tests/mysticeti_manager_tests.rs · 21 changes: 18 additions & 3 deletions

@@ -106,17 +106,32 @@ async fn test_mysticeti_manager() {

// THEN
assert!(manager.is_running().await);
+let boot_counter = *manager.boot_counter.lock().await;
+if i == 1 || i == 2 {
+    assert_eq!(boot_counter, 0);
+} else {
+    assert_eq!(boot_counter, 1);
+}

// Now try to shut it down
sleep(Duration::from_secs(1)).await;

+// Simulate a commit by bumping the handled commit index, so we can verify
+// that the boot counter increments only after the first run. In practice
+// this simulates a consensus engine restart following a run in which no
+// commits had happened (the first run).
+if i > 1 {
+    let monitor = manager
+        .consumer_monitor
+        .load_full()
+        .expect("A consumer monitor should have been initialised");
+    monitor.set_highest_handled_commit(100);
+}

// WHEN
manager.shutdown().await;

// THEN
assert!(!manager.is_running().await);

-let boot_counter = *manager.boot_counter.lock().await;
-assert_eq!(boot_counter, i);
}
}
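To see why these assertions hold, assuming the surrounding loop (outside the visible hunk) runs for i = 1..=3: on i = 1 there is no previous monitor, so the counter stays at 0; on i = 2 a previous monitor exists but handled no commits (nothing was simulated on the first iteration), so it still stays at 0; on i = 3 the previous run's monitor was bumped to 100, so the counter increments to 1. The removed assertion boot_counter == i was only correct while the counter was bumped unconditionally after every start.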