fix(node): Wait for checkpoint service to stop during reconfig #5391

Merged
1 commit merged on Feb 18, 2025
24 changes: 16 additions & 8 deletions crates/iota-core/src/checkpoints/mod.rs
@@ -23,7 +23,7 @@ use futures::{
future::{Either, select},
};
use iota_macros::fail_point;
use iota_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
use iota_network::default_iota_network_config;
use iota_protocol_config::ProtocolVersion;
use iota_types::{
@@ -58,6 +58,7 @@ use rand::{rngs::OsRng, seq::SliceRandom};
use serde::{Deserialize, Serialize};
use tokio::{
sync::{Notify, watch},
task::JoinSet,
time::timeout,
};
use tracing::{debug, error, info, instrument, warn};
@@ -2239,7 +2240,11 @@ impl CheckpointService {
metrics: Arc<CheckpointMetrics>,
max_transactions_per_checkpoint: usize,
max_checkpoint_size_bytes: usize,
) -> (Arc<Self>, watch::Sender<()> /* The exit sender */) {
) -> (
Arc<Self>,
watch::Sender<()>, // The exit sender
JoinSet<()>, // Handle to tasks
) {
info!(
"Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
);
@@ -2248,6 +2253,8 @@ impl CheckpointService {

let (exit_snd, exit_rcv) = watch::channel(());

let mut tasks = JoinSet::new();

let builder = CheckpointBuilder::new(
state.clone(),
checkpoint_store.clone(),
@@ -2262,9 +2269,10 @@ impl CheckpointService {
max_transactions_per_checkpoint,
max_checkpoint_size_bytes,
);

let epoch_store_clone = epoch_store.clone();
spawn_monitored_task!(epoch_store_clone.within_alive_epoch(builder.run()));
tasks.spawn(monitored_future!(async move {
let _ = epoch_store_clone.within_alive_epoch(builder.run()).await;
}));

let aggregator = CheckpointAggregator::new(
checkpoint_store.clone(),
@@ -2275,8 +2283,7 @@ impl CheckpointService {
state.clone(),
metrics.clone(),
);

spawn_monitored_task!(aggregator.run());
tasks.spawn(monitored_future!(aggregator.run()));

let last_signature_index = epoch_store
.get_last_checkpoint_signature_index()
@@ -2290,7 +2297,8 @@ impl CheckpointService {
last_signature_index,
metrics,
});
(service, exit_snd)

(service, exit_snd, tasks)
}

#[cfg(test)]
@@ -2522,7 +2530,7 @@ mod tests {
state.get_accumulator_store().clone(),
));

let (checkpoint_service, _exit) = CheckpointService::spawn(
let (checkpoint_service, _exit_sender, _tasks) = CheckpointService::spawn(
state.clone(),
checkpoint_store,
epoch_store.clone(),
@@ -33,7 +33,7 @@ pub fn checkpoint_service_for_testing(state: Arc<AuthorityState>) -> Arc<Checkpo
));
let (certified_output, _certified_result) = mpsc::channel::<CertifiedCheckpointSummary>(10);

let (checkpoint_service, _) = CheckpointService::spawn(
let (checkpoint_service, _, _) = CheckpointService::spawn(
state.clone(),
state.get_checkpoint_store().clone(),
epoch_store.clone(),
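The new three-value return means the caller now owns the checkpoint task handles. As a rough caller-side sketch of the shutdown flow that the crates/iota-node/src/lib.rs changes below adopt (the helper name is hypothetical and the real constructor arguments are elided; in the actual code the sender and JoinSet come from CheckpointService::spawn):

```rust
use std::time::Duration;
use tokio::{sync::watch, task::JoinSet, time::timeout};

// Hypothetical helper; in the real code the sender and the JoinSet come from
// CheckpointService::spawn(...).
async fn shut_down_checkpoint_tasks(exit_sender: watch::Sender<()>, mut tasks: JoinSet<()>) {
    // Wake every task holding a watch::Receiver for this channel.
    let _ = exit_sender.send(());

    // Bound the wait so a stuck task cannot block reconfiguration forever.
    let waited = timeout(Duration::from_secs(5), async {
        while let Some(result) = tasks.join_next().await {
            if let Err(err) = result {
                if err.is_panic() {
                    // Re-raise panics from checkpoint tasks instead of swallowing them.
                    std::panic::resume_unwind(err.into_panic());
                }
                eprintln!("checkpoint service task failed: {err:?}");
            }
        }
    })
    .await;

    if waited.is_err() {
        eprintln!("timed out waiting for checkpoint service tasks to finish");
    }
}
```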
59 changes: 42 additions & 17 deletions crates/iota-node/src/lib.rs
@@ -25,6 +25,7 @@ use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
use futures::TryFutureExt;
pub use handle::IotaNodeHandle;
use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
use iota_common::debug_fatal;
use iota_config::{
ConsensusConfig, NodeConfig,
node::{DBCheckpointConfig, RunWithRange},
@@ -123,7 +124,8 @@ use tap::tap::TapFallible;
use tokio::{
runtime::Handle,
sync::{Mutex, broadcast, mpsc, watch},
task::JoinHandle,
task::{JoinHandle, JoinSet},
time::timeout,
};
use tower::ServiceBuilder;
use tracing::{Instrument, debug, error, error_span, info, warn};
@@ -141,10 +143,12 @@ pub struct ValidatorComponents {
consensus_manager: ConsensusManager,
consensus_store_pruner: ConsensusStorePruner,
consensus_adapter: Arc<ConsensusAdapter>,
// dropping this will eventually stop checkpoint tasks. The receiver side of this channel
// is copied into each checkpoint service task, and they are listening to any change to this
// channel. When the sender is dropped, a change is triggered and those tasks will exit.
// Sending to the channel or dropping this will eventually stop checkpoint tasks.
// The receiver side of this channel is copied into each checkpoint service task,
// and they are listening to any change to this channel.
checkpoint_service_exit: watch::Sender<()>,
// Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
checkpoint_service_tasks: JoinSet<()>,
checkpoint_metrics: Arc<CheckpointMetrics>,
iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
}
@@ -1273,16 +1277,17 @@ impl IotaNode {
iota_node_metrics: Arc<IotaNodeMetrics>,
iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
) -> Result<ValidatorComponents> {
let (checkpoint_service, checkpoint_service_exit) = Self::start_checkpoint_service(
config,
consensus_adapter.clone(),
checkpoint_store,
epoch_store.clone(),
state.clone(),
state_sync_handle,
accumulator,
checkpoint_metrics.clone(),
);
let (checkpoint_service, checkpoint_service_exit, checkpoint_service_tasks) =
Self::start_checkpoint_service(
config,
consensus_adapter.clone(),
checkpoint_store,
epoch_store.clone(),
state.clone(),
state_sync_handle,
accumulator,
checkpoint_metrics.clone(),
);

// create a new map that gets injected into both the consensus handler and the
// consensus adapter the consensus handler will write values forwarded
@@ -1343,6 +1348,7 @@ impl IotaNode {
consensus_store_pruner,
consensus_adapter,
checkpoint_service_exit,
checkpoint_service_tasks,
checkpoint_metrics,
iota_tx_validator_metrics,
})
@@ -1363,7 +1369,7 @@ impl IotaNode {
state_sync_handle: state_sync::Handle,
accumulator: Weak<StateAccumulator>,
checkpoint_metrics: Arc<CheckpointMetrics>,
) -> (Arc<CheckpointService>, watch::Sender<()>) {
) -> (Arc<CheckpointService>, watch::Sender<()>, JoinSet<()>) {
let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

@@ -1646,15 +1652,33 @@ impl IotaNode {
consensus_store_pruner,
consensus_adapter,
checkpoint_service_exit,
mut checkpoint_service_tasks,
checkpoint_metrics,
iota_tx_validator_metrics,
}) = self.validator_components.lock().await.take()
{
info!("Reconfiguring the validator.");
// Stop the old checkpoint service.
drop(checkpoint_service_exit);
// Stop the old checkpoint service and wait for its tasks to finish.
let _ = checkpoint_service_exit.send(());
let wait_result = timeout(Duration::from_secs(5), async move {
while let Some(result) = checkpoint_service_tasks.join_next().await {
if let Err(err) = result {
if err.is_panic() {
std::panic::resume_unwind(err.into_panic());
}
warn!("Error in checkpoint service task: {:?}", err);
}
}
})
.await;
if wait_result.is_err() {
debug_fatal!("Timed out waiting for checkpoint service tasks to finish.");
} else {
info!("Checkpoint service has shut down.");
}

consensus_manager.shutdown().await;
info!("Consensus has shut down.");

let new_epoch_store = self
.reconfigure_state(
@@ -1665,6 +1689,7 @@ impl IotaNode {
accumulator.clone(),
)
.await?;
info!("Epoch store finished reconfiguration.");

// No other components should be holding a strong reference to state accumulator
// at this point. Confirm here before we swap in the new accumulator.
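To illustrate the mechanism described in the ValidatorComponents comment, here is a self-contained sketch (not code from this repository) of the watch-channel shutdown pattern: each task selects between its work and changed() on a cloned receiver, so either sending a value or dropping the sender wakes every task, and the JoinSet lets the caller confirm they actually exited.

```rust
use std::time::Duration;
use tokio::{sync::watch, task::JoinSet, time::timeout};

#[tokio::main]
async fn main() {
    let (exit_snd, exit_rcv) = watch::channel(());
    let mut tasks: JoinSet<()> = JoinSet::new();

    for id in 0..3 {
        let mut exit = exit_rcv.clone();
        tasks.spawn(async move {
            loop {
                tokio::select! {
                    // Stand-in for real checkpoint building/aggregation work.
                    _ = tokio::time::sleep(Duration::from_millis(50)) => {}
                    // Resolves when the sender sends a new value or is dropped.
                    _ = exit.changed() => {
                        println!("task {id}: exit signal received");
                        break;
                    }
                }
            }
        });
    }

    // Signal shutdown, then wait (bounded) for every task to finish.
    let _ = exit_snd.send(());
    let waited = timeout(Duration::from_secs(5), async {
        while tasks.join_next().await.is_some() {}
    })
    .await;
    assert!(waited.is_ok(), "checkpoint-style tasks did not stop in time");
}
```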