From b3e938a6aead30376ae8b53e0dc82edd62d134ea Mon Sep 17 00:00:00 2001 From: MicaiahReid Date: Wed, 13 Mar 2024 15:05:59 -0400 Subject: [PATCH] fix: terminate event observer on failure --- components/chainhook-sdk/src/observer/mod.rs | 106 ++++++++++++------- 1 file changed, 69 insertions(+), 37 deletions(-) diff --git a/components/chainhook-sdk/src/observer/mod.rs b/components/chainhook-sdk/src/observer/mod.rs index 9b12534e..a429890a 100644 --- a/components/chainhook-sdk/src/observer/mod.rs +++ b/components/chainhook-sdk/src/observer/mod.rs @@ -436,34 +436,64 @@ pub fn start_event_observer( let context_cloned = ctx.clone(); let event_observer_config_moved = config.clone(); let observer_commands_tx_moved = observer_commands_tx.clone(); - let _ = hiro_system_kit::thread_named("Chainhook event observer").spawn(move || { - let future = start_bitcoin_event_observer( - event_observer_config_moved, - observer_commands_tx_moved, - observer_commands_rx, - observer_events_tx, - observer_sidecar, - context_cloned, - ); - let _ = hiro_system_kit::nestable_block_on(future); - }); + let _ = hiro_system_kit::thread_named("Chainhook event observer") + .spawn(move || { + let future = start_bitcoin_event_observer( + event_observer_config_moved, + observer_commands_tx_moved, + observer_commands_rx, + observer_events_tx.clone(), + observer_sidecar, + context_cloned.clone(), + ); + match hiro_system_kit::nestable_block_on(future) { + Ok(_) => {} + Err(e) => { + if let Some(tx) = observer_events_tx { + context_cloned.try_log(|logger| { + slog::crit!( + logger, + "Chainhook event observer thread failed with error: {e}", + ) + }); + let _ = tx.send(ObserverEvent::Terminate); + } + } + } + }) + .expect("unable to spawn thread"); } BitcoinBlockSignaling::Stacks(ref _url) => { // Start chainhook event observer let context_cloned = ctx.clone(); let event_observer_config_moved = config.clone(); let observer_commands_tx_moved = observer_commands_tx.clone(); - let _ = hiro_system_kit::thread_named("Chainhook event observer").spawn(move || { - let future = start_stacks_event_observer( - event_observer_config_moved, - observer_commands_tx_moved, - observer_commands_rx, - observer_events_tx, - observer_sidecar, - context_cloned, - ); - let _ = hiro_system_kit::nestable_block_on(future); - }); + let _ = hiro_system_kit::thread_named("Chainhook event observer") + .spawn(move || { + let future = start_stacks_event_observer( + event_observer_config_moved, + observer_commands_tx_moved, + observer_commands_rx, + observer_events_tx.clone(), + observer_sidecar, + context_cloned.clone(), + ); + match hiro_system_kit::nestable_block_on(future) { + Ok(_) => {} + Err(e) => { + if let Some(tx) = observer_events_tx { + context_cloned.try_log(|logger| { + slog::crit!( + logger, + "Chainhook event observer thread failed with error: {e}", + ) + }); + let _ = tx.send(ObserverEvent::Terminate); + } + } + } + }) + .expect("unable to spawn thread"); ctx.try_log(|logger| { slog::info!( @@ -752,15 +782,14 @@ pub async fn start_observer_commands_handler( let command = match observer_commands_rx.recv() { Ok(cmd) => cmd, Err(e) => { - if let Some(ref tx) = observer_events_tx { - let _ = tx.send(ObserverEvent::Error(format!("Channel error: {:?}", e))); - } - continue; + ctx.try_log(|logger| { + slog::crit!(logger, "Error: broken channel {}", e.to_string()) + }); + break; } }; match command { ObserverCommand::Terminate => { - terminate(ingestion_shutdown, observer_events_tx, &ctx); break; } ObserverCommand::ProcessBitcoinBlock(mut block_data) => { @@ -773,18 +802,11 @@ pub async fn start_observer_commands_handler( &config.bitcoin_network, &ctx, ) { - Ok(block) => break block, + Ok(block) => break Some(block), Err((e, refetch_block)) => { attempts += 1; if attempts > max_attempts { - terminate(ingestion_shutdown, observer_events_tx, &ctx); - let err_msg = format!( - "Could not process bitcoin block after {} attempts.", - attempts - ); - - ctx.try_log(|logger| slog::crit!(logger, "{}", err_msg)); - panic!("{}", err_msg); + break None; } ctx.try_log(|logger| { slog::error!(logger, "Error standardizing block: {}", e) @@ -814,7 +836,16 @@ pub async fn start_observer_commands_handler( } }; }; - + let Some(block) = block else { + ctx.try_log(|logger| { + slog::crit!( + logger, + "Could not process bitcoin block after {} attempts.", + attempts + ) + }); + break; + }; prometheus_monitoring.btc_metrics_ingest_block(block.block_identifier.index); bitcoin_block_store.insert( @@ -1420,6 +1451,7 @@ pub async fn start_observer_commands_handler( } } } + terminate(ingestion_shutdown, observer_events_tx, &ctx); Ok(()) }