diff --git a/components/chainhook-cli/src/archive/mod.rs b/components/chainhook-cli/src/archive/mod.rs index 81eee7911..b41c62245 100644 --- a/components/chainhook-cli/src/archive/mod.rs +++ b/components/chainhook-cli/src/archive/mod.rs @@ -21,7 +21,7 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> { println!("{}", e.to_string()); }); - let remote_sha_url = config.expected_remote_stacks_tsv_sha256(); + let remote_sha_url = config.expected_remote_stacks_tsv_sha256()?; let res = reqwest::get(&remote_sha_url) .await .or(Err(format!("Failed to GET from '{}'", &remote_sha_url)))? @@ -34,7 +34,7 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> { write_file_content_at_path(&local_sha_file_path, &res.to_vec())?; - let file_url = config.expected_remote_stacks_tsv_url(); + let file_url = config.expected_remote_stacks_tsv_url()?; let res = reqwest::get(&file_url) .await .or(Err(format!("Failed to GET from '{}'", &file_url)))?; @@ -55,14 +55,17 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> { Ok(0) => break, Ok(n) => { if let Err(e) = file.write_all(&buffer[..n]) { - let err = - format!("unable to update compressed archive: {}", e.to_string()); - return Err(err); + return Err(format!( + "unable to update compressed archive: {}", + e.to_string() + )); } } Err(e) => { - let err = format!("unable to write compressed archive: {}", e.to_string()); - return Err(err); + return Err(format!( + "unable to write compressed archive: {}", + e.to_string() + )); } } } @@ -83,12 +86,11 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> { .map_err(|e| format!("unable to download stacks archive: {}", e.to_string()))?; } drop(tx); - tokio::task::spawn_blocking(|| decoder_thread.join()) .await - .unwrap() - .unwrap() - .unwrap(); + .map_err(|e| format!("failed to spawn thread: {e}"))? + .map_err(|e| format!("decoder thread failed when downloading tsv: {:?}", e))? + .map_err(|e| format!("failed to download tsv: {}", e))?; } Ok(()) @@ -124,11 +126,14 @@ impl Read for ChannelRead { } } -pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Context) -> bool { +pub async fn download_stacks_dataset_if_required( + config: &mut Config, + ctx: &Context, +) -> Result<bool, String> { if config.is_initial_ingestion_required() { // Download default tsv. 
if config.rely_on_remote_stacks_tsv() && config.should_download_remote_stacks_tsv() { - let url = config.expected_remote_stacks_tsv_url(); + let url = config.expected_remote_stacks_tsv_url()?; let mut tsv_file_path = config.expected_cache_path(); tsv_file_path.push(default_tsv_file_path(&config.network.stacks_network)); let mut tsv_sha_file_path = config.expected_cache_path(); @@ -137,7 +142,7 @@ pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Cont // Download archive if not already present in cache // Load the local let local_sha_file = read_file_content_at_path(&tsv_sha_file_path); - let sha_url = config.expected_remote_stacks_tsv_sha256(); + let sha_url = config.expected_remote_stacks_tsv_sha256()?; let remote_sha_file = match reqwest::get(&sha_url).await { Ok(response) => response.bytes().await, @@ -164,28 +169,25 @@ pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Cont "Stacks archive file already up to date" ); config.add_local_stacks_tsv_source(&tsv_file_path); - return false; + return Ok(false); } info!(ctx.expect_logger(), "Downloading {}", url); match download_tsv_file(&config).await { Ok(_) => {} - Err(e) => { - error!(ctx.expect_logger(), "{}", e); - std::process::exit(1); - } + Err(e) => return Err(e), } info!(ctx.expect_logger(), "Successfully downloaded tsv file"); config.add_local_stacks_tsv_source(&tsv_file_path); } - true + Ok(true) } else { info!( ctx.expect_logger(), "Streaming blocks from stacks-node {}", config.network.get_stacks_node_config().rpc_url ); - false + Ok(false) } } diff --git a/components/chainhook-cli/src/archive/tests/mod.rs b/components/chainhook-cli/src/archive/tests/mod.rs index eb97bfff9..35250e30b 100644 --- a/components/chainhook-cli/src/archive/tests/mod.rs +++ b/components/chainhook-cli/src/archive/tests/mod.rs @@ -72,8 +72,14 @@ async fn it_downloads_stacks_dataset_if_required() { tracer: false, }; let mut config_clone = config.clone(); - assert!(download_stacks_dataset_if_required(&mut config, &ctx).await); - assert!(!download_stacks_dataset_if_required(&mut config_clone, &ctx).await); + assert!(download_stacks_dataset_if_required(&mut config, &ctx) + .await + .unwrap()); + assert!( + !download_stacks_dataset_if_required(&mut config_clone, &ctx) + .await + .unwrap() + ); let mut tsv_file_path = config.expected_cache_path(); tsv_file_path.push(default_tsv_file_path(&config.network.stacks_network)); diff --git a/components/chainhook-cli/src/cli/mod.rs b/components/chainhook-cli/src/cli/mod.rs index ed51d0b1a..d92f37bc7 100644 --- a/components/chainhook-cli/src/cli/mod.rs +++ b/components/chainhook-cli/src/cli/mod.rs @@ -277,14 +277,14 @@ pub fn main() { let opts: Opts = match Opts::try_parse() { Ok(opts) => opts, Err(e) => { - error!(ctx.expect_logger(), "{e}"); + crit!(ctx.expect_logger(), "{e}"); process::exit(1); } }; match hiro_system_kit::nestable_block_on(handle_command(opts, ctx.clone())) { Err(e) => { - error!(ctx.expect_logger(), "{e}"); + crit!(ctx.expect_logger(), "{e}"); process::exit(1); } Ok(_) => {} diff --git a/components/chainhook-cli/src/config/mod.rs b/components/chainhook-cli/src/config/mod.rs index fc291659f..812d76021 100644 --- a/components/chainhook-cli/src/config/mod.rs +++ b/components/chainhook-cli/src/config/mod.rs @@ -256,13 +256,13 @@ impl Config { } } - pub fn expected_local_stacks_tsv_file(&self) -> &PathBuf { + pub fn expected_local_stacks_tsv_file(&self) -> Result<&PathBuf, String> { for source in self.event_sources.iter() { if let 
EventSourceConfig::StacksTsvPath(config) = source { - return &config.file_path; + return Ok(&config.file_path); } } - panic!("expected local-tsv source") + Err("could not find expected local tsv source")? } pub fn expected_cache_path(&self) -> PathBuf { @@ -271,21 +271,23 @@ impl Config { destination_path } - fn expected_remote_stacks_tsv_base_url(&self) -> &String { + fn expected_remote_stacks_tsv_base_url(&self) -> Result<&String, String> { for source in self.event_sources.iter() { if let EventSourceConfig::StacksTsvUrl(config) = source { - return &config.file_url; + return Ok(&config.file_url); } } - panic!("expected remote-tsv source") + Err("could not find expected remote tsv source")? } - pub fn expected_remote_stacks_tsv_sha256(&self) -> String { - format!("{}.sha256", self.expected_remote_stacks_tsv_base_url()) + pub fn expected_remote_stacks_tsv_sha256(&self) -> Result<String, String> { + self.expected_remote_stacks_tsv_base_url() + .map(|url| format!("{}.sha256", url)) } - pub fn expected_remote_stacks_tsv_url(&self) -> String { - format!("{}.gz", self.expected_remote_stacks_tsv_base_url()) + pub fn expected_remote_stacks_tsv_url(&self) -> Result<String, String> { + self.expected_remote_stacks_tsv_base_url() + .map(|url| format!("{}.gz", url)) } pub fn rely_on_remote_stacks_tsv(&self) -> bool { diff --git a/components/chainhook-cli/src/config/tests/mod.rs b/components/chainhook-cli/src/config/tests/mod.rs index 4adf04a8a..ddae000c2 100644 --- a/components/chainhook-cli/src/config/tests/mod.rs +++ b/components/chainhook-cli/src/config/tests/mod.rs @@ -108,7 +108,6 @@ fn should_download_remote_stacks_tsv_handles_both_modes() { } #[test] -#[should_panic(expected = "expected remote-tsv source")] fn expected_remote_stacks_tsv_base_url_panics_if_missing() { let url_src = EventSourceConfig::StacksTsvUrl(super::UrlConfig { file_url: format!("test"), @@ -116,15 +115,22 @@ fn expected_remote_stacks_tsv_base_url_panics_if_missing() { let mut config = Config::default(true, false, false, &None).unwrap(); config.event_sources = vec![url_src.clone()]; - assert_eq!(config.expected_remote_stacks_tsv_base_url(), "test"); + match config.expected_remote_stacks_tsv_base_url() { + Ok(tsv_url) => assert_eq!(tsv_url, "test"), + Err(e) => { + panic!("expected tsv file: {e}") + } + } config.event_sources = vec![]; - config.expected_remote_stacks_tsv_base_url(); + match config.expected_remote_stacks_tsv_base_url() { + Ok(tsv_url) => panic!("expected no tsv file, found {}", tsv_url), + Err(e) => assert_eq!(e, "could not find expected remote tsv source".to_string()), + }; } #[test] -#[should_panic(expected = "expected local-tsv source")] -fn expected_local_stacks_tsv_base_url_panics_if_missing() { +fn expected_local_stacks_tsv_base_url_errors_if_missing() { let path = PathBuf::from("test"); let path_src = EventSourceConfig::StacksTsvPath(PathConfig { file_path: path.clone(), @@ -132,10 +138,18 @@ fn expected_local_stacks_tsv_base_url_panics_if_missing() { let mut config = Config::default(true, false, false, &None).unwrap(); config.event_sources = vec![path_src.clone()]; - assert_eq!(config.expected_local_stacks_tsv_file(), &path); + match config.expected_local_stacks_tsv_file() { + Ok(tsv_path) => assert_eq!(tsv_path, &path), + Err(e) => { + panic!("expected tsv file: {e}") + } + } config.event_sources = vec![]; - config.expected_local_stacks_tsv_file(); + match config.expected_local_stacks_tsv_file() { + Ok(tsv_path) => panic!("expected no tsv file, found {}", tsv_path.to_string_lossy()), + Err(e) => assert_eq!(e, "could not find expected 
local tsv source".to_string()), + }; } #[test] diff --git a/components/chainhook-cli/src/scan/bitcoin.rs b/components/chainhook-cli/src/scan/bitcoin.rs index 41ae6e365..886f60998 100644 --- a/components/chainhook-cli/src/scan/bitcoin.rs +++ b/components/chainhook-cli/src/scan/bitcoin.rs @@ -29,6 +29,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( config: &Config, ctx: &Context, ) -> Result { + let predicate_uuid = &predicate_spec.uuid; let auth = Auth::UserPass( config.network.bitcoind_rpc_username.clone(), config.network.bitcoind_rpc_password.clone(), @@ -71,9 +72,9 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( PredicatesApi::Off => None, }; - info!( + debug!( ctx.expect_logger(), - "Starting predicate evaluation on Bitcoin blocks", + "Starting predicate evaluation on Bitcoin blocks for predicate {predicate_uuid}", ); let mut last_block_scanned = BlockIdentifier::default(); @@ -200,7 +201,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( info!( ctx.expect_logger(), - "{number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered" + "Predicate {predicate_uuid} scan completed. {number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered." ); if let Some(ref mut predicates_db_conn) = predicates_db_conn { @@ -269,9 +270,13 @@ pub async fn execute_predicates_action<'a>( if trigger.chainhook.include_proof { gather_proofs(&trigger, &mut proofs, &config, &ctx); } + let predicate_uuid = &trigger.chainhook.uuid; match handle_bitcoin_hook_action(trigger, &proofs) { Err(e) => { - error!(ctx.expect_logger(), "unable to handle action {}", e); + warn!( + ctx.expect_logger(), + "unable to handle action for predicate {}: {}", predicate_uuid, e + ); } Ok(action) => { actions_triggered += 1; diff --git a/components/chainhook-cli/src/scan/stacks.rs b/components/chainhook-cli/src/scan/stacks.rs index 6d3f90e40..6b50ba419 100644 --- a/components/chainhook-cli/src/scan/stacks.rs +++ b/components/chainhook-cli/src/scan/stacks.rs @@ -66,7 +66,7 @@ pub async fn get_canonical_fork_from_tsv( start_block: Option, ctx: &Context, ) -> Result, String> { - let seed_tsv_path = config.expected_local_stacks_tsv_file().clone(); + let seed_tsv_path = config.expected_local_stacks_tsv_file()?.clone(); let (record_tx, record_rx) = std::sync::mpsc::channel(); @@ -98,7 +98,7 @@ pub async fn get_canonical_fork_from_tsv( } let _ = record_tx.send(None); }) - .expect("unable to spawn thread"); + .map_err(|e| format!("unable to spawn thread: {e}"))?; let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?; let canonical_fork = { @@ -111,7 +111,10 @@ pub async fn get_canonical_fork_from_tsv( match standardize_stacks_serialized_block_header(&blob) { Ok(data) => data, Err(e) => { - error!(ctx.expect_logger(), "{e}"); + error!( + ctx.expect_logger(), + "Failed to standardize stacks header: {e}" + ); continue; } } @@ -169,12 +172,13 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( config: &Config, ctx: &Context, ) -> Result<(Option, bool), String> { + let predicate_uuid = &predicate_spec.uuid; let mut chain_tip = match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) { Some(chain_tip) => chain_tip, None => match get_last_block_height_inserted(stacks_db_conn, ctx) { Some(chain_tip) => chain_tip, None => { - info!(ctx.expect_logger(), "No blocks inserted in db; cannot determing Stacks chain tip. 
Skipping scan of predicate {}", predicate_spec.uuid); + info!(ctx.expect_logger(), "No blocks inserted in db; cannot determing Stacks chain tip. Skipping scan of predicate {}", predicate_uuid); return Ok((None, false)); } }, @@ -201,9 +205,9 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( }; let proofs = HashMap::new(); - info!( + debug!( ctx.expect_logger(), - "Starting predicate evaluation on Stacks blocks" + "Starting predicate evaluation on Stacks blocks for predicate {}", predicate_uuid ); let mut last_block_scanned = BlockIdentifier::default(); let mut err_count = 0; @@ -243,7 +247,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( None => match get_last_block_height_inserted(stacks_db_conn, ctx) { Some(chain_tip) => chain_tip, None => { - info!(ctx.expect_logger(), "No blocks inserted in db; cannot determing Stacks chain tip. Skipping scan of predicate {}", predicate_spec.uuid); + warn!(ctx.expect_logger(), "No blocks inserted in db; cannot determine Stacks chain tip. Skipping scan of predicate {}", predicate_uuid); return Ok((None, false)); } }, @@ -304,7 +308,10 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( }; let res = match handle_stacks_hook_action(trigger, &proofs, &ctx) { Err(e) => { - error!(ctx.expect_logger(), "unable to handle action {}", e); + warn!( + ctx.expect_logger(), + "unable to handle action for predicate {}: {}", predicate_uuid, e + ); Ok(()) // todo: should this error increment our err_count? } Ok(action) => { @@ -342,7 +349,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( } info!( ctx.expect_logger(), - "{number_of_blocks_scanned} blocks scanned, {number_of_times_triggered} blocks triggering predicate" + "Predicate {predicate_uuid} scan completed. 
{number_of_blocks_scanned} blocks scanned, {number_of_times_triggered} blocks triggering predicate.", ); if let Some(ref mut predicates_db_conn) = predicates_db_conn { @@ -420,7 +427,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate( } } - let _ = download_stacks_dataset_if_required(config, ctx).await; + let _ = download_stacks_dataset_if_required(config, ctx).await?; let mut canonical_fork = get_canonical_fork_from_tsv(config, None, ctx).await?; @@ -516,7 +523,7 @@ pub async fn consolidate_local_stacks_chainstate_using_csv( "Building local chainstate from Stacks archive file" ); - let downloaded_new_dataset = download_stacks_dataset_if_required(config, ctx).await; + let downloaded_new_dataset = download_stacks_dataset_if_required(config, ctx).await?; if downloaded_new_dataset { let stacks_db = @@ -550,13 +557,15 @@ pub async fn consolidate_local_stacks_chainstate_using_csv( ) { Ok(block) => block, Err(e) => { - error!(&ctx.expect_logger(), "{e}"); + error!( + &ctx.expect_logger(), + "Failed to standardize stacks block: {e}" + ); continue; } }; - // TODO: return a result - insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx); + insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx)?; if blocks_inserted % 2500 == 0 { info!( diff --git a/components/chainhook-cli/src/service/http_api.rs b/components/chainhook-cli/src/service/http_api.rs index 191a33a7c..460b2f9a1 100644 --- a/components/chainhook-cli/src/service/http_api.rs +++ b/components/chainhook-cli/src/service/http_api.rs @@ -28,7 +28,7 @@ pub async fn start_predicate_api_server( api_config: PredicatesApiConfig, observer_commands_tx: Sender, ctx: Context, -) -> Result> { +) -> Result> { let log_level = LogLevel::Off; let mut shutdown_config = config::Shutdown::default(); @@ -62,12 +62,12 @@ pub async fn start_predicate_api_server( .ignite() .await?; - let ingestion_shutdown = ignite.shutdown(); + let predicate_api_shutdown = ignite.shutdown(); let _ = std::thread::spawn(move || { let _ = hiro_system_kit::nestable_block_on(ignite.launch()); }); - Ok(ingestion_shutdown) + Ok(predicate_api_shutdown) } #[openapi(tag = "Health Check")] @@ -298,9 +298,12 @@ pub fn get_entry_from_predicates_db( let spec = ChainhookSpecification::deserialize_specification(&encoded_spec)?; let encoded_status = match entry.get("status") { - None => unimplemented!(), - Some(payload) => payload, - }; + None => Err(format!( + "found predicate specification with no status for predicate {}", + predicate_key + )), + Some(payload) => Ok(payload), + }?; let status = serde_json::from_str(&encoded_status).map_err(|e| format!("{}", e.to_string()))?; diff --git a/components/chainhook-cli/src/service/mod.rs b/components/chainhook-cli/src/service/mod.rs index 86286c356..9bb7e95e5 100644 --- a/components/chainhook-cli/src/service/mod.rs +++ b/components/chainhook-cli/src/service/mod.rs @@ -89,15 +89,16 @@ impl Service { } match chainhook_config.register_specification(predicate) { Ok(_) => { - info!( + debug!( self.ctx.expect_logger(), - "Predicate {} retrieved from storage and loaded", predicate_uuid, + "Predicate {} retrieved from storage and registered", predicate_uuid, ); } Err(e) => { - error!( + warn!( self.ctx.expect_logger(), - "Failed loading predicate from storage: {}", + "Failed to register predicate {} after retrieving from storage: {}", + predicate_uuid, e.to_string() ); } @@ -117,7 +118,7 @@ impl Service { &self.ctx, ) { Ok(Some(_)) => { - error!( + warn!( self.ctx.expect_logger(), "Predicate uuid already in use: {uuid}", ); @@ 
-136,16 +137,16 @@ impl Service { ) { Ok(spec) => { newly_registered_predicates.push(spec.clone()); - info!( + debug!( self.ctx.expect_logger(), "Predicate {} retrieved from config and loaded", spec.uuid(), ); } Err(e) => { - error!( + warn!( self.ctx.expect_logger(), - "Failed loading predicate from config: {}", + "Failed to load predicate from config: {}", e.to_string() ); } @@ -163,7 +164,7 @@ impl Service { // Download and ingest a Stacks dump if self.config.rely_on_remote_stacks_tsv() { let _ = - consolidate_local_stacks_chainstate_using_csv(&mut self.config, &self.ctx).await; + consolidate_local_stacks_chainstate_using_csv(&mut self.config, &self.ctx).await?; } // Stacks scan operation threadpool @@ -176,9 +177,12 @@ impl Service { start_stacks_scan_runloop( &config, stacks_scan_op_rx, - observer_command_tx_moved, + observer_command_tx_moved.clone(), &ctx, ); + // the scan runloop should loop forever; if it finishes, something is wrong + crit!(ctx.expect_logger(), "Stacks scan runloop stopped.",); + let _ = observer_command_tx_moved.send(ObserverCommand::Terminate); }) .expect("unable to spawn thread"); @@ -192,15 +196,18 @@ impl Service { start_bitcoin_scan_runloop( &config, bitcoin_scan_op_rx, - observer_command_tx_moved, + observer_command_tx_moved.clone(), &ctx, ); + // the scan runloop should loop forever; if it finishes, something is wrong + crit!(ctx.expect_logger(), "Bitcoin scan runloop stopped.",); + let _ = observer_command_tx_moved.send(ObserverCommand::Terminate); }) .expect("unable to spawn thread"); // Enable HTTP Predicates API, if required let config = self.config.clone(); - if let PredicatesApi::On(ref api_config) = config.http_api { + let predicate_api_shutdown = if let PredicatesApi::On(ref api_config) = config.http_api { info!( self.ctx.expect_logger(), "Listening on port {} for chainhook predicate registrations", api_config.http_port @@ -209,11 +216,29 @@ impl Service { let api_config = api_config.clone(); let moved_observer_command_tx = observer_command_tx.clone(); // Test and initialize a database connection - let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || { - let future = start_predicate_api_server(api_config, moved_observer_command_tx, ctx); - let _ = hiro_system_kit::nestable_block_on(future); - }); - } + let res = hiro_system_kit::thread_named("HTTP Predicate API") + .spawn(move || { + let future = start_predicate_api_server( + api_config, + moved_observer_command_tx.clone(), + ctx.clone(), + ); + hiro_system_kit::nestable_block_on(future) + }) + .expect("unable to spawn thread"); + let res = res.join().expect("unable to terminate thread"); + match res { + Ok(predicate_api_shutdown) => Some(predicate_api_shutdown), + Err(e) => { + return Err(format!( + "Predicate API Registration server failed to start: {}", + e + )); + } + } + } else { + None + }; let ctx = self.ctx.clone(); let stacks_db = @@ -281,7 +306,7 @@ impl Service { let event = match observer_event_rx.recv() { Ok(cmd) => cmd, Err(e) => { - error!( + crit!( self.ctx.expect_logger(), "Error: broken channel {}", e.to_string() @@ -343,20 +368,20 @@ impl Service { ); } } - ObserverEvent::PredicateDeregistered(spec) => { + ObserverEvent::PredicateDeregistered(uuid) => { if let PredicatesApi::On(ref config) = self.config.http_api { let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn_verbose(&config, &ctx) else { continue; }; - let predicate_key = spec.key(); + let predicate_key = ChainhookSpecification::either_stx_or_btc_key(&uuid); let res: Result<(), 
redis::RedisError> = - predicates_db_conn.del(predicate_key); + predicates_db_conn.del(predicate_key.clone()); if let Err(e) = res { - error!( + warn!( self.ctx.expect_logger(), - "unable to delete predicate: {}", + "unable to delete predicate {predicate_key}: {}", e.to_string() ); } @@ -429,7 +454,7 @@ impl Service { } } } - update_stats_from_report( + update_status_from_report( Chain::Bitcoin, report, &mut predicates_db_conn, @@ -446,7 +471,7 @@ impl Service { Err(e) => { error!( self.ctx.expect_logger(), - "unable to store stacks block: {}", + "unable to open stacks db: {}", e.to_string() ); continue; @@ -455,28 +480,48 @@ impl Service { match &chain_event { StacksChainEvent::ChainUpdatedWithBlocks(data) => { - confirm_entries_in_stacks_blocks( + if let Err(e) = confirm_entries_in_stacks_blocks( &data.confirmed_blocks, &stacks_db_conn_rw, &self.ctx, - ); - draft_entries_in_stacks_blocks( + ) { + error!( + self.ctx.expect_logger(), + "unable to add confirmed entries to stacks db: {}", e + ); + }; + if let Err(e) = draft_entries_in_stacks_blocks( &data.new_blocks, &stacks_db_conn_rw, &self.ctx, - ) + ) { + error!( + self.ctx.expect_logger(), + "unable to add unconfirmed entries to stacks db: {}", e + ); + }; } StacksChainEvent::ChainUpdatedWithReorg(data) => { - confirm_entries_in_stacks_blocks( + if let Err(e) = confirm_entries_in_stacks_blocks( &data.confirmed_blocks, &stacks_db_conn_rw, &self.ctx, - ); - draft_entries_in_stacks_blocks( + ) { + error!( + self.ctx.expect_logger(), + "unable to add confirmed entries to stacks db: {}", e + ); + }; + if let Err(e) = draft_entries_in_stacks_blocks( &data.blocks_to_apply, &stacks_db_conn_rw, &self.ctx, - ) + ) { + error!( + self.ctx.expect_logger(), + "unable to add unconfirmed entries to stacks db: {}", e + ); + }; } StacksChainEvent::ChainUpdatedWithMicroblocks(_) | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {} @@ -546,7 +591,7 @@ impl Service { StacksChainEvent::ChainUpdatedWithMicroblocks(_) | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {} }; - update_stats_from_report( + update_status_from_report( Chain::Stacks, report, &mut predicates_db_conn, @@ -557,15 +602,36 @@ impl Service { // Every 32 blocks, we will check if there's a new Stacks file archive to ingest if stacks_event > 32 { stacks_event = 0; - let _ = consolidate_local_stacks_chainstate_using_csv( - &mut self.config, - &self.ctx, - ) - .await; + if self.config.rely_on_remote_stacks_tsv() { + match consolidate_local_stacks_chainstate_using_csv( + &mut self.config, + &self.ctx, + ) + .await + { + Err(e) => { + error!( + self.ctx.expect_logger(), + "Failed to update database from archive: {e}" + ) + } + Ok(()) => {} + }; + } } } ObserverEvent::Terminate => { - info!(self.ctx.expect_logger(), "Terminating runloop"); + info!( + self.ctx.expect_logger(), + "Terminating ObserverEvent runloop" + ); + if let Some(predicate_api_shutdown) = predicate_api_shutdown { + info!( + self.ctx.expect_logger(), + "Terminating Predicate Registration API" + ); + predicate_api_shutdown.notify(); + } break; } _ => {} @@ -615,7 +681,7 @@ pub struct ExpiredData { pub expired_at_block_height: u64, } -fn update_stats_from_report( +fn update_status_from_report( chain: Chain, report: PredicateEvaluationReport, predicates_db_conn: &mut Connection, @@ -623,7 +689,7 @@ ) { for (predicate_uuid, blocks_ids) in report.predicates_triggered.iter() { if let Some(last_triggered_height) = blocks_ids.last().and_then(|b| Some(b.index)) { - let triggered_count = 
blocks_ids.len().try_into().unwrap(); + let triggered_count = blocks_ids.len().try_into().unwrap_or(0); set_predicate_streaming_status( StreamingDataType::Occurrence { last_triggered_height, @@ -652,7 +718,7 @@ fn update_stats_from_report( } } if let Some(last_evaluated_height) = blocks_ids.last().and_then(|b| Some(b.index)) { - let evaluated_count = blocks_ids.len().try_into().unwrap(); + let evaluated_count = blocks_ids.len().try_into().unwrap_or(0); set_predicate_streaming_status( StreamingDataType::Evaluation { last_evaluated_height, @@ -666,7 +732,7 @@ fn update_stats_from_report( } for (predicate_uuid, blocks_ids) in report.predicates_expired.iter() { if let Some(last_evaluated_height) = blocks_ids.last().and_then(|b| Some(b.index)) { - let evaluated_count = blocks_ids.len().try_into().unwrap(); + let evaluated_count = blocks_ids.len().try_into().unwrap_or(0); set_unconfirmed_expiration_status( &chain, evaluated_count, @@ -1035,13 +1101,13 @@ fn insert_predicate_expiration( if let Err(e) = predicates_db_conn.hset::<_, _, _, ()>(&key, "predicates", &serialized_expiring_predicates) { - error!( + warn!( ctx.expect_logger(), "Error updating expired predicates index: {}", e.to_string() ); } else { - info!( + debug!( ctx.expect_logger(), "Updating expired predicates at block height {expired_at_block_height} with predicate: {predicate_key}" ); @@ -1060,7 +1126,7 @@ fn get_predicates_expiring_at_block( Ok(data) => { if let Err(e) = predicates_db_conn.hdel::<_, _, u64>(key.to_string(), "predicates") { - error!( + warn!( ctx.expect_logger(), "Error removing expired predicates index: {}", e.to_string() @@ -1084,13 +1150,14 @@ pub fn update_predicate_status( if let Err(e) = predicates_db_conn.hset::<_, _, _, ()>(&predicate_key, "status", &serialized_status) { - error!( + warn!( ctx.expect_logger(), - "Error updating status: {}", + "Error updating status for {}: {}", + predicate_key, e.to_string() ); } else { - info!( + debug!( ctx.expect_logger(), "Updating predicate {predicate_key} status: {serialized_status}" ); @@ -1107,13 +1174,14 @@ fn update_predicate_spec( if let Err(e) = predicates_db_conn.hset::<_, _, _, ()>(&predicate_key, "specification", &serialized_spec) { - error!( + warn!( ctx.expect_logger(), - "Error updating status: {}", + "Error updating status for {}: {}", + predicate_key, e.to_string() ); } else { - info!( + debug!( ctx.expect_logger(), "Updating predicate {predicate_key} with spec: {serialized_spec}" ); @@ -1154,6 +1222,7 @@ pub fn open_readwrite_predicates_db_conn_verbose( res } +// todo: evaluate expects pub fn open_readwrite_predicates_db_conn_or_panic( config: &PredicatesApiConfig, ctx: &Context, diff --git a/components/chainhook-cli/src/service/runloops.rs b/components/chainhook-cli/src/service/runloops.rs index ee441008a..d1c4d32cc 100644 --- a/components/chainhook-cli/src/service/runloops.rs +++ b/components/chainhook-cli/src/service/runloops.rs @@ -43,9 +43,12 @@ pub fn start_stacks_scan_runloop( { Ok(db_conn) => db_conn, Err(e) => { + // todo: if we repeatedly can't connect to the database, we should restart the + // service to get to a healthy state. 
I don't know if this has been an issue, though + // so we can monitor and possibly remove this todo error!( moved_ctx.expect_logger(), - "unable to store stacks block: {}", + "unable to open stacks db: {}", e.to_string() ); unimplemented!() @@ -108,8 +111,7 @@ pub fn start_stacks_scan_runloop( } }); } - let res = stacks_scan_pool.join(); - res + let _ = stacks_scan_pool.join(); } pub fn start_bitcoin_scan_runloop( @@ -138,7 +140,7 @@ pub fn start_bitcoin_scan_runloop( let predicate_is_expired = match hiro_system_kit::nestable_block_on(op) { Ok(predicate_is_expired) => predicate_is_expired, Err(e) => { - error!( + warn!( moved_ctx.expect_logger(), "Unable to evaluate predicate on Bitcoin chainstate: {e}", ); diff --git a/components/chainhook-cli/src/storage/mod.rs b/components/chainhook-cli/src/storage/mod.rs index 2d18aa33b..6c2d04ca3 100644 --- a/components/chainhook-cli/src/storage/mod.rs +++ b/components/chainhook-cli/src/storage/mod.rs @@ -116,12 +116,16 @@ fn get_last_unconfirmed_insert_key() -> [u8; 3] { *LAST_UNCONFIRMED_KEY_PREFIX } -pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB, _ctx: &Context) { +pub fn insert_entry_in_stacks_blocks( + block: &StacksBlockData, + stacks_db_rw: &DB, + _ctx: &Context, +) -> Result<(), String> { let key = get_block_key(&block.block_identifier); let block_bytes = json!(block); stacks_db_rw .put(&key, &block_bytes.to_string().as_bytes()) - .expect("unable to insert blocks"); + .map_err(|e| format!("unable to insert blocks: {}", e))?; let previous_last_inserted = get_last_block_height_inserted(stacks_db_rw, _ctx).unwrap_or(0); if block.block_identifier.index > previous_last_inserted { stacks_db_rw @@ -129,35 +133,39 @@ pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB, get_last_confirmed_insert_key(), block.block_identifier.index.to_be_bytes(), ) - .expect("unable to insert metadata"); + .map_err(|e| format!("unable to insert metadata: {}", e))?; } + Ok(()) } pub fn insert_unconfirmed_entry_in_stacks_blocks( block: &StacksBlockData, stacks_db_rw: &DB, _ctx: &Context, -) { +) -> Result<(), String> { let key = get_unconfirmed_block_key(&block.block_identifier); let block_bytes = json!(block); stacks_db_rw .put(&key, &block_bytes.to_string().as_bytes()) - .expect("unable to insert blocks"); + .map_err(|e| format!("unable to insert blocks: {}", e))?; stacks_db_rw .put( get_last_unconfirmed_insert_key(), block.block_identifier.index.to_be_bytes(), ) - .expect("unable to insert metadata"); + .map_err(|e| format!("unable to insert metadata: {}", e))?; + Ok(()) } pub fn delete_unconfirmed_entry_from_stacks_blocks( block_identifier: &BlockIdentifier, stacks_db_rw: &DB, _ctx: &Context, -) { +) -> Result<(), String> { let key = get_unconfirmed_block_key(&block_identifier); - stacks_db_rw.delete(&key).expect("unable to delete blocks"); + stacks_db_rw + .delete(&key) + .map_err(|e| format!("unable to delete blocks: {}", e)) } pub fn get_last_unconfirmed_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option { @@ -209,22 +217,24 @@ pub fn confirm_entries_in_stacks_blocks( blocks: &Vec, stacks_db_rw: &DB, ctx: &Context, -) { +) -> Result<(), String> { for block in blocks.iter() { - insert_entry_in_stacks_blocks(block, stacks_db_rw, ctx); - delete_unconfirmed_entry_from_stacks_blocks(&block.block_identifier, stacks_db_rw, ctx); + insert_entry_in_stacks_blocks(block, stacks_db_rw, ctx)?; + delete_unconfirmed_entry_from_stacks_blocks(&block.block_identifier, stacks_db_rw, ctx)?; } + Ok(()) } pub fn 
draft_entries_in_stacks_blocks( block_updates: &Vec, stacks_db_rw: &DB, ctx: &Context, -) { +) -> Result<(), String> { for update in block_updates.iter() { // TODO: Could be imperfect, from a microblock point of view - insert_unconfirmed_entry_in_stacks_blocks(&update.block, stacks_db_rw, ctx); + insert_unconfirmed_entry_in_stacks_blocks(&update.block, stacks_db_rw, ctx)?; } + Ok(()) } pub fn get_stacks_block_at_block_height( diff --git a/components/chainhook-sdk/Cargo.toml b/components/chainhook-sdk/Cargo.toml index d85a8dcd5..b8789ae2f 100644 --- a/components/chainhook-sdk/Cargo.toml +++ b/components/chainhook-sdk/Cargo.toml @@ -50,4 +50,4 @@ test-case = "3.1.0" default = ["hiro-system-kit/log"] zeromq = ["zmq"] debug = ["hiro-system-kit/debug"] -release = ["hiro-system-kit/release"] +release = ["hiro-system-kit/debug"] diff --git a/components/chainhook-sdk/src/indexer/fork_scratch_pad.rs b/components/chainhook-sdk/src/indexer/fork_scratch_pad.rs index 251913a06..4bd584e1f 100644 --- a/components/chainhook-sdk/src/indexer/fork_scratch_pad.rs +++ b/components/chainhook-sdk/src/indexer/fork_scratch_pad.rs @@ -293,7 +293,7 @@ impl ForkScratchPad { ctx.try_log(|logger| { slog::error!( logger, - "unable to retrive Bitcoin {} from block store", + "unable to retrieve Bitcoin block {} from block store", block_identifier ) }); @@ -317,7 +317,16 @@ impl ForkScratchPad { let block_identifier = &divergence.block_ids_to_apply[i]; let header = match self.headers_store.get(block_identifier) { Some(header) => header.clone(), - None => panic!("unable to retrive block from block store"), + None => { + ctx.try_log(|logger| { + slog::error!( + logger, + "unable to retrieve Bitcoin block {} from block store", + block_identifier + ) + }); + return Err(ChainSegmentIncompatibility::Unknown); + } }; new_headers.push(header) } @@ -336,22 +345,40 @@ impl ForkScratchPad { .map(|block_id| { let block = match self.headers_store.get(block_id) { Some(block) => block.clone(), - None => panic!("unable to retrive block from block store"), + None => { + ctx.try_log(|logger| { + slog::error!( + logger, + "unable to retrieve Bitcoin block {} from block store", + block_id + ) + }); + return Err(ChainSegmentIncompatibility::Unknown); + } }; - block + Ok(block) }) - .collect::>(), + .collect::, _>>()?, headers_to_apply: divergence .block_ids_to_apply .iter() .map(|block_id| { let block = match self.headers_store.get(block_id) { Some(block) => block.clone(), - None => panic!("unable to retrive block from block store"), + None => { + ctx.try_log(|logger| { + slog::error!( + logger, + "unable to retrieve Bitcoin block {} from block store", + block_id + ) + }); + return Err(ChainSegmentIncompatibility::Unknown); + } }; - block + Ok(block) }) - .collect::>(), + .collect::, _>>()?, confirmed_headers: vec![], }, )); diff --git a/components/chainhook-sdk/src/indexer/mod.rs b/components/chainhook-sdk/src/indexer/mod.rs index 9b0c97e92..a518aaaa6 100644 --- a/components/chainhook-sdk/src/indexer/mod.rs +++ b/components/chainhook-sdk/src/indexer/mod.rs @@ -367,7 +367,7 @@ impl ChainSegment { } Err(incompatibility) => { ctx.try_log(|logger| { - slog::info!(logger, "Will have to fork: {:?}", incompatibility) + slog::warn!(logger, "Will have to fork: {:?}", incompatibility) }); match incompatibility { ChainSegmentIncompatibility::BlockCollision => { diff --git a/components/chainhook-sdk/src/indexer/stacks/mod.rs b/components/chainhook-sdk/src/indexer/stacks/mod.rs index 4cff47b7f..0eeb70128 100644 --- 
a/components/chainhook-sdk/src/indexer/stacks/mod.rs +++ b/components/chainhook-sdk/src/indexer/stacks/mod.rs @@ -290,9 +290,11 @@ pub fn standardize_stacks_serialized_block_header( .parent_index_block_hash .take() .ok_or(format!("unable to retrieve parent_index_block_hash"))?; + + let parent_height = block_identifier.index.saturating_sub(1); let parent_block_identifier = BlockIdentifier { hash: parent_hash, - index: block_identifier.index - 1, + index: parent_height, }; Ok((block_identifier, parent_block_identifier)) } @@ -902,7 +904,7 @@ pub fn get_standardized_non_fungible_currency_from_asset_class_id( }), } } - +//todo: this function has a lot of expects/panics. should return result instead pub fn get_standardized_stacks_receipt( _txid: &str, events: Vec, diff --git a/components/chainhook-sdk/src/observer/http.rs b/components/chainhook-sdk/src/observer/http.rs index 25da770d9..f844718d5 100644 --- a/components/chainhook-sdk/src/observer/http.rs +++ b/components/chainhook-sdk/src/observer/http.rs @@ -132,7 +132,7 @@ pub async fn handle_new_bitcoin_block( }; } Ok(None) => { - ctx.try_log(|logger| slog::info!(logger, "unable to infer chain progress")); + ctx.try_log(|logger| slog::warn!(logger, "unable to infer chain progress")); } Err(e) => { ctx.try_log(|logger| slog::error!(logger, "unable to handle bitcoin block: {}", e)) @@ -202,7 +202,7 @@ pub fn handle_new_stacks_block( }; } Ok(None) => { - ctx.try_log(|logger| slog::info!(logger, "unable to infer chain progress")); + ctx.try_log(|logger| slog::warn!(logger, "unable to infer chain progress")); } Err(e) => ctx.try_log(|logger| slog::error!(logger, "{}", e)), } @@ -273,7 +273,7 @@ pub fn handle_new_microblocks( }; } Ok(None) => { - ctx.try_log(|logger| slog::info!(logger, "unable to infer chain progress")); + ctx.try_log(|logger| slog::warn!(logger, "unable to infer chain progress")); } Err(e) => { ctx.try_log(|logger| slog::error!(logger, "unable to handle stacks microblock: {}", e)); diff --git a/components/chainhook-sdk/src/observer/mod.rs b/components/chainhook-sdk/src/observer/mod.rs index cc1108b68..4e04e62a7 100644 --- a/components/chainhook-sdk/src/observer/mod.rs +++ b/components/chainhook-sdk/src/observer/mod.rs @@ -304,7 +304,7 @@ pub enum ObserverEvent { StacksChainEvent((StacksChainEvent, PredicateEvaluationReport)), NotifyBitcoinTransactionProxied, PredicateRegistered(ChainhookSpecification), - PredicateDeregistered(ChainhookSpecification), + PredicateDeregistered(String), PredicateEnabled(ChainhookSpecification), BitcoinPredicateTriggered(BitcoinChainhookOccurrencePayload), StacksPredicateTriggered(StacksChainhookOccurrencePayload), @@ -437,35 +437,65 @@ pub fn start_event_observer( let context_cloned = ctx.clone(); let event_observer_config_moved = config.clone(); let observer_commands_tx_moved = observer_commands_tx.clone(); - let _ = hiro_system_kit::thread_named("Chainhook event observer").spawn(move || { - let future = start_bitcoin_event_observer( - event_observer_config_moved, - observer_commands_tx_moved, - observer_commands_rx, - observer_events_tx, - observer_sidecar, - context_cloned, - ); - let _ = hiro_system_kit::nestable_block_on(future); - }); + let _ = hiro_system_kit::thread_named("Chainhook event observer") + .spawn(move || { + let future = start_bitcoin_event_observer( + event_observer_config_moved, + observer_commands_tx_moved, + observer_commands_rx, + observer_events_tx.clone(), + observer_sidecar, + context_cloned.clone(), + ); + match hiro_system_kit::nestable_block_on(future) { + Ok(_) => {} + 
Err(e) => { + if let Some(tx) = observer_events_tx { + context_cloned.try_log(|logger| { + slog::crit!( + logger, + "Chainhook event observer thread failed with error: {e}", + ) + }); + let _ = tx.send(ObserverEvent::Terminate); + } + } + } + }) + .expect("unable to spawn thread"); } BitcoinBlockSignaling::Stacks(ref _url) => { // Start chainhook event observer let context_cloned = ctx.clone(); let event_observer_config_moved = config.clone(); let observer_commands_tx_moved = observer_commands_tx.clone(); - let _ = hiro_system_kit::thread_named("Chainhook event observer").spawn(move || { - let future = start_stacks_event_observer( - event_observer_config_moved, - observer_commands_tx_moved, - observer_commands_rx, - observer_events_tx, - observer_sidecar, - stacks_block_pool_seed, - context_cloned, - ); - let _ = hiro_system_kit::nestable_block_on(future); - }); + let _ = hiro_system_kit::thread_named("Chainhook event observer") + .spawn(move || { + let future = start_stacks_event_observer( + event_observer_config_moved, + observer_commands_tx_moved, + observer_commands_rx, + observer_events_tx.clone(), + observer_sidecar, + stacks_block_pool_seed, + context_cloned.clone(), + ); + match hiro_system_kit::nestable_block_on(future) { + Ok(_) => {} + Err(e) => { + if let Some(tx) = observer_events_tx { + context_cloned.try_log(|logger| { + slog::crit!( + logger, + "Chainhook event observer thread failed with error: {e}", + ) + }); + let _ = tx.send(ObserverEvent::Terminate); + } + } + } + }) + .expect("unable to spawn thread"); ctx.try_log(|logger| { slog::info!( @@ -707,7 +737,7 @@ pub fn gather_proofs<'a>( for transaction in transactions.iter() { if !proofs.contains_key(&transaction.transaction_identifier) { ctx.try_log(|logger| { - slog::info!( + slog::debug!( logger, "Collecting proof for transaction {}", transaction.transaction_identifier.hash @@ -722,7 +752,7 @@ pub fn gather_proofs<'a>( proofs.insert(&transaction.transaction_identifier, proof); } Err(e) => { - ctx.try_log(|logger| slog::error!(logger, "{e}")); + ctx.try_log(|logger| slog::warn!(logger, "{e}")); } } } @@ -758,38 +788,36 @@ pub async fn start_observer_commands_handler( let command = match observer_commands_rx.recv() { Ok(cmd) => cmd, Err(e) => { - if let Some(ref tx) = observer_events_tx { - let _ = tx.send(ObserverEvent::Error(format!("Channel error: {:?}", e))); - } - continue; + ctx.try_log(|logger| { + slog::crit!(logger, "Error: broken channel {}", e.to_string()) + }); + break; } }; match command { ObserverCommand::Terminate => { - ctx.try_log(|logger| slog::info!(logger, "Handling Termination command")); - if let Some(ingestion_shutdown) = ingestion_shutdown { - ingestion_shutdown.notify(); - } - if let Some(ref tx) = observer_events_tx { - let _ = tx.send(ObserverEvent::Info("Terminating event observer".into())); - let _ = tx.send(ObserverEvent::Terminate); - } break; } ObserverCommand::ProcessBitcoinBlock(mut block_data) => { let block_hash = block_data.hash.to_string(); + let mut attempts = 0; + let max_attempts = 10; let block = loop { match standardize_bitcoin_block( block_data.clone(), &config.bitcoin_network, &ctx, ) { - Ok(block) => break block, - Err((e, retry)) => { + Ok(block) => break Some(block), + Err((e, refetch_block)) => { + attempts += 1; + if attempts > max_attempts { + break None; + } ctx.try_log(|logger| { - slog::error!(logger, "Error standardizing block: {}", e) + slog::warn!(logger, "Error standardizing block: {}", e) }); - if retry { + if refetch_block { block_data = match 
download_and_parse_block_with_retry( &http_client, &block_hash, @@ -814,7 +842,16 @@ pub async fn start_observer_commands_handler( } }; }; - + let Some(block) = block else { + ctx.try_log(|logger| { + slog::crit!( + logger, + "Could not process bitcoin block after {} attempts.", + attempts + ) + }); + break; + }; prometheus_monitoring.btc_metrics_ingest_block(block.block_identifier.index); bitcoin_block_store.insert( @@ -1095,10 +1132,16 @@ pub async fn start_observer_commands_handler( )); } for chainhook_to_trigger in chainhooks_to_trigger.into_iter() { + let predicate_uuid = &chainhook_to_trigger.chainhook.uuid; match handle_bitcoin_hook_action(chainhook_to_trigger, &proofs) { Err(e) => { ctx.try_log(|logger| { - slog::error!(logger, "unable to handle action {}", e) + slog::warn!( + logger, + "unable to handle action for predicate {}: {}", + predicate_uuid, + e + ) }); } Ok(BitcoinChainhookOccurrence::Http(request, data)) => { @@ -1106,7 +1149,7 @@ pub async fn start_observer_commands_handler( } Ok(BitcoinChainhookOccurrence::File(_path, _bytes)) => { ctx.try_log(|logger| { - slog::info!(logger, "Writing to disk not supported in server mode") + slog::warn!(logger, "Writing to disk not supported in server mode") }) } Ok(BitcoinChainhookOccurrence::Data(payload)) => { @@ -1125,21 +1168,20 @@ pub async fn start_observer_commands_handler( }); for hook_uuid in hooks_ids_to_deregister.iter() { - if let Some(chainhook) = chainhook_store + if chainhook_store .predicates .deregister_bitcoin_hook(hook_uuid.clone()) + .is_some() { prometheus_monitoring.btc_metrics_deregister_predicate(); - - if let Some(ref tx) = observer_events_tx { - let _ = tx.send(ObserverEvent::PredicateDeregistered( - ChainhookSpecification::Bitcoin(chainhook), - )); - } + } + if let Some(ref tx) = observer_events_tx { + let _ = tx.send(ObserverEvent::PredicateDeregistered(hook_uuid.clone())); } } for (request, data) in requests.into_iter() { + // todo: need to handle failure case - we should be setting interrupted status: https://github.com/hirosystems/chainhook/issues/523 if send_request(request, 3, 1, &ctx).await.is_ok() { if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::BitcoinPredicateTriggered(data)); @@ -1264,10 +1306,16 @@ pub async fn start_observer_commands_handler( } let proofs = HashMap::new(); for chainhook_to_trigger in chainhooks_to_trigger.into_iter() { + let predicate_uuid = &chainhook_to_trigger.chainhook.uuid; match handle_stacks_hook_action(chainhook_to_trigger, &proofs, &ctx) { Err(e) => { ctx.try_log(|logger| { - slog::error!(logger, "unable to handle action {}", e) + slog::warn!( + logger, + "unable to handle action for predicate {}: {}", + predicate_uuid, + e + ) }); } Ok(StacksChainhookOccurrence::Http(request)) => { @@ -1275,7 +1323,7 @@ pub async fn start_observer_commands_handler( } Ok(StacksChainhookOccurrence::File(_path, _bytes)) => { ctx.try_log(|logger| { - slog::info!(logger, "Writing to disk not supported in server mode") + slog::warn!(logger, "Writing to disk not supported in server mode") }) } Ok(StacksChainhookOccurrence::Data(payload)) => { @@ -1287,24 +1335,22 @@ pub async fn start_observer_commands_handler( } for hook_uuid in hooks_ids_to_deregister.iter() { - if let Some(chainhook) = chainhook_store + if chainhook_store .predicates .deregister_stacks_hook(hook_uuid.clone()) + .is_some() { prometheus_monitoring.stx_metrics_deregister_predicate(); - - if let Some(ref tx) = observer_events_tx { - let _ = tx.send(ObserverEvent::PredicateDeregistered( - 
ChainhookSpecification::Stacks(chainhook), - )); - } + } + if let Some(ref tx) = observer_events_tx { + let _ = tx.send(ObserverEvent::PredicateDeregistered(hook_uuid.clone())); } } for request in requests.into_iter() { // todo(lgalabru): collect responses for reporting ctx.try_log(|logger| { - slog::info!( + slog::debug!( logger, "Dispatching request from stacks chainhook {:?}", request @@ -1327,7 +1373,7 @@ pub async fn start_observer_commands_handler( } ObserverCommand::NotifyBitcoinTransactionProxied => { ctx.try_log(|logger| { - slog::info!(logger, "Handling NotifyBitcoinTransactionProxied command") + slog::debug!(logger, "Handling NotifyBitcoinTransactionProxied command") }); if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::NotifyBitcoinTransactionProxied); @@ -1343,14 +1389,13 @@ pub async fn start_observer_commands_handler( Ok(spec) => spec, Err(e) => { ctx.try_log(|logger| { - slog::error!( + slog::warn!( logger, "Unable to register new chainhook spec: {}", e.to_string() ) }); - panic!("Unable to register new chainhook spec: {}", e.to_string()); - //continue; + continue; } }; @@ -1363,11 +1408,15 @@ pub async fn start_observer_commands_handler( } }; - ctx.try_log(|logger| slog::info!(logger, "Registering chainhook {}", spec.uuid(),)); + ctx.try_log( + |logger| slog::debug!(logger, "Registering chainhook {}", spec.uuid(),), + ); if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::PredicateRegistered(spec.clone())); } else { - ctx.try_log(|logger| slog::info!(logger, "Enabling Predicate {}", spec.uuid())); + ctx.try_log(|logger| { + slog::debug!(logger, "Enabling Predicate {}", spec.uuid()) + }); chainhook_store.predicates.enable_specification(&mut spec); } } @@ -1382,15 +1431,19 @@ pub async fn start_observer_commands_handler( ctx.try_log(|logger| { slog::info!(logger, "Handling DeregisterStacksPredicate command") }); - let hook = chainhook_store.predicates.deregister_stacks_hook(hook_uuid); - - prometheus_monitoring.stx_metrics_deregister_predicate(); + let hook = chainhook_store + .predicates + .deregister_stacks_hook(hook_uuid.clone()); - if let (Some(tx), Some(hook)) = (&observer_events_tx, hook) { - let _ = tx.send(ObserverEvent::PredicateDeregistered( - ChainhookSpecification::Stacks(hook), - )); - } + if hook.is_some() { + // on startup, only the predicates in the `chainhook_store` are added to the monitoring count, + // so only those that we find in the store should be removed + prometheus_monitoring.stx_metrics_deregister_predicate(); + }; + // event if the predicate wasn't in the `chainhook_store`, propogate this event to delete from redis + if let Some(tx) = &observer_events_tx { + let _ = tx.send(ObserverEvent::PredicateDeregistered(hook_uuid)); + }; } ObserverCommand::DeregisterBitcoinPredicate(hook_uuid) => { ctx.try_log(|logger| { @@ -1398,15 +1451,17 @@ pub async fn start_observer_commands_handler( }); let hook = chainhook_store .predicates - .deregister_bitcoin_hook(hook_uuid); + .deregister_bitcoin_hook(hook_uuid.clone()); - prometheus_monitoring.btc_metrics_deregister_predicate(); - - if let (Some(tx), Some(hook)) = (&observer_events_tx, hook) { - let _ = tx.send(ObserverEvent::PredicateDeregistered( - ChainhookSpecification::Bitcoin(hook), - )); - } + if hook.is_some() { + // on startup, only the predicates in the `chainhook_store` are added to the monitoring count, + // so only those that we find in the store should be removed + prometheus_monitoring.btc_metrics_deregister_predicate(); + }; + // event if the 
predicate wasn't in the `chainhook_store`, propagate this event to delete from redis + if let Some(tx) = &observer_events_tx { + let _ = tx.send(ObserverEvent::PredicateDeregistered(hook_uuid)); + }; } ObserverCommand::ExpireStacksPredicate(HookExpirationData { hook_uuid, @@ -1430,8 +1485,23 @@ pub async fn start_observer_commands_handler( } } } + terminate(ingestion_shutdown, observer_events_tx, &ctx); Ok(()) } +fn terminate( + ingestion_shutdown: Option<Shutdown>, + observer_events_tx: Option>, + ctx: &Context, +) { + ctx.try_log(|logger| slog::info!(logger, "Handling Termination command")); + if let Some(ingestion_shutdown) = ingestion_shutdown { + ingestion_shutdown.notify(); + } + if let Some(ref tx) = observer_events_tx { + let _ = tx.send(ObserverEvent::Info("Terminating event observer".into())); + let _ = tx.send(ObserverEvent::Terminate); + } +} #[cfg(test)] pub mod tests; diff --git a/components/chainhook-sdk/src/observer/tests/mod.rs b/components/chainhook-sdk/src/observer/tests/mod.rs index 7b678d2ce..053360e10 100644 --- a/components/chainhook-sdk/src/observer/tests/mod.rs +++ b/components/chainhook-sdk/src/observer/tests/mod.rs @@ -498,10 +498,7 @@ fn test_stacks_chainhook_register_deregister() { )); assert!(match observer_events_rx.recv() { Ok(ObserverEvent::PredicateDeregistered(deregistered_chainhook)) => { - assert_eq!( - ChainhookSpecification::Stacks(chainhook), - deregistered_chainhook - ); + assert_eq!(chainhook.uuid, deregistered_chainhook); true } _ => false, @@ -692,7 +689,7 @@ fn test_stacks_chainhook_auto_deregister() { // Should signal that a hook was deregistered assert!(match observer_events_rx.recv() { Ok(ObserverEvent::PredicateDeregistered(deregistered_hook)) => { - assert_eq!(deregistered_hook.uuid(), chainhook.uuid); + assert_eq!(deregistered_hook, chainhook.uuid); true } _ => false, @@ -858,10 +855,7 @@ fn test_bitcoin_chainhook_register_deregister() { )); assert!(match observer_events_rx.recv() { Ok(ObserverEvent::PredicateDeregistered(deregistered_chainhook)) => { - assert_eq!( - ChainhookSpecification::Bitcoin(chainhook), - deregistered_chainhook - ); + assert_eq!(chainhook.uuid, deregistered_chainhook); true } _ => false, @@ -1069,7 +1063,7 @@ fn test_bitcoin_chainhook_auto_deregister() { // Should signal that a hook was deregistered assert!(match observer_events_rx.recv() { Ok(ObserverEvent::PredicateDeregistered(deregistered_hook)) => { - assert_eq!(deregistered_hook.uuid(), chainhook.uuid); + assert_eq!(deregistered_hook, chainhook.uuid); true } _ => false, diff --git a/components/chainhook-sdk/src/utils/mod.rs b/components/chainhook-sdk/src/utils/mod.rs index 7f88e541c..541b6f352 100644 --- a/components/chainhook-sdk/src/utils/mod.rs +++ b/components/chainhook-sdk/src/utils/mod.rs @@ -164,7 +164,7 @@ pub async fn send_request( let err_msg = match request_builder.send().await { Ok(res) => { if res.status().is_success() { - ctx.try_log(|logger| slog::info!(logger, "Trigger {} successful", res.url())); + ctx.try_log(|logger| slog::debug!(logger, "Trigger {} successful", res.url())); return Ok(()); } else { retry += 1;