Skip to content

Commit

Permalink
fix: terminate event observer on failure
Browse files Browse the repository at this point in the history
  • Loading branch information
MicaiahReid committed Mar 13, 2024
1 parent 51479c5 commit b3e938a
Showing 1 changed file with 69 additions and 37 deletions.
106 changes: 69 additions & 37 deletions components/chainhook-sdk/src/observer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,34 +436,64 @@ pub fn start_event_observer(
let context_cloned = ctx.clone();
let event_observer_config_moved = config.clone();
let observer_commands_tx_moved = observer_commands_tx.clone();
let _ = hiro_system_kit::thread_named("Chainhook event observer").spawn(move || {
let future = start_bitcoin_event_observer(
event_observer_config_moved,
observer_commands_tx_moved,
observer_commands_rx,
observer_events_tx,
observer_sidecar,
context_cloned,
);
let _ = hiro_system_kit::nestable_block_on(future);
});
let _ = hiro_system_kit::thread_named("Chainhook event observer")
.spawn(move || {
let future = start_bitcoin_event_observer(
event_observer_config_moved,
observer_commands_tx_moved,
observer_commands_rx,
observer_events_tx.clone(),
observer_sidecar,
context_cloned.clone(),
);
match hiro_system_kit::nestable_block_on(future) {
Ok(_) => {}
Err(e) => {
if let Some(tx) = observer_events_tx {
context_cloned.try_log(|logger| {
slog::crit!(
logger,
"Chainhook event observer thread failed with error: {e}",
)
});
let _ = tx.send(ObserverEvent::Terminate);
}
}
}
})
.expect("unable to spawn thread");
}
BitcoinBlockSignaling::Stacks(ref _url) => {
// Start chainhook event observer
let context_cloned = ctx.clone();
let event_observer_config_moved = config.clone();
let observer_commands_tx_moved = observer_commands_tx.clone();
let _ = hiro_system_kit::thread_named("Chainhook event observer").spawn(move || {
let future = start_stacks_event_observer(
event_observer_config_moved,
observer_commands_tx_moved,
observer_commands_rx,
observer_events_tx,
observer_sidecar,
context_cloned,
);
let _ = hiro_system_kit::nestable_block_on(future);
});
let _ = hiro_system_kit::thread_named("Chainhook event observer")
.spawn(move || {
let future = start_stacks_event_observer(
event_observer_config_moved,
observer_commands_tx_moved,
observer_commands_rx,
observer_events_tx.clone(),
observer_sidecar,
context_cloned.clone(),
);
match hiro_system_kit::nestable_block_on(future) {
Ok(_) => {}
Err(e) => {
if let Some(tx) = observer_events_tx {
context_cloned.try_log(|logger| {
slog::crit!(
logger,
"Chainhook event observer thread failed with error: {e}",
)
});
let _ = tx.send(ObserverEvent::Terminate);
}
}
}
})
.expect("unable to spawn thread");

ctx.try_log(|logger| {
slog::info!(
Expand Down Expand Up @@ -752,15 +782,14 @@ pub async fn start_observer_commands_handler(
let command = match observer_commands_rx.recv() {
Ok(cmd) => cmd,
Err(e) => {
if let Some(ref tx) = observer_events_tx {
let _ = tx.send(ObserverEvent::Error(format!("Channel error: {:?}", e)));
}
continue;
ctx.try_log(|logger| {
slog::crit!(logger, "Error: broken channel {}", e.to_string())
});
break;
}
};
match command {
ObserverCommand::Terminate => {
terminate(ingestion_shutdown, observer_events_tx, &ctx);
break;
}
ObserverCommand::ProcessBitcoinBlock(mut block_data) => {
Expand All @@ -773,18 +802,11 @@ pub async fn start_observer_commands_handler(
&config.bitcoin_network,
&ctx,
) {
Ok(block) => break block,
Ok(block) => break Some(block),
Err((e, refetch_block)) => {
attempts += 1;
if attempts > max_attempts {
terminate(ingestion_shutdown, observer_events_tx, &ctx);
let err_msg = format!(
"Could not process bitcoin block after {} attempts.",
attempts
);

ctx.try_log(|logger| slog::crit!(logger, "{}", err_msg));
panic!("{}", err_msg);
break None;
}
ctx.try_log(|logger| {
slog::error!(logger, "Error standardizing block: {}", e)
Expand Down Expand Up @@ -814,7 +836,16 @@ pub async fn start_observer_commands_handler(
}
};
};

let Some(block) = block else {
ctx.try_log(|logger| {
slog::crit!(
logger,
"Could not process bitcoin block after {} attempts.",
attempts
)
});
break;
};
prometheus_monitoring.btc_metrics_ingest_block(block.block_identifier.index);

bitcoin_block_store.insert(
Expand Down Expand Up @@ -1420,6 +1451,7 @@ pub async fn start_observer_commands_handler(
}
}
}
terminate(ingestion_shutdown, observer_events_tx, &ctx);
Ok(())
}

Expand Down

0 comments on commit b3e938a

Please sign in to comment.