diff --git a/consensus/core/src/commit_consumer.rs b/consensus/core/src/commit_consumer.rs
index 6147f7d88f8..6c9846b80a8 100644
--- a/consensus/core/src/commit_consumer.rs
+++ b/consensus/core/src/commit_consumer.rs
@@ -55,7 +55,7 @@ impl CommitConsumerMonitor {
         }
     }
 
-    pub(crate) fn highest_handled_commit(&self) -> CommitIndex {
+    pub fn highest_handled_commit(&self) -> CommitIndex {
        *self.highest_handled_commit.borrow()
     }
 
diff --git a/crates/iota-core/src/consensus_manager/mysticeti_manager.rs b/crates/iota-core/src/consensus_manager/mysticeti_manager.rs
index b2f82e4f6d9..55c1a3b86a9 100644
--- a/crates/iota-core/src/consensus_manager/mysticeti_manager.rs
+++ b/crates/iota-core/src/consensus_manager/mysticeti_manager.rs
@@ -7,7 +7,7 @@ use std::{path::PathBuf, sync::Arc, time::Duration};
 use arc_swap::ArcSwapOption;
 use async_trait::async_trait;
 use consensus_config::{Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
-use consensus_core::{CommitConsumer, CommitIndex, ConsensusAuthority};
+use consensus_core::{CommitConsumer, CommitConsumerMonitor, CommitIndex, ConsensusAuthority};
 use fastcrypto::ed25519;
 use iota_config::NodeConfig;
 use iota_metrics::{RegistryID, RegistryService, monitored_mpsc::unbounded_channel};
@@ -49,6 +49,7 @@ pub struct MysticetiManager {
     client: Arc<LazyMysticetiClient>,
     // TODO: switch to parking_lot::Mutex.
     consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,
+    consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
 }
 
 impl MysticetiManager {
@@ -74,6 +75,7 @@ impl MysticetiManager {
             client,
             consensus_handler: Mutex::new(None),
             boot_counter: Mutex::new(0),
+            consumer_monitor: ArcSwapOption::empty(),
         }
     }
 
@@ -158,9 +160,36 @@ impl ConsensusManagerTrait for MysticetiManager {
         );
 
         let monitor = consumer.monitor();
-        // TODO(mysticeti): Investigate if we need to return potential errors from
-        // AuthorityNode and add retries here?
-        let boot_counter = *self.boot_counter.lock().await;
+        // A previous consumer monitor indicates that the consensus engine has
+        // been restarted, usually because of an epoch change. On its own, that
+        // does not tell us whether the node participated in an active epoch or
+        // an old one, so we check whether any commits were handled: if at least
+        // one commit was processed, we assume the node participated in the
+        // previous run.
+        let participated_on_previous_run =
+            if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
+                previous_monitor.highest_handled_commit() > 0
+            } else {
+                false
+            };
+
+        // Increment the boot counter only if consensus participated in the
+        // previous run. This is the normal epoch-change case: the node restarts
+        // as expected, and the incremented counter prevents amnesia recovery on
+        // the next start. A node that is recovering from a restore and catching
+        // up across multiple epochs handles no commits until it reaches the
+        // last active epoch; in that scenario we leave the counter unchanged,
+        // because amnesia recovery still needs to run.
+        let mut boot_counter = self.boot_counter.lock().await;
+        if participated_on_previous_run {
+            *boot_counter += 1;
+        } else {
+            info!(
+                "Node did not participate in the previous run; boot counter {} will not be incremented",
+                *boot_counter
+            );
+        }
+
         let authority = ConsensusAuthority::start(
             network_type,
             own_index,
@@ -172,15 +201,11 @@
             Arc::new(tx_validator.clone()),
             consumer,
             registry.clone(),
-            boot_counter,
+            *boot_counter,
         )
         .await;
         let client = authority.transaction_client();
 
-        // Now increment the boot counter
-        let mut boot_counter = self.boot_counter.lock().await;
-        *boot_counter += 1;
-
         let registry_id = self.registry_service.add(registry.clone());
 
         let registered_authority = Arc::new((authority, registry_id));
@@ -191,6 +216,7 @@
 
         // spin up the new mysticeti consensus handler to listen for committed sub dags
         let handler = MysticetiConsensusHandler::new(consensus_handler, commit_receiver, monitor);
+
         let mut consensus_handler = self.consensus_handler.lock().await;
         *consensus_handler = Some(handler);
 
diff --git a/crates/iota-core/src/unit_tests/mysticeti_manager_tests.rs b/crates/iota-core/src/unit_tests/mysticeti_manager_tests.rs
index 63b9d5fa6db..82346091e9f 100644
--- a/crates/iota-core/src/unit_tests/mysticeti_manager_tests.rs
+++ b/crates/iota-core/src/unit_tests/mysticeti_manager_tests.rs
@@ -106,17 +106,32 @@ async fn test_mysticeti_manager() {
 
         // THEN
         assert!(manager.is_running().await);
 
+        let boot_counter = *manager.boot_counter.lock().await;
+        if i == 1 || i == 2 {
+            assert_eq!(boot_counter, 0);
+        } else {
+            assert_eq!(boot_counter, 1);
+        }
+
         // Now try to shut it down
         sleep(Duration::from_secs(1)).await;
 
+        // Simulate a commit by bumping the handled commit index, so we can
+        // verify that the boot counter increments only after a run that has
+        // handled commits. Skipping this on the first run simulates a
+        // consensus engine restart before any commits have happened.
+        if i > 1 {
+            let monitor = manager
+                .consumer_monitor
+                .load_full()
+                .expect("A consumer monitor should have been initialised");
+            monitor.set_highest_handled_commit(100);
+        }
+
         // WHEN
         manager.shutdown().await;
 
         // THEN
         assert!(!manager.is_running().await);
-
-        let boot_counter = *manager.boot_counter.lock().await;
-        assert_eq!(boot_counter, i);
     }
 }
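
Reviewer note: below is a minimal, self-contained Rust sketch of the boot-counter rule this diff introduces, to make the three cases the test exercises easy to reason about. The names `MonitorStub` and `next_boot_counter` are hypothetical, and the atomic integer merely stands in for the real `CommitConsumerMonitor` state (the `.borrow()` call above suggests a watch channel internally); it does not mirror the actual `MysticetiManager` API.

use std::sync::atomic::{AtomicU32, Ordering};

/// Hypothetical stand-in for CommitConsumerMonitor: tracks the highest
/// commit index the commit consumer handled during one run.
struct MonitorStub {
    highest_handled_commit: AtomicU32,
}

impl MonitorStub {
    fn highest_handled_commit(&self) -> u32 {
        self.highest_handled_commit.load(Ordering::Acquire)
    }
}

/// The rule from the diff, in isolation: advance the boot counter only
/// when the previous run handled at least one commit; otherwise leave it
/// unchanged so amnesia recovery can run on the next start.
fn next_boot_counter(previous_monitor: Option<&MonitorStub>, boot_counter: u64) -> u64 {
    let participated = previous_monitor.is_some_and(|m| m.highest_handled_commit() > 0);
    if participated { boot_counter + 1 } else { boot_counter }
}

fn main() {
    // First start ever: no previous monitor, counter stays at 0.
    assert_eq!(next_boot_counter(None, 0), 0);

    // Restart after an idle run (e.g. replaying an old epoch after a
    // restore): no commits were handled, counter stays unchanged.
    let idle = MonitorStub { highest_handled_commit: AtomicU32::new(0) };
    assert_eq!(next_boot_counter(Some(&idle), 0), 0);

    // Normal epoch change after an active run: counter advances.
    let active = MonitorStub { highest_handled_commit: AtomicU32::new(100) };
    assert_eq!(next_boot_counter(Some(&active), 0), 1);
}

The property worth checking in review is that the counter only advances after a run that made progress, so a node replaying old epochs after a restore keeps amnesia recovery enabled until it reaches the active epoch.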