From d6b8816458ff5518537c971292417fe85d4fa5af Mon Sep 17 00:00:00 2001 From: Micaiah Reid Date: Wed, 27 Mar 2024 11:26:57 -0400 Subject: [PATCH] fix: improve error handling, and more! (#524) This PR introduces a few fixes in an effort to improve reliability and debugging problems when running Chainhook as a service: - Revisits log levels throughout the tool (fixes #498, fixes #521). The general approach for the logs were: - `crit` - fatal errors that will crash mission critical component of Chainhook. In these cases, Chainhook should automatically kill all main threads (not individual scanning threads, which is tracked by #404) to crash the service. - `erro` - something went wrong the could lead to a critical error, or that could impact all users - `warn` - something went wrong that could impact an end user (usually due to user error) - `info` - control flow logging and updates on the state of _all_ registered predicates - `debug` - updates on the state of _a_ predicate - Crash the service if a mission critical thread fails (see https://github.com/hirosystems/chainhook/issues/517#issuecomment-1992135101 for a list of these threads). Previously, if one of these threads failed, the remaining services would keep running. For example, if the event observer handler crashed, the event observer API would keep running. This means that the stacks node is successfully emitting blocks that Chainhook is acknowledging but not ingesting. This causes gaps in our database Fixes #517 - Removes an infinite loop with bitcoin ingestion, crashing the service instead: Fixes #506 - Fixes how we delete predicates from our db when one is deregistered. This should reduce the number of logs we have on startup. Fixes #510 - Warns on all reorgs. Fixes #519 --- components/chainhook-cli/src/archive/mod.rs | 44 ++-- .../chainhook-cli/src/archive/tests/mod.rs | 10 +- components/chainhook-cli/src/cli/mod.rs | 4 +- components/chainhook-cli/src/config/mod.rs | 22 +- .../chainhook-cli/src/config/tests/mod.rs | 28 ++- components/chainhook-cli/src/scan/bitcoin.rs | 13 +- components/chainhook-cli/src/scan/stacks.rs | 37 +-- .../chainhook-cli/src/service/http_api.rs | 15 +- components/chainhook-cli/src/service/mod.rs | 175 +++++++++---- .../chainhook-cli/src/service/runloops.rs | 10 +- components/chainhook-cli/src/storage/mod.rs | 36 ++- components/chainhook-sdk/Cargo.toml | 2 +- .../src/indexer/fork_scratch_pad.rs | 43 +++- components/chainhook-sdk/src/indexer/mod.rs | 2 +- .../chainhook-sdk/src/indexer/stacks/mod.rs | 6 +- components/chainhook-sdk/src/observer/http.rs | 6 +- components/chainhook-sdk/src/observer/mod.rs | 238 +++++++++++------- .../chainhook-sdk/src/observer/tests/mod.rs | 14 +- components/chainhook-sdk/src/utils/mod.rs | 2 +- 19 files changed, 461 insertions(+), 246 deletions(-) diff --git a/components/chainhook-cli/src/archive/mod.rs b/components/chainhook-cli/src/archive/mod.rs index 81eee7911..b41c62245 100644 --- a/components/chainhook-cli/src/archive/mod.rs +++ b/components/chainhook-cli/src/archive/mod.rs @@ -21,7 +21,7 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> { println!("{}", e.to_string()); }); - let remote_sha_url = config.expected_remote_stacks_tsv_sha256(); + let remote_sha_url = config.expected_remote_stacks_tsv_sha256()?; let res = reqwest::get(&remote_sha_url) .await .or(Err(format!("Failed to GET from '{}'", &remote_sha_url)))? @@ -34,7 +34,7 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> { write_file_content_at_path(&local_sha_file_path, &res.to_vec())?; - let file_url = config.expected_remote_stacks_tsv_url(); + let file_url = config.expected_remote_stacks_tsv_url()?; let res = reqwest::get(&file_url) .await .or(Err(format!("Failed to GET from '{}'", &file_url)))?; @@ -55,14 +55,17 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> { Ok(0) => break, Ok(n) => { if let Err(e) = file.write_all(&buffer[..n]) { - let err = - format!("unable to update compressed archive: {}", e.to_string()); - return Err(err); + return Err(format!( + "unable to update compressed archive: {}", + e.to_string() + )); } } Err(e) => { - let err = format!("unable to write compressed archive: {}", e.to_string()); - return Err(err); + return Err(format!( + "unable to write compressed archive: {}", + e.to_string() + )); } } } @@ -83,12 +86,11 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> { .map_err(|e| format!("unable to download stacks archive: {}", e.to_string()))?; } drop(tx); - tokio::task::spawn_blocking(|| decoder_thread.join()) .await - .unwrap() - .unwrap() - .unwrap(); + .map_err(|e| format!("failed to spawn thread: {e}"))? + .map_err(|e| format!("decoder thread failed when downloading tsv: {:?}", e))? + .map_err(|e| format!("failed to download tsv: {}", e))?; } Ok(()) @@ -124,11 +126,14 @@ impl Read for ChannelRead { } } -pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Context) -> bool { +pub async fn download_stacks_dataset_if_required( + config: &mut Config, + ctx: &Context, +) -> Result { if config.is_initial_ingestion_required() { // Download default tsv. if config.rely_on_remote_stacks_tsv() && config.should_download_remote_stacks_tsv() { - let url = config.expected_remote_stacks_tsv_url(); + let url = config.expected_remote_stacks_tsv_url()?; let mut tsv_file_path = config.expected_cache_path(); tsv_file_path.push(default_tsv_file_path(&config.network.stacks_network)); let mut tsv_sha_file_path = config.expected_cache_path(); @@ -137,7 +142,7 @@ pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Cont // Download archive if not already present in cache // Load the local let local_sha_file = read_file_content_at_path(&tsv_sha_file_path); - let sha_url = config.expected_remote_stacks_tsv_sha256(); + let sha_url = config.expected_remote_stacks_tsv_sha256()?; let remote_sha_file = match reqwest::get(&sha_url).await { Ok(response) => response.bytes().await, @@ -164,28 +169,25 @@ pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Cont "Stacks archive file already up to date" ); config.add_local_stacks_tsv_source(&tsv_file_path); - return false; + return Ok(false); } info!(ctx.expect_logger(), "Downloading {}", url); match download_tsv_file(&config).await { Ok(_) => {} - Err(e) => { - error!(ctx.expect_logger(), "{}", e); - std::process::exit(1); - } + Err(e) => return Err(e), } info!(ctx.expect_logger(), "Successfully downloaded tsv file"); config.add_local_stacks_tsv_source(&tsv_file_path); } - true + Ok(true) } else { info!( ctx.expect_logger(), "Streaming blocks from stacks-node {}", config.network.get_stacks_node_config().rpc_url ); - false + Ok(false) } } diff --git a/components/chainhook-cli/src/archive/tests/mod.rs b/components/chainhook-cli/src/archive/tests/mod.rs index eb97bfff9..35250e30b 100644 --- a/components/chainhook-cli/src/archive/tests/mod.rs +++ b/components/chainhook-cli/src/archive/tests/mod.rs @@ -72,8 +72,14 @@ async fn it_downloads_stacks_dataset_if_required() { tracer: false, }; let mut config_clone = config.clone(); - assert!(download_stacks_dataset_if_required(&mut config, &ctx).await); - assert!(!download_stacks_dataset_if_required(&mut config_clone, &ctx).await); + assert!(download_stacks_dataset_if_required(&mut config, &ctx) + .await + .unwrap()); + assert!( + !download_stacks_dataset_if_required(&mut config_clone, &ctx) + .await + .unwrap() + ); let mut tsv_file_path = config.expected_cache_path(); tsv_file_path.push(default_tsv_file_path(&config.network.stacks_network)); diff --git a/components/chainhook-cli/src/cli/mod.rs b/components/chainhook-cli/src/cli/mod.rs index ed51d0b1a..d92f37bc7 100644 --- a/components/chainhook-cli/src/cli/mod.rs +++ b/components/chainhook-cli/src/cli/mod.rs @@ -277,14 +277,14 @@ pub fn main() { let opts: Opts = match Opts::try_parse() { Ok(opts) => opts, Err(e) => { - error!(ctx.expect_logger(), "{e}"); + crit!(ctx.expect_logger(), "{e}"); process::exit(1); } }; match hiro_system_kit::nestable_block_on(handle_command(opts, ctx.clone())) { Err(e) => { - error!(ctx.expect_logger(), "{e}"); + crit!(ctx.expect_logger(), "{e}"); process::exit(1); } Ok(_) => {} diff --git a/components/chainhook-cli/src/config/mod.rs b/components/chainhook-cli/src/config/mod.rs index fc291659f..812d76021 100644 --- a/components/chainhook-cli/src/config/mod.rs +++ b/components/chainhook-cli/src/config/mod.rs @@ -256,13 +256,13 @@ impl Config { } } - pub fn expected_local_stacks_tsv_file(&self) -> &PathBuf { + pub fn expected_local_stacks_tsv_file(&self) -> Result<&PathBuf, String> { for source in self.event_sources.iter() { if let EventSourceConfig::StacksTsvPath(config) = source { - return &config.file_path; + return Ok(&config.file_path); } } - panic!("expected local-tsv source") + Err("could not find expected local tsv source")? } pub fn expected_cache_path(&self) -> PathBuf { @@ -271,21 +271,23 @@ impl Config { destination_path } - fn expected_remote_stacks_tsv_base_url(&self) -> &String { + fn expected_remote_stacks_tsv_base_url(&self) -> Result<&String, String> { for source in self.event_sources.iter() { if let EventSourceConfig::StacksTsvUrl(config) = source { - return &config.file_url; + return Ok(&config.file_url); } } - panic!("expected remote-tsv source") + Err("could not find expected remote tsv source")? } - pub fn expected_remote_stacks_tsv_sha256(&self) -> String { - format!("{}.sha256", self.expected_remote_stacks_tsv_base_url()) + pub fn expected_remote_stacks_tsv_sha256(&self) -> Result { + self.expected_remote_stacks_tsv_base_url() + .map(|url| format!("{}.sha256", url)) } - pub fn expected_remote_stacks_tsv_url(&self) -> String { - format!("{}.gz", self.expected_remote_stacks_tsv_base_url()) + pub fn expected_remote_stacks_tsv_url(&self) -> Result { + self.expected_remote_stacks_tsv_base_url() + .map(|url| format!("{}.gz", url)) } pub fn rely_on_remote_stacks_tsv(&self) -> bool { diff --git a/components/chainhook-cli/src/config/tests/mod.rs b/components/chainhook-cli/src/config/tests/mod.rs index 4adf04a8a..ddae000c2 100644 --- a/components/chainhook-cli/src/config/tests/mod.rs +++ b/components/chainhook-cli/src/config/tests/mod.rs @@ -108,7 +108,6 @@ fn should_download_remote_stacks_tsv_handles_both_modes() { } #[test] -#[should_panic(expected = "expected remote-tsv source")] fn expected_remote_stacks_tsv_base_url_panics_if_missing() { let url_src = EventSourceConfig::StacksTsvUrl(super::UrlConfig { file_url: format!("test"), @@ -116,15 +115,22 @@ fn expected_remote_stacks_tsv_base_url_panics_if_missing() { let mut config = Config::default(true, false, false, &None).unwrap(); config.event_sources = vec![url_src.clone()]; - assert_eq!(config.expected_remote_stacks_tsv_base_url(), "test"); + match config.expected_remote_stacks_tsv_base_url() { + Ok(tsv_url) => assert_eq!(tsv_url, "test"), + Err(e) => { + panic!("expected tsv file: {e}") + } + } config.event_sources = vec![]; - config.expected_remote_stacks_tsv_base_url(); + match config.expected_remote_stacks_tsv_base_url() { + Ok(tsv_url) => panic!("expected no tsv file, found {}", tsv_url), + Err(e) => assert_eq!(e, "could not find expected remote tsv source".to_string()), + }; } #[test] -#[should_panic(expected = "expected local-tsv source")] -fn expected_local_stacks_tsv_base_url_panics_if_missing() { +fn expected_local_stacks_tsv_base_url_errors_if_missing() { let path = PathBuf::from("test"); let path_src = EventSourceConfig::StacksTsvPath(PathConfig { file_path: path.clone(), @@ -132,10 +138,18 @@ fn expected_local_stacks_tsv_base_url_panics_if_missing() { let mut config = Config::default(true, false, false, &None).unwrap(); config.event_sources = vec![path_src.clone()]; - assert_eq!(config.expected_local_stacks_tsv_file(), &path); + match config.expected_local_stacks_tsv_file() { + Ok(tsv_path) => assert_eq!(tsv_path, &path), + Err(e) => { + panic!("expected tsv file: {e}") + } + } config.event_sources = vec![]; - config.expected_local_stacks_tsv_file(); + match config.expected_local_stacks_tsv_file() { + Ok(tsv_path) => panic!("expected no tsv file, found {}", tsv_path.to_string_lossy()), + Err(e) => assert_eq!(e, "could not find expected local tsv source".to_string()), + }; } #[test] diff --git a/components/chainhook-cli/src/scan/bitcoin.rs b/components/chainhook-cli/src/scan/bitcoin.rs index 41ae6e365..886f60998 100644 --- a/components/chainhook-cli/src/scan/bitcoin.rs +++ b/components/chainhook-cli/src/scan/bitcoin.rs @@ -29,6 +29,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( config: &Config, ctx: &Context, ) -> Result { + let predicate_uuid = &predicate_spec.uuid; let auth = Auth::UserPass( config.network.bitcoind_rpc_username.clone(), config.network.bitcoind_rpc_password.clone(), @@ -71,9 +72,9 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( PredicatesApi::Off => None, }; - info!( + debug!( ctx.expect_logger(), - "Starting predicate evaluation on Bitcoin blocks", + "Starting predicate evaluation on Bitcoin blocks for predicate {predicate_uuid}", ); let mut last_block_scanned = BlockIdentifier::default(); @@ -200,7 +201,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( info!( ctx.expect_logger(), - "{number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered" + "Predicate {predicate_uuid} scan completed. {number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered." ); if let Some(ref mut predicates_db_conn) = predicates_db_conn { @@ -269,9 +270,13 @@ pub async fn execute_predicates_action<'a>( if trigger.chainhook.include_proof { gather_proofs(&trigger, &mut proofs, &config, &ctx); } + let predicate_uuid = &trigger.chainhook.uuid; match handle_bitcoin_hook_action(trigger, &proofs) { Err(e) => { - error!(ctx.expect_logger(), "unable to handle action {}", e); + warn!( + ctx.expect_logger(), + "unable to handle action for predicate {}: {}", predicate_uuid, e + ); } Ok(action) => { actions_triggered += 1; diff --git a/components/chainhook-cli/src/scan/stacks.rs b/components/chainhook-cli/src/scan/stacks.rs index 6d3f90e40..6b50ba419 100644 --- a/components/chainhook-cli/src/scan/stacks.rs +++ b/components/chainhook-cli/src/scan/stacks.rs @@ -66,7 +66,7 @@ pub async fn get_canonical_fork_from_tsv( start_block: Option, ctx: &Context, ) -> Result, String> { - let seed_tsv_path = config.expected_local_stacks_tsv_file().clone(); + let seed_tsv_path = config.expected_local_stacks_tsv_file()?.clone(); let (record_tx, record_rx) = std::sync::mpsc::channel(); @@ -98,7 +98,7 @@ pub async fn get_canonical_fork_from_tsv( } let _ = record_tx.send(None); }) - .expect("unable to spawn thread"); + .map_err(|e| format!("unable to spawn thread: {e}"))?; let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?; let canonical_fork = { @@ -111,7 +111,10 @@ pub async fn get_canonical_fork_from_tsv( match standardize_stacks_serialized_block_header(&blob) { Ok(data) => data, Err(e) => { - error!(ctx.expect_logger(), "{e}"); + error!( + ctx.expect_logger(), + "Failed to standardize stacks header: {e}" + ); continue; } } @@ -169,12 +172,13 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( config: &Config, ctx: &Context, ) -> Result<(Option, bool), String> { + let predicate_uuid = &predicate_spec.uuid; let mut chain_tip = match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) { Some(chain_tip) => chain_tip, None => match get_last_block_height_inserted(stacks_db_conn, ctx) { Some(chain_tip) => chain_tip, None => { - info!(ctx.expect_logger(), "No blocks inserted in db; cannot determing Stacks chain tip. Skipping scan of predicate {}", predicate_spec.uuid); + info!(ctx.expect_logger(), "No blocks inserted in db; cannot determing Stacks chain tip. Skipping scan of predicate {}", predicate_uuid); return Ok((None, false)); } }, @@ -201,9 +205,9 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( }; let proofs = HashMap::new(); - info!( + debug!( ctx.expect_logger(), - "Starting predicate evaluation on Stacks blocks" + "Starting predicate evaluation on Stacks blocks for predicate {}", predicate_uuid ); let mut last_block_scanned = BlockIdentifier::default(); let mut err_count = 0; @@ -243,7 +247,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( None => match get_last_block_height_inserted(stacks_db_conn, ctx) { Some(chain_tip) => chain_tip, None => { - info!(ctx.expect_logger(), "No blocks inserted in db; cannot determing Stacks chain tip. Skipping scan of predicate {}", predicate_spec.uuid); + warn!(ctx.expect_logger(), "No blocks inserted in db; cannot determine Stacks chain tip. Skipping scan of predicate {}", predicate_uuid); return Ok((None, false)); } }, @@ -304,7 +308,10 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( }; let res = match handle_stacks_hook_action(trigger, &proofs, &ctx) { Err(e) => { - error!(ctx.expect_logger(), "unable to handle action {}", e); + warn!( + ctx.expect_logger(), + "unable to handle action for predicate {}: {}", predicate_uuid, e + ); Ok(()) // todo: should this error increment our err_count? } Ok(action) => { @@ -342,7 +349,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( } info!( ctx.expect_logger(), - "{number_of_blocks_scanned} blocks scanned, {number_of_times_triggered} blocks triggering predicate" + "Predicate {predicate_uuid} scan completed. {number_of_blocks_scanned} blocks scanned, {number_of_times_triggered} blocks triggering predicate.", ); if let Some(ref mut predicates_db_conn) = predicates_db_conn { @@ -420,7 +427,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate( } } - let _ = download_stacks_dataset_if_required(config, ctx).await; + let _ = download_stacks_dataset_if_required(config, ctx).await?; let mut canonical_fork = get_canonical_fork_from_tsv(config, None, ctx).await?; @@ -516,7 +523,7 @@ pub async fn consolidate_local_stacks_chainstate_using_csv( "Building local chainstate from Stacks archive file" ); - let downloaded_new_dataset = download_stacks_dataset_if_required(config, ctx).await; + let downloaded_new_dataset = download_stacks_dataset_if_required(config, ctx).await?; if downloaded_new_dataset { let stacks_db = @@ -550,13 +557,15 @@ pub async fn consolidate_local_stacks_chainstate_using_csv( ) { Ok(block) => block, Err(e) => { - error!(&ctx.expect_logger(), "{e}"); + error!( + &ctx.expect_logger(), + "Failed to standardize stacks block: {e}" + ); continue; } }; - // TODO: return a result - insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx); + insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx)?; if blocks_inserted % 2500 == 0 { info!( diff --git a/components/chainhook-cli/src/service/http_api.rs b/components/chainhook-cli/src/service/http_api.rs index 191a33a7c..460b2f9a1 100644 --- a/components/chainhook-cli/src/service/http_api.rs +++ b/components/chainhook-cli/src/service/http_api.rs @@ -28,7 +28,7 @@ pub async fn start_predicate_api_server( api_config: PredicatesApiConfig, observer_commands_tx: Sender, ctx: Context, -) -> Result> { +) -> Result> { let log_level = LogLevel::Off; let mut shutdown_config = config::Shutdown::default(); @@ -62,12 +62,12 @@ pub async fn start_predicate_api_server( .ignite() .await?; - let ingestion_shutdown = ignite.shutdown(); + let predicate_api_shutdown = ignite.shutdown(); let _ = std::thread::spawn(move || { let _ = hiro_system_kit::nestable_block_on(ignite.launch()); }); - Ok(ingestion_shutdown) + Ok(predicate_api_shutdown) } #[openapi(tag = "Health Check")] @@ -298,9 +298,12 @@ pub fn get_entry_from_predicates_db( let spec = ChainhookSpecification::deserialize_specification(&encoded_spec)?; let encoded_status = match entry.get("status") { - None => unimplemented!(), - Some(payload) => payload, - }; + None => Err(format!( + "found predicate specification with no status for predicate {}", + predicate_key + )), + Some(payload) => Ok(payload), + }?; let status = serde_json::from_str(&encoded_status).map_err(|e| format!("{}", e.to_string()))?; diff --git a/components/chainhook-cli/src/service/mod.rs b/components/chainhook-cli/src/service/mod.rs index 86286c356..9bb7e95e5 100644 --- a/components/chainhook-cli/src/service/mod.rs +++ b/components/chainhook-cli/src/service/mod.rs @@ -89,15 +89,16 @@ impl Service { } match chainhook_config.register_specification(predicate) { Ok(_) => { - info!( + debug!( self.ctx.expect_logger(), - "Predicate {} retrieved from storage and loaded", predicate_uuid, + "Predicate {} retrieved from storage and registered", predicate_uuid, ); } Err(e) => { - error!( + warn!( self.ctx.expect_logger(), - "Failed loading predicate from storage: {}", + "Failed to register predicate {} after retrieving from storage: {}", + predicate_uuid, e.to_string() ); } @@ -117,7 +118,7 @@ impl Service { &self.ctx, ) { Ok(Some(_)) => { - error!( + warn!( self.ctx.expect_logger(), "Predicate uuid already in use: {uuid}", ); @@ -136,16 +137,16 @@ impl Service { ) { Ok(spec) => { newly_registered_predicates.push(spec.clone()); - info!( + debug!( self.ctx.expect_logger(), "Predicate {} retrieved from config and loaded", spec.uuid(), ); } Err(e) => { - error!( + warn!( self.ctx.expect_logger(), - "Failed loading predicate from config: {}", + "Failed to load predicate from config: {}", e.to_string() ); } @@ -163,7 +164,7 @@ impl Service { // Download and ingest a Stacks dump if self.config.rely_on_remote_stacks_tsv() { let _ = - consolidate_local_stacks_chainstate_using_csv(&mut self.config, &self.ctx).await; + consolidate_local_stacks_chainstate_using_csv(&mut self.config, &self.ctx).await?; } // Stacks scan operation threadpool @@ -176,9 +177,12 @@ impl Service { start_stacks_scan_runloop( &config, stacks_scan_op_rx, - observer_command_tx_moved, + observer_command_tx_moved.clone(), &ctx, ); + // the scan runloop should loop forever; if it finishes, something is wrong + crit!(ctx.expect_logger(), "Stacks scan runloop stopped.",); + let _ = observer_command_tx_moved.send(ObserverCommand::Terminate); }) .expect("unable to spawn thread"); @@ -192,15 +196,18 @@ impl Service { start_bitcoin_scan_runloop( &config, bitcoin_scan_op_rx, - observer_command_tx_moved, + observer_command_tx_moved.clone(), &ctx, ); + // the scan runloop should loop forever; if it finishes, something is wrong + crit!(ctx.expect_logger(), "Bitcoin scan runloop stopped.",); + let _ = observer_command_tx_moved.send(ObserverCommand::Terminate); }) .expect("unable to spawn thread"); // Enable HTTP Predicates API, if required let config = self.config.clone(); - if let PredicatesApi::On(ref api_config) = config.http_api { + let predicate_api_shutdown = if let PredicatesApi::On(ref api_config) = config.http_api { info!( self.ctx.expect_logger(), "Listening on port {} for chainhook predicate registrations", api_config.http_port @@ -209,11 +216,29 @@ impl Service { let api_config = api_config.clone(); let moved_observer_command_tx = observer_command_tx.clone(); // Test and initialize a database connection - let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || { - let future = start_predicate_api_server(api_config, moved_observer_command_tx, ctx); - let _ = hiro_system_kit::nestable_block_on(future); - }); - } + let res = hiro_system_kit::thread_named("HTTP Predicate API") + .spawn(move || { + let future = start_predicate_api_server( + api_config, + moved_observer_command_tx.clone(), + ctx.clone(), + ); + hiro_system_kit::nestable_block_on(future) + }) + .expect("unable to spawn thread"); + let res = res.join().expect("unable to terminate thread"); + match res { + Ok(predicate_api_shutdown) => Some(predicate_api_shutdown), + Err(e) => { + return Err(format!( + "Predicate API Registration server failed to start: {}", + e + )); + } + } + } else { + None + }; let ctx = self.ctx.clone(); let stacks_db = @@ -281,7 +306,7 @@ impl Service { let event = match observer_event_rx.recv() { Ok(cmd) => cmd, Err(e) => { - error!( + crit!( self.ctx.expect_logger(), "Error: broken channel {}", e.to_string() @@ -343,20 +368,20 @@ impl Service { ); } } - ObserverEvent::PredicateDeregistered(spec) => { + ObserverEvent::PredicateDeregistered(uuid) => { if let PredicatesApi::On(ref config) = self.config.http_api { let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn_verbose(&config, &ctx) else { continue; }; - let predicate_key = spec.key(); + let predicate_key = ChainhookSpecification::either_stx_or_btc_key(&uuid); let res: Result<(), redis::RedisError> = - predicates_db_conn.del(predicate_key); + predicates_db_conn.del(predicate_key.clone()); if let Err(e) = res { - error!( + warn!( self.ctx.expect_logger(), - "unable to delete predicate: {}", + "unable to delete predicate {predicate_key}: {}", e.to_string() ); } @@ -429,7 +454,7 @@ impl Service { } } } - update_stats_from_report( + update_status_from_report( Chain::Bitcoin, report, &mut predicates_db_conn, @@ -446,7 +471,7 @@ impl Service { Err(e) => { error!( self.ctx.expect_logger(), - "unable to store stacks block: {}", + "unable to open stacks db: {}", e.to_string() ); continue; @@ -455,28 +480,48 @@ impl Service { match &chain_event { StacksChainEvent::ChainUpdatedWithBlocks(data) => { - confirm_entries_in_stacks_blocks( + if let Err(e) = confirm_entries_in_stacks_blocks( &data.confirmed_blocks, &stacks_db_conn_rw, &self.ctx, - ); - draft_entries_in_stacks_blocks( + ) { + error!( + self.ctx.expect_logger(), + "unable add confirmed entries to stacks db: {}", e + ); + }; + if let Err(e) = draft_entries_in_stacks_blocks( &data.new_blocks, &stacks_db_conn_rw, &self.ctx, - ) + ) { + error!( + self.ctx.expect_logger(), + "unable add unconfirmed entries to stacks db: {}", e + ); + }; } StacksChainEvent::ChainUpdatedWithReorg(data) => { - confirm_entries_in_stacks_blocks( + if let Err(e) = confirm_entries_in_stacks_blocks( &data.confirmed_blocks, &stacks_db_conn_rw, &self.ctx, - ); - draft_entries_in_stacks_blocks( + ) { + error!( + self.ctx.expect_logger(), + "unable add confirmed entries to stacks db: {}", e + ); + }; + if let Err(e) = draft_entries_in_stacks_blocks( &data.blocks_to_apply, &stacks_db_conn_rw, &self.ctx, - ) + ) { + error!( + self.ctx.expect_logger(), + "unable add unconfirmed entries to stacks db: {}", e + ); + }; } StacksChainEvent::ChainUpdatedWithMicroblocks(_) | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {} @@ -546,7 +591,7 @@ impl Service { StacksChainEvent::ChainUpdatedWithMicroblocks(_) | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {} }; - update_stats_from_report( + update_status_from_report( Chain::Stacks, report, &mut predicates_db_conn, @@ -557,15 +602,36 @@ impl Service { // Every 32 blocks, we will check if there's a new Stacks file archive to ingest if stacks_event > 32 { stacks_event = 0; - let _ = consolidate_local_stacks_chainstate_using_csv( - &mut self.config, - &self.ctx, - ) - .await; + if self.config.rely_on_remote_stacks_tsv() { + match consolidate_local_stacks_chainstate_using_csv( + &mut self.config, + &self.ctx, + ) + .await + { + Err(e) => { + error!( + self.ctx.expect_logger(), + "Failed to update database from archive: {e}" + ) + } + Ok(()) => {} + }; + } } } ObserverEvent::Terminate => { - info!(self.ctx.expect_logger(), "Terminating runloop"); + info!( + self.ctx.expect_logger(), + "Terminating ObserverEvent runloop" + ); + if let Some(predicate_api_shutdown) = predicate_api_shutdown { + info!( + self.ctx.expect_logger(), + "Terminating Predicate Registration API" + ); + predicate_api_shutdown.notify(); + } break; } _ => {} @@ -615,7 +681,7 @@ pub struct ExpiredData { pub expired_at_block_height: u64, } -fn update_stats_from_report( +fn update_status_from_report( chain: Chain, report: PredicateEvaluationReport, predicates_db_conn: &mut Connection, @@ -623,7 +689,7 @@ fn update_stats_from_report( ) { for (predicate_uuid, blocks_ids) in report.predicates_triggered.iter() { if let Some(last_triggered_height) = blocks_ids.last().and_then(|b| Some(b.index)) { - let triggered_count = blocks_ids.len().try_into().unwrap(); + let triggered_count = blocks_ids.len().try_into().unwrap_or(0); set_predicate_streaming_status( StreamingDataType::Occurrence { last_triggered_height, @@ -652,7 +718,7 @@ fn update_stats_from_report( } } if let Some(last_evaluated_height) = blocks_ids.last().and_then(|b| Some(b.index)) { - let evaluated_count = blocks_ids.len().try_into().unwrap(); + let evaluated_count = blocks_ids.len().try_into().unwrap_or(0); set_predicate_streaming_status( StreamingDataType::Evaluation { last_evaluated_height, @@ -666,7 +732,7 @@ fn update_stats_from_report( } for (predicate_uuid, blocks_ids) in report.predicates_expired.iter() { if let Some(last_evaluated_height) = blocks_ids.last().and_then(|b| Some(b.index)) { - let evaluated_count = blocks_ids.len().try_into().unwrap(); + let evaluated_count = blocks_ids.len().try_into().unwrap_or(0); set_unconfirmed_expiration_status( &chain, evaluated_count, @@ -1035,13 +1101,13 @@ fn insert_predicate_expiration( if let Err(e) = predicates_db_conn.hset::<_, _, _, ()>(&key, "predicates", &serialized_expiring_predicates) { - error!( + warn!( ctx.expect_logger(), "Error updating expired predicates index: {}", e.to_string() ); } else { - info!( + debug!( ctx.expect_logger(), "Updating expired predicates at block height {expired_at_block_height} with predicate: {predicate_key}" ); @@ -1060,7 +1126,7 @@ fn get_predicates_expiring_at_block( Ok(data) => { if let Err(e) = predicates_db_conn.hdel::<_, _, u64>(key.to_string(), "predicates") { - error!( + warn!( ctx.expect_logger(), "Error removing expired predicates index: {}", e.to_string() @@ -1084,13 +1150,14 @@ pub fn update_predicate_status( if let Err(e) = predicates_db_conn.hset::<_, _, _, ()>(&predicate_key, "status", &serialized_status) { - error!( + warn!( ctx.expect_logger(), - "Error updating status: {}", + "Error updating status for {}: {}", + predicate_key, e.to_string() ); } else { - info!( + debug!( ctx.expect_logger(), "Updating predicate {predicate_key} status: {serialized_status}" ); @@ -1107,13 +1174,14 @@ fn update_predicate_spec( if let Err(e) = predicates_db_conn.hset::<_, _, _, ()>(&predicate_key, "specification", &serialized_spec) { - error!( + warn!( ctx.expect_logger(), - "Error updating status: {}", + "Error updating status for {}: {}", + predicate_key, e.to_string() ); } else { - info!( + debug!( ctx.expect_logger(), "Updating predicate {predicate_key} with spec: {serialized_spec}" ); @@ -1154,6 +1222,7 @@ pub fn open_readwrite_predicates_db_conn_verbose( res } +// todo: evaluate expects pub fn open_readwrite_predicates_db_conn_or_panic( config: &PredicatesApiConfig, ctx: &Context, diff --git a/components/chainhook-cli/src/service/runloops.rs b/components/chainhook-cli/src/service/runloops.rs index ee441008a..d1c4d32cc 100644 --- a/components/chainhook-cli/src/service/runloops.rs +++ b/components/chainhook-cli/src/service/runloops.rs @@ -43,9 +43,12 @@ pub fn start_stacks_scan_runloop( { Ok(db_conn) => db_conn, Err(e) => { + // todo: if we repeatedly can't connect to the database, we should restart the + // service to get to a healthy state. I don't know if this has been an issue, though + // so we can monitor and possibly remove this todo error!( moved_ctx.expect_logger(), - "unable to store stacks block: {}", + "unable to open stacks db: {}", e.to_string() ); unimplemented!() @@ -108,8 +111,7 @@ pub fn start_stacks_scan_runloop( } }); } - let res = stacks_scan_pool.join(); - res + let _ = stacks_scan_pool.join(); } pub fn start_bitcoin_scan_runloop( @@ -138,7 +140,7 @@ pub fn start_bitcoin_scan_runloop( let predicate_is_expired = match hiro_system_kit::nestable_block_on(op) { Ok(predicate_is_expired) => predicate_is_expired, Err(e) => { - error!( + warn!( moved_ctx.expect_logger(), "Unable to evaluate predicate on Bitcoin chainstate: {e}", ); diff --git a/components/chainhook-cli/src/storage/mod.rs b/components/chainhook-cli/src/storage/mod.rs index 2d18aa33b..6c2d04ca3 100644 --- a/components/chainhook-cli/src/storage/mod.rs +++ b/components/chainhook-cli/src/storage/mod.rs @@ -116,12 +116,16 @@ fn get_last_unconfirmed_insert_key() -> [u8; 3] { *LAST_UNCONFIRMED_KEY_PREFIX } -pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB, _ctx: &Context) { +pub fn insert_entry_in_stacks_blocks( + block: &StacksBlockData, + stacks_db_rw: &DB, + _ctx: &Context, +) -> Result<(), String> { let key = get_block_key(&block.block_identifier); let block_bytes = json!(block); stacks_db_rw .put(&key, &block_bytes.to_string().as_bytes()) - .expect("unable to insert blocks"); + .map_err(|e| format!("unable to insert blocks: {}", e))?; let previous_last_inserted = get_last_block_height_inserted(stacks_db_rw, _ctx).unwrap_or(0); if block.block_identifier.index > previous_last_inserted { stacks_db_rw @@ -129,35 +133,39 @@ pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB, get_last_confirmed_insert_key(), block.block_identifier.index.to_be_bytes(), ) - .expect("unable to insert metadata"); + .map_err(|e| format!("unable to insert metadata: {}", e))?; } + Ok(()) } pub fn insert_unconfirmed_entry_in_stacks_blocks( block: &StacksBlockData, stacks_db_rw: &DB, _ctx: &Context, -) { +) -> Result<(), String> { let key = get_unconfirmed_block_key(&block.block_identifier); let block_bytes = json!(block); stacks_db_rw .put(&key, &block_bytes.to_string().as_bytes()) - .expect("unable to insert blocks"); + .map_err(|e| format!("unable to insert blocks: {}", e))?; stacks_db_rw .put( get_last_unconfirmed_insert_key(), block.block_identifier.index.to_be_bytes(), ) - .expect("unable to insert metadata"); + .map_err(|e| format!("unable to insert metadata: {}", e))?; + Ok(()) } pub fn delete_unconfirmed_entry_from_stacks_blocks( block_identifier: &BlockIdentifier, stacks_db_rw: &DB, _ctx: &Context, -) { +) -> Result<(), String> { let key = get_unconfirmed_block_key(&block_identifier); - stacks_db_rw.delete(&key).expect("unable to delete blocks"); + stacks_db_rw + .delete(&key) + .map_err(|e| format!("unable to delete blocks: {}", e)) } pub fn get_last_unconfirmed_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option { @@ -209,22 +217,24 @@ pub fn confirm_entries_in_stacks_blocks( blocks: &Vec, stacks_db_rw: &DB, ctx: &Context, -) { +) -> Result<(), String> { for block in blocks.iter() { - insert_entry_in_stacks_blocks(block, stacks_db_rw, ctx); - delete_unconfirmed_entry_from_stacks_blocks(&block.block_identifier, stacks_db_rw, ctx); + insert_entry_in_stacks_blocks(block, stacks_db_rw, ctx)?; + delete_unconfirmed_entry_from_stacks_blocks(&block.block_identifier, stacks_db_rw, ctx)?; } + Ok(()) } pub fn draft_entries_in_stacks_blocks( block_updates: &Vec, stacks_db_rw: &DB, ctx: &Context, -) { +) -> Result<(), String> { for update in block_updates.iter() { // TODO: Could be imperfect, from a microblock point of view - insert_unconfirmed_entry_in_stacks_blocks(&update.block, stacks_db_rw, ctx); + insert_unconfirmed_entry_in_stacks_blocks(&update.block, stacks_db_rw, ctx)?; } + Ok(()) } pub fn get_stacks_block_at_block_height( diff --git a/components/chainhook-sdk/Cargo.toml b/components/chainhook-sdk/Cargo.toml index d85a8dcd5..b8789ae2f 100644 --- a/components/chainhook-sdk/Cargo.toml +++ b/components/chainhook-sdk/Cargo.toml @@ -50,4 +50,4 @@ test-case = "3.1.0" default = ["hiro-system-kit/log"] zeromq = ["zmq"] debug = ["hiro-system-kit/debug"] -release = ["hiro-system-kit/release"] +release = ["hiro-system-kit/debug"] diff --git a/components/chainhook-sdk/src/indexer/fork_scratch_pad.rs b/components/chainhook-sdk/src/indexer/fork_scratch_pad.rs index 251913a06..4bd584e1f 100644 --- a/components/chainhook-sdk/src/indexer/fork_scratch_pad.rs +++ b/components/chainhook-sdk/src/indexer/fork_scratch_pad.rs @@ -293,7 +293,7 @@ impl ForkScratchPad { ctx.try_log(|logger| { slog::error!( logger, - "unable to retrive Bitcoin {} from block store", + "unable to retrieve Bitcoin block {} from block store", block_identifier ) }); @@ -317,7 +317,16 @@ impl ForkScratchPad { let block_identifier = &divergence.block_ids_to_apply[i]; let header = match self.headers_store.get(block_identifier) { Some(header) => header.clone(), - None => panic!("unable to retrive block from block store"), + None => { + ctx.try_log(|logger| { + slog::error!( + logger, + "unable to retrieve Bitcoin block {} from block store", + block_identifier + ) + }); + return Err(ChainSegmentIncompatibility::Unknown); + } }; new_headers.push(header) } @@ -336,22 +345,40 @@ impl ForkScratchPad { .map(|block_id| { let block = match self.headers_store.get(block_id) { Some(block) => block.clone(), - None => panic!("unable to retrive block from block store"), + None => { + ctx.try_log(|logger| { + slog::error!( + logger, + "unable to retrieve Bitcoin block {} from block store", + block_id + ) + }); + return Err(ChainSegmentIncompatibility::Unknown); + } }; - block + Ok(block) }) - .collect::>(), + .collect::, _>>()?, headers_to_apply: divergence .block_ids_to_apply .iter() .map(|block_id| { let block = match self.headers_store.get(block_id) { Some(block) => block.clone(), - None => panic!("unable to retrive block from block store"), + None => { + ctx.try_log(|logger| { + slog::error!( + logger, + "unable to retrieve Bitcoin block {} from block store", + block_id + ) + }); + return Err(ChainSegmentIncompatibility::Unknown); + } }; - block + Ok(block) }) - .collect::>(), + .collect::, _>>()?, confirmed_headers: vec![], }, )); diff --git a/components/chainhook-sdk/src/indexer/mod.rs b/components/chainhook-sdk/src/indexer/mod.rs index 9b0c97e92..a518aaaa6 100644 --- a/components/chainhook-sdk/src/indexer/mod.rs +++ b/components/chainhook-sdk/src/indexer/mod.rs @@ -367,7 +367,7 @@ impl ChainSegment { } Err(incompatibility) => { ctx.try_log(|logger| { - slog::info!(logger, "Will have to fork: {:?}", incompatibility) + slog::warn!(logger, "Will have to fork: {:?}", incompatibility) }); match incompatibility { ChainSegmentIncompatibility::BlockCollision => { diff --git a/components/chainhook-sdk/src/indexer/stacks/mod.rs b/components/chainhook-sdk/src/indexer/stacks/mod.rs index 4cff47b7f..0eeb70128 100644 --- a/components/chainhook-sdk/src/indexer/stacks/mod.rs +++ b/components/chainhook-sdk/src/indexer/stacks/mod.rs @@ -290,9 +290,11 @@ pub fn standardize_stacks_serialized_block_header( .parent_index_block_hash .take() .ok_or(format!("unable to retrieve parent_index_block_hash"))?; + + let parent_height = block_identifier.index.saturating_sub(1); let parent_block_identifier = BlockIdentifier { hash: parent_hash, - index: block_identifier.index - 1, + index: parent_height, }; Ok((block_identifier, parent_block_identifier)) } @@ -902,7 +904,7 @@ pub fn get_standardized_non_fungible_currency_from_asset_class_id( }), } } - +//todo: this function has a lot of expects/panics. should return result instead pub fn get_standardized_stacks_receipt( _txid: &str, events: Vec, diff --git a/components/chainhook-sdk/src/observer/http.rs b/components/chainhook-sdk/src/observer/http.rs index 25da770d9..f844718d5 100644 --- a/components/chainhook-sdk/src/observer/http.rs +++ b/components/chainhook-sdk/src/observer/http.rs @@ -132,7 +132,7 @@ pub async fn handle_new_bitcoin_block( }; } Ok(None) => { - ctx.try_log(|logger| slog::info!(logger, "unable to infer chain progress")); + ctx.try_log(|logger| slog::warn!(logger, "unable to infer chain progress")); } Err(e) => { ctx.try_log(|logger| slog::error!(logger, "unable to handle bitcoin block: {}", e)) @@ -202,7 +202,7 @@ pub fn handle_new_stacks_block( }; } Ok(None) => { - ctx.try_log(|logger| slog::info!(logger, "unable to infer chain progress")); + ctx.try_log(|logger| slog::warn!(logger, "unable to infer chain progress")); } Err(e) => ctx.try_log(|logger| slog::error!(logger, "{}", e)), } @@ -273,7 +273,7 @@ pub fn handle_new_microblocks( }; } Ok(None) => { - ctx.try_log(|logger| slog::info!(logger, "unable to infer chain progress")); + ctx.try_log(|logger| slog::warn!(logger, "unable to infer chain progress")); } Err(e) => { ctx.try_log(|logger| slog::error!(logger, "unable to handle stacks microblock: {}", e)); diff --git a/components/chainhook-sdk/src/observer/mod.rs b/components/chainhook-sdk/src/observer/mod.rs index cc1108b68..4e04e62a7 100644 --- a/components/chainhook-sdk/src/observer/mod.rs +++ b/components/chainhook-sdk/src/observer/mod.rs @@ -304,7 +304,7 @@ pub enum ObserverEvent { StacksChainEvent((StacksChainEvent, PredicateEvaluationReport)), NotifyBitcoinTransactionProxied, PredicateRegistered(ChainhookSpecification), - PredicateDeregistered(ChainhookSpecification), + PredicateDeregistered(String), PredicateEnabled(ChainhookSpecification), BitcoinPredicateTriggered(BitcoinChainhookOccurrencePayload), StacksPredicateTriggered(StacksChainhookOccurrencePayload), @@ -437,35 +437,65 @@ pub fn start_event_observer( let context_cloned = ctx.clone(); let event_observer_config_moved = config.clone(); let observer_commands_tx_moved = observer_commands_tx.clone(); - let _ = hiro_system_kit::thread_named("Chainhook event observer").spawn(move || { - let future = start_bitcoin_event_observer( - event_observer_config_moved, - observer_commands_tx_moved, - observer_commands_rx, - observer_events_tx, - observer_sidecar, - context_cloned, - ); - let _ = hiro_system_kit::nestable_block_on(future); - }); + let _ = hiro_system_kit::thread_named("Chainhook event observer") + .spawn(move || { + let future = start_bitcoin_event_observer( + event_observer_config_moved, + observer_commands_tx_moved, + observer_commands_rx, + observer_events_tx.clone(), + observer_sidecar, + context_cloned.clone(), + ); + match hiro_system_kit::nestable_block_on(future) { + Ok(_) => {} + Err(e) => { + if let Some(tx) = observer_events_tx { + context_cloned.try_log(|logger| { + slog::crit!( + logger, + "Chainhook event observer thread failed with error: {e}", + ) + }); + let _ = tx.send(ObserverEvent::Terminate); + } + } + } + }) + .expect("unable to spawn thread"); } BitcoinBlockSignaling::Stacks(ref _url) => { // Start chainhook event observer let context_cloned = ctx.clone(); let event_observer_config_moved = config.clone(); let observer_commands_tx_moved = observer_commands_tx.clone(); - let _ = hiro_system_kit::thread_named("Chainhook event observer").spawn(move || { - let future = start_stacks_event_observer( - event_observer_config_moved, - observer_commands_tx_moved, - observer_commands_rx, - observer_events_tx, - observer_sidecar, - stacks_block_pool_seed, - context_cloned, - ); - let _ = hiro_system_kit::nestable_block_on(future); - }); + let _ = hiro_system_kit::thread_named("Chainhook event observer") + .spawn(move || { + let future = start_stacks_event_observer( + event_observer_config_moved, + observer_commands_tx_moved, + observer_commands_rx, + observer_events_tx.clone(), + observer_sidecar, + stacks_block_pool_seed, + context_cloned.clone(), + ); + match hiro_system_kit::nestable_block_on(future) { + Ok(_) => {} + Err(e) => { + if let Some(tx) = observer_events_tx { + context_cloned.try_log(|logger| { + slog::crit!( + logger, + "Chainhook event observer thread failed with error: {e}", + ) + }); + let _ = tx.send(ObserverEvent::Terminate); + } + } + } + }) + .expect("unable to spawn thread"); ctx.try_log(|logger| { slog::info!( @@ -707,7 +737,7 @@ pub fn gather_proofs<'a>( for transaction in transactions.iter() { if !proofs.contains_key(&transaction.transaction_identifier) { ctx.try_log(|logger| { - slog::info!( + slog::debug!( logger, "Collecting proof for transaction {}", transaction.transaction_identifier.hash @@ -722,7 +752,7 @@ pub fn gather_proofs<'a>( proofs.insert(&transaction.transaction_identifier, proof); } Err(e) => { - ctx.try_log(|logger| slog::error!(logger, "{e}")); + ctx.try_log(|logger| slog::warn!(logger, "{e}")); } } } @@ -758,38 +788,36 @@ pub async fn start_observer_commands_handler( let command = match observer_commands_rx.recv() { Ok(cmd) => cmd, Err(e) => { - if let Some(ref tx) = observer_events_tx { - let _ = tx.send(ObserverEvent::Error(format!("Channel error: {:?}", e))); - } - continue; + ctx.try_log(|logger| { + slog::crit!(logger, "Error: broken channel {}", e.to_string()) + }); + break; } }; match command { ObserverCommand::Terminate => { - ctx.try_log(|logger| slog::info!(logger, "Handling Termination command")); - if let Some(ingestion_shutdown) = ingestion_shutdown { - ingestion_shutdown.notify(); - } - if let Some(ref tx) = observer_events_tx { - let _ = tx.send(ObserverEvent::Info("Terminating event observer".into())); - let _ = tx.send(ObserverEvent::Terminate); - } break; } ObserverCommand::ProcessBitcoinBlock(mut block_data) => { let block_hash = block_data.hash.to_string(); + let mut attempts = 0; + let max_attempts = 10; let block = loop { match standardize_bitcoin_block( block_data.clone(), &config.bitcoin_network, &ctx, ) { - Ok(block) => break block, - Err((e, retry)) => { + Ok(block) => break Some(block), + Err((e, refetch_block)) => { + attempts += 1; + if attempts > max_attempts { + break None; + } ctx.try_log(|logger| { - slog::error!(logger, "Error standardizing block: {}", e) + slog::warn!(logger, "Error standardizing block: {}", e) }); - if retry { + if refetch_block { block_data = match download_and_parse_block_with_retry( &http_client, &block_hash, @@ -814,7 +842,16 @@ pub async fn start_observer_commands_handler( } }; }; - + let Some(block) = block else { + ctx.try_log(|logger| { + slog::crit!( + logger, + "Could not process bitcoin block after {} attempts.", + attempts + ) + }); + break; + }; prometheus_monitoring.btc_metrics_ingest_block(block.block_identifier.index); bitcoin_block_store.insert( @@ -1095,10 +1132,16 @@ pub async fn start_observer_commands_handler( )); } for chainhook_to_trigger in chainhooks_to_trigger.into_iter() { + let predicate_uuid = &chainhook_to_trigger.chainhook.uuid; match handle_bitcoin_hook_action(chainhook_to_trigger, &proofs) { Err(e) => { ctx.try_log(|logger| { - slog::error!(logger, "unable to handle action {}", e) + slog::warn!( + logger, + "unable to handle action for predicate {}: {}", + predicate_uuid, + e + ) }); } Ok(BitcoinChainhookOccurrence::Http(request, data)) => { @@ -1106,7 +1149,7 @@ pub async fn start_observer_commands_handler( } Ok(BitcoinChainhookOccurrence::File(_path, _bytes)) => { ctx.try_log(|logger| { - slog::info!(logger, "Writing to disk not supported in server mode") + slog::warn!(logger, "Writing to disk not supported in server mode") }) } Ok(BitcoinChainhookOccurrence::Data(payload)) => { @@ -1125,21 +1168,20 @@ pub async fn start_observer_commands_handler( }); for hook_uuid in hooks_ids_to_deregister.iter() { - if let Some(chainhook) = chainhook_store + if chainhook_store .predicates .deregister_bitcoin_hook(hook_uuid.clone()) + .is_some() { prometheus_monitoring.btc_metrics_deregister_predicate(); - - if let Some(ref tx) = observer_events_tx { - let _ = tx.send(ObserverEvent::PredicateDeregistered( - ChainhookSpecification::Bitcoin(chainhook), - )); - } + } + if let Some(ref tx) = observer_events_tx { + let _ = tx.send(ObserverEvent::PredicateDeregistered(hook_uuid.clone())); } } for (request, data) in requests.into_iter() { + // todo: need to handle failure case - we should be setting interrupted status: https://github.com/hirosystems/chainhook/issues/523 if send_request(request, 3, 1, &ctx).await.is_ok() { if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::BitcoinPredicateTriggered(data)); @@ -1264,10 +1306,16 @@ pub async fn start_observer_commands_handler( } let proofs = HashMap::new(); for chainhook_to_trigger in chainhooks_to_trigger.into_iter() { + let predicate_uuid = &chainhook_to_trigger.chainhook.uuid; match handle_stacks_hook_action(chainhook_to_trigger, &proofs, &ctx) { Err(e) => { ctx.try_log(|logger| { - slog::error!(logger, "unable to handle action {}", e) + slog::warn!( + logger, + "unable to handle action for predicate {}: {}", + predicate_uuid, + e + ) }); } Ok(StacksChainhookOccurrence::Http(request)) => { @@ -1275,7 +1323,7 @@ pub async fn start_observer_commands_handler( } Ok(StacksChainhookOccurrence::File(_path, _bytes)) => { ctx.try_log(|logger| { - slog::info!(logger, "Writing to disk not supported in server mode") + slog::warn!(logger, "Writing to disk not supported in server mode") }) } Ok(StacksChainhookOccurrence::Data(payload)) => { @@ -1287,24 +1335,22 @@ pub async fn start_observer_commands_handler( } for hook_uuid in hooks_ids_to_deregister.iter() { - if let Some(chainhook) = chainhook_store + if chainhook_store .predicates .deregister_stacks_hook(hook_uuid.clone()) + .is_some() { prometheus_monitoring.stx_metrics_deregister_predicate(); - - if let Some(ref tx) = observer_events_tx { - let _ = tx.send(ObserverEvent::PredicateDeregistered( - ChainhookSpecification::Stacks(chainhook), - )); - } + } + if let Some(ref tx) = observer_events_tx { + let _ = tx.send(ObserverEvent::PredicateDeregistered(hook_uuid.clone())); } } for request in requests.into_iter() { // todo(lgalabru): collect responses for reporting ctx.try_log(|logger| { - slog::info!( + slog::debug!( logger, "Dispatching request from stacks chainhook {:?}", request @@ -1327,7 +1373,7 @@ pub async fn start_observer_commands_handler( } ObserverCommand::NotifyBitcoinTransactionProxied => { ctx.try_log(|logger| { - slog::info!(logger, "Handling NotifyBitcoinTransactionProxied command") + slog::debug!(logger, "Handling NotifyBitcoinTransactionProxied command") }); if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::NotifyBitcoinTransactionProxied); @@ -1343,14 +1389,13 @@ pub async fn start_observer_commands_handler( Ok(spec) => spec, Err(e) => { ctx.try_log(|logger| { - slog::error!( + slog::warn!( logger, "Unable to register new chainhook spec: {}", e.to_string() ) }); - panic!("Unable to register new chainhook spec: {}", e.to_string()); - //continue; + continue; } }; @@ -1363,11 +1408,15 @@ pub async fn start_observer_commands_handler( } }; - ctx.try_log(|logger| slog::info!(logger, "Registering chainhook {}", spec.uuid(),)); + ctx.try_log( + |logger| slog::debug!(logger, "Registering chainhook {}", spec.uuid(),), + ); if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::PredicateRegistered(spec.clone())); } else { - ctx.try_log(|logger| slog::info!(logger, "Enabling Predicate {}", spec.uuid())); + ctx.try_log(|logger| { + slog::debug!(logger, "Enabling Predicate {}", spec.uuid()) + }); chainhook_store.predicates.enable_specification(&mut spec); } } @@ -1382,15 +1431,19 @@ pub async fn start_observer_commands_handler( ctx.try_log(|logger| { slog::info!(logger, "Handling DeregisterStacksPredicate command") }); - let hook = chainhook_store.predicates.deregister_stacks_hook(hook_uuid); - - prometheus_monitoring.stx_metrics_deregister_predicate(); + let hook = chainhook_store + .predicates + .deregister_stacks_hook(hook_uuid.clone()); - if let (Some(tx), Some(hook)) = (&observer_events_tx, hook) { - let _ = tx.send(ObserverEvent::PredicateDeregistered( - ChainhookSpecification::Stacks(hook), - )); - } + if hook.is_some() { + // on startup, only the predicates in the `chainhook_store` are added to the monitoring count, + // so only those that we find in the store should be removed + prometheus_monitoring.stx_metrics_deregister_predicate(); + }; + // event if the predicate wasn't in the `chainhook_store`, propogate this event to delete from redis + if let Some(tx) = &observer_events_tx { + let _ = tx.send(ObserverEvent::PredicateDeregistered(hook_uuid)); + }; } ObserverCommand::DeregisterBitcoinPredicate(hook_uuid) => { ctx.try_log(|logger| { @@ -1398,15 +1451,17 @@ pub async fn start_observer_commands_handler( }); let hook = chainhook_store .predicates - .deregister_bitcoin_hook(hook_uuid); + .deregister_bitcoin_hook(hook_uuid.clone()); - prometheus_monitoring.btc_metrics_deregister_predicate(); - - if let (Some(tx), Some(hook)) = (&observer_events_tx, hook) { - let _ = tx.send(ObserverEvent::PredicateDeregistered( - ChainhookSpecification::Bitcoin(hook), - )); - } + if hook.is_some() { + // on startup, only the predicates in the `chainhook_store` are added to the monitoring count, + // so only those that we find in the store should be removed + prometheus_monitoring.btc_metrics_deregister_predicate(); + }; + // event if the predicate wasn't in the `chainhook_store`, propogate this event to delete from redis + if let Some(tx) = &observer_events_tx { + let _ = tx.send(ObserverEvent::PredicateDeregistered(hook_uuid)); + }; } ObserverCommand::ExpireStacksPredicate(HookExpirationData { hook_uuid, @@ -1430,8 +1485,23 @@ pub async fn start_observer_commands_handler( } } } + terminate(ingestion_shutdown, observer_events_tx, &ctx); Ok(()) } +fn terminate( + ingestion_shutdown: Option, + observer_events_tx: Option>, + ctx: &Context, +) { + ctx.try_log(|logger| slog::info!(logger, "Handling Termination command")); + if let Some(ingestion_shutdown) = ingestion_shutdown { + ingestion_shutdown.notify(); + } + if let Some(ref tx) = observer_events_tx { + let _ = tx.send(ObserverEvent::Info("Terminating event observer".into())); + let _ = tx.send(ObserverEvent::Terminate); + } +} #[cfg(test)] pub mod tests; diff --git a/components/chainhook-sdk/src/observer/tests/mod.rs b/components/chainhook-sdk/src/observer/tests/mod.rs index 7b678d2ce..053360e10 100644 --- a/components/chainhook-sdk/src/observer/tests/mod.rs +++ b/components/chainhook-sdk/src/observer/tests/mod.rs @@ -498,10 +498,7 @@ fn test_stacks_chainhook_register_deregister() { )); assert!(match observer_events_rx.recv() { Ok(ObserverEvent::PredicateDeregistered(deregistered_chainhook)) => { - assert_eq!( - ChainhookSpecification::Stacks(chainhook), - deregistered_chainhook - ); + assert_eq!(chainhook.uuid, deregistered_chainhook); true } _ => false, @@ -692,7 +689,7 @@ fn test_stacks_chainhook_auto_deregister() { // Should signal that a hook was deregistered assert!(match observer_events_rx.recv() { Ok(ObserverEvent::PredicateDeregistered(deregistered_hook)) => { - assert_eq!(deregistered_hook.uuid(), chainhook.uuid); + assert_eq!(deregistered_hook, chainhook.uuid); true } _ => false, @@ -858,10 +855,7 @@ fn test_bitcoin_chainhook_register_deregister() { )); assert!(match observer_events_rx.recv() { Ok(ObserverEvent::PredicateDeregistered(deregistered_chainhook)) => { - assert_eq!( - ChainhookSpecification::Bitcoin(chainhook), - deregistered_chainhook - ); + assert_eq!(chainhook.uuid, deregistered_chainhook); true } _ => false, @@ -1069,7 +1063,7 @@ fn test_bitcoin_chainhook_auto_deregister() { // Should signal that a hook was deregistered assert!(match observer_events_rx.recv() { Ok(ObserverEvent::PredicateDeregistered(deregistered_hook)) => { - assert_eq!(deregistered_hook.uuid(), chainhook.uuid); + assert_eq!(deregistered_hook, chainhook.uuid); true } _ => false, diff --git a/components/chainhook-sdk/src/utils/mod.rs b/components/chainhook-sdk/src/utils/mod.rs index 7f88e541c..541b6f352 100644 --- a/components/chainhook-sdk/src/utils/mod.rs +++ b/components/chainhook-sdk/src/utils/mod.rs @@ -164,7 +164,7 @@ pub async fn send_request( let err_msg = match request_builder.send().await { Ok(res) => { if res.status().is_success() { - ctx.try_log(|logger| slog::info!(logger, "Trigger {} successful", res.url())); + ctx.try_log(|logger| slog::debug!(logger, "Trigger {} successful", res.url())); return Ok(()); } else { retry += 1;