fix(iota-indexer): Refactor the network metrics processor to efficiently handle data migrations (#5609)

* fix: Refactor the network metrics processor to efficiently handle data migrations

* fix: Make MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE configurable

* refactor: Rename variables

* refactor: Use own variable instead of constant
samuel-rufi authored Feb 26, 2025
1 parent 6bb47b1 commit ea4e547
Showing 2 changed files with 77 additions and 46 deletions.
109 changes: 65 additions & 44 deletions crates/iota-indexer/src/processors/network_metrics_processor.rs
@@ -10,37 +10,50 @@ use crate::{
     types::IndexerResult,
 };
 
-const NETWORK_METRICS_PROCESSOR_BATCH_SIZE: usize = 10;
-const PARALLELISM: usize = 1;
+const MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE: usize = 10;
+const MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE: usize = 80000;
+const NETWORK_METRICS_PROCESSOR_PARALLELISM: usize = 1;
 
 pub struct NetworkMetricsProcessor<S> {
     pub store: S,
     metrics: IndexerMetrics,
-    pub network_processor_metrics_batch_size: usize,
-    pub network_processor_metrics_parallelism: usize,
+    pub min_network_metrics_processor_batch_size: usize,
+    pub max_network_metrics_processor_batch_size: usize,
+    pub network_metrics_processor_parallelism: usize,
 }
 
 impl<S> NetworkMetricsProcessor<S>
 where
     S: IndexerAnalyticalStore + Clone + Sync + Send + 'static,
 {
     pub fn new(store: S, metrics: IndexerMetrics) -> NetworkMetricsProcessor<S> {
-        let network_processor_metrics_batch_size =
-            std::env::var("NETWORK_PROCESSOR_METRICS_BATCH_SIZE")
-                .map(|s| {
-                    s.parse::<usize>()
-                        .unwrap_or(NETWORK_METRICS_PROCESSOR_BATCH_SIZE)
-                })
-                .unwrap_or(NETWORK_METRICS_PROCESSOR_BATCH_SIZE);
-        let network_processor_metrics_parallelism =
-            std::env::var("NETWORK_PROCESSOR_METRICS_PARALLELISM")
-                .map(|s| s.parse::<usize>().unwrap_or(PARALLELISM))
-                .unwrap_or(PARALLELISM);
+        let min_network_metrics_processor_batch_size =
+            std::env::var("MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE")
+                .map(|s| {
+                    s.parse::<usize>()
+                        .unwrap_or(MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE)
+                })
+                .unwrap_or(MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE);
+        let max_network_metrics_processor_batch_size =
+            std::env::var("MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE")
+                .map(|s| {
+                    s.parse::<usize>()
+                        .unwrap_or(MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE)
+                })
+                .unwrap_or(MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE);
+        let network_metrics_processor_parallelism =
+            std::env::var("NETWORK_METRICS_PROCESSOR_PARALLELISM")
+                .map(|s| {
+                    s.parse::<usize>()
+                        .unwrap_or(NETWORK_METRICS_PROCESSOR_PARALLELISM)
+                })
+                .unwrap_or(NETWORK_METRICS_PROCESSOR_PARALLELISM);
         Self {
            store,
            metrics,
-            network_processor_metrics_batch_size,
-            network_processor_metrics_parallelism,
+            min_network_metrics_processor_batch_size,
+            max_network_metrics_processor_batch_size,
+            network_metrics_processor_parallelism,
         }
     }
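Side note: the three environment reads above share one parse-with-fallback shape. A minimal standalone sketch of that pattern, assuming a hypothetical `env_usize_or` helper (not part of this commit) with the same fallback behavior — an unset or unparsable variable falls back to the compiled-in default:

```rust
use std::env;

/// Returns the value of `key` parsed as `usize`, falling back to `default`
/// when the variable is unset or does not parse -- the same fallback
/// behavior as the constructor above.
fn env_usize_or(key: &str, default: usize) -> usize {
    env::var(key)
        .ok()
        .and_then(|s| s.parse::<usize>().ok())
        .unwrap_or(default)
}

fn main() {
    // With the variables unset (or unparsable), the compiled-in defaults win.
    let min = env_usize_or("MIN_NETWORK_METRICS_PROCESSOR_BATCH_SIZE", 10);
    let max = env_usize_or("MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE", 80_000);
    let parallelism = env_usize_or("NETWORK_METRICS_PROCESSOR_PARALLELISM", 1);
    println!("min batch: {min}, max batch: {max}, parallelism: {parallelism}");
}
```

Either form silently ignores malformed values rather than failing startup.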

@@ -60,52 +73,60 @@ where
             .unwrap_or_default()
             .checkpoint_sequence_number;
         let mut last_processed_peak_tps_epoch = latest_epoch_peak_tps.unwrap_or_default().epoch;
 
         loop {
-            let mut latest_stored_checkpoint = self.store.get_latest_stored_checkpoint().await?;
-            while if let Some(cp) = latest_stored_checkpoint {
-                cp.sequence_number
-                    < last_processed_cp_seq + self.network_processor_metrics_batch_size as i64
-            } else {
-                true
-            } {
-                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                latest_stored_checkpoint = self.store.get_latest_stored_checkpoint().await?;
-            }
+            let latest_stored_checkpoint = loop {
+                if let Some(latest_stored_checkpoint) =
+                    self.store.get_latest_stored_checkpoint().await?
+                {
+                    if latest_stored_checkpoint.sequence_number
+                        >= last_processed_cp_seq
+                            + self.min_network_metrics_processor_batch_size as i64
+                    {
+                        break latest_stored_checkpoint;
+                    }
+                }
+                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+            };
+
+            let available_checkpoints =
+                latest_stored_checkpoint.sequence_number - last_processed_cp_seq;
+            let batch_size =
+                available_checkpoints.min(self.max_network_metrics_processor_batch_size as i64);
+
             info!(
-                "Persisting tx count metrics for checkpoint sequence number {}",
-                last_processed_cp_seq
+                "Preparing tx count metrics for checkpoints [{}-{}]",
+                last_processed_cp_seq + 1,
+                last_processed_cp_seq + batch_size
             );
-            let batch_size = self.network_processor_metrics_batch_size;
-            let step_size = batch_size / self.network_processor_metrics_parallelism;
+
+            let step_size =
+                (batch_size as usize / self.network_metrics_processor_parallelism).max(1);
             let mut persist_tasks = vec![];
-            for chunk_start_cp in (last_processed_cp_seq + 1
-                ..last_processed_cp_seq + batch_size as i64 + 1)
-                .step_by(step_size)
+
+            for chunk_start_cp in
+                (last_processed_cp_seq + 1..=last_processed_cp_seq + batch_size).step_by(step_size)
             {
+                let chunk_end_cp =
+                    (chunk_start_cp + step_size as i64).min(last_processed_cp_seq + batch_size + 1);
+
                 let store = self.store.clone();
                 persist_tasks.push(tokio::task::spawn_blocking(move || {
-                    store
-                        .persist_tx_count_metrics(chunk_start_cp, chunk_start_cp + step_size as i64)
+                    store.persist_tx_count_metrics(chunk_start_cp, chunk_end_cp)
                 }));
             }
+
             futures::future::join_all(persist_tasks)
                 .await
                 .into_iter()
                 .collect::<Result<Vec<_>, _>>()
-                .tap_err(|e| {
-                    error!("Error joining network persist tasks: {:?}", e);
-                })?
+                .tap_err(|e| error!("Error joining network persist tasks: {:?}", e))?
                 .into_iter()
                 .collect::<Result<Vec<_>, _>>()
-                .tap_err(|e| {
-                    error!("Error persisting tx count metrics: {:?}", e);
-                })?;
-            last_processed_cp_seq += batch_size as i64;
-            info!(
-                "Persisted tx count metrics for checkpoint sequence number {}",
-                last_processed_cp_seq
-            );
+                .tap_err(|e| error!("Error persisting tx count metrics: {:?}", e))?;
+
+            last_processed_cp_seq += batch_size;
+
             self.metrics
                 .latest_network_metrics_cp_seq
                 .set(last_processed_cp_seq);
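The batching arithmetic above is worth seeing with concrete numbers. A standalone sketch with invented values — a backlog of 100,400 checkpoints, the default 80,000 cap, and a parallelism of 4:

```rust
fn main() {
    // Invented example values; the real ones come from the store and config.
    let last_processed_cp_seq: i64 = 100;
    let latest_stored_seq: i64 = 100_500; // latest stored checkpoint
    let max_batch_size: i64 = 80_000;     // MAX_NETWORK_METRICS_PROCESSOR_BATCH_SIZE
    let parallelism: usize = 4;

    // The batch covers whatever is available, capped at the maximum.
    let available_checkpoints = latest_stored_seq - last_processed_cp_seq; // 100_400
    let batch_size = available_checkpoints.min(max_batch_size);            // 80_000
    let step_size = (batch_size as usize / parallelism).max(1);            // 20_000

    for chunk_start_cp in
        (last_processed_cp_seq + 1..=last_processed_cp_seq + batch_size).step_by(step_size)
    {
        // The exclusive end is clamped so the final chunk never overshoots.
        let chunk_end_cp =
            (chunk_start_cp + step_size as i64).min(last_processed_cp_seq + batch_size + 1);
        println!("persist_tx_count_metrics({chunk_start_cp}, {chunk_end_cp})");
        // -> (101, 20101), (20101, 40101), (40101, 60101), (60101, 80101)
    }
}
```

The `.max(1)` guard matters: `step_by(0)` panics, and without it a batch smaller than the configured parallelism would produce a zero step.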
14 changes: 12 additions & 2 deletions crates/iota-indexer/src/store/pg_indexer_analytical_store.rs
@@ -196,13 +196,19 @@ impl IndexerAnalyticalStore for PgIndexerAnalyticalStore {
         Ok(latest_network_metrics)
     }
 
+    /// Persists the transaction count metrics for the given checkpoint range.
+    /// Start checkpoint is inclusive, end checkpoint is exclusive.
     fn persist_tx_count_metrics(
         &self,
         start_checkpoint: i64,
         end_checkpoint: i64,
     ) -> IndexerResult<()> {
         let tx_count_query = construct_checkpoint_tx_count_query(start_checkpoint, end_checkpoint);
-        info!("Persisting tx count metrics for cp {}", start_checkpoint);
+        info!(
+            "Persisting tx count metrics for checkpoints [{}-{}]",
+            start_checkpoint,
+            end_checkpoint - 1
+        );
         transactional_blocking_with_retry!(
             &self.blocking_cp,
             |conn| {
@@ -212,7 +218,11 @@ impl IndexerAnalyticalStore for PgIndexerAnalyticalStore {
             Duration::from_secs(10)
         )
         .context("Failed persisting tx count metrics to PostgresDB")?;
-        info!("Persisted tx count metrics for cp {}", start_checkpoint);
+        info!(
+            "Persisted tx count metrics for checkpoints [{}-{}]",
+            start_checkpoint,
+            end_checkpoint - 1
+        );
         Ok(())
     }
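The new doc comment fixes the range convention as half-open — start inclusive, end exclusive — which is why both log lines print `end_checkpoint - 1`. A tiny illustration with invented values:

```rust
fn main() {
    // A chunk as produced by the processor above (invented values).
    let (start_checkpoint, end_checkpoint): (i64, i64) = (101, 20_101);
    // Half-open range: checkpoints 101..=20_100 are persisted,
    // and the log renders the inclusive bounds.
    println!(
        "Persisting tx count metrics for checkpoints [{}-{}]",
        start_checkpoint,
        end_checkpoint - 1
    ); // -> [101-20100]
}
```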
