From 4d5e03143256e2edf51feaf44d8d0102ec1b03e8 Mon Sep 17 00:00:00 2001
From: Micaiah Reid
Date: Thu, 14 Mar 2024 16:42:30 -0400
Subject: [PATCH] fix: seed forking handler with unconfirmed blocks to improve startup stability (#505)

### Description

If Chainhook restarts while a reorg is in progress, it has no context for choosing the canonical fork. When block collisions then occur, Chainhook fails to process the new blocks, causing gaps in the blocks Chainhook has available for evaluation.

This PR seeds the Stacks block indexer with unconfirmed blocks on startup, so that Chainhook has the necessary context to handle a reorg.

Most of the PR adds two sets of tests:
- I've added some new functionality to our very thorough indexer tests:
  - In addition to providing the blocks to be mined and the order to mine them, we now also allow providing some "unconfirmed" blocks to seed the block pool with.
  - I've added some test cases that reproduce what caused outages on the Platform's Chainhook node.
- I've added a new service test that:
  - Verifies that unconfirmed blocks are still stored after a restart.
  - Verifies that those blocks are used to seed the block pool, and that a reorg is handled correctly.

I committed these tests _before_ adding the fix, so you can confirm the fix by checking out the commits with the new tests (96d8249b239e53a877a98ff493ef7ae3571aca37 and 9aad55e2a88bf5b75c3e49d079c5967b7a8cf0e3), seeing that the tests break, then pulling the last commit to see that the fix works.

Fixes #487
---
 components/chainhook-cli/src/cli/mod.rs | 2 +-
 components/chainhook-cli/src/service/mod.rs | 24 +-
 .../src/service/tests/helpers/mock_service.rs | 263 ++++++-
 .../service/tests/helpers/mock_stacks_node.rs | 43 +-
 .../src/service/tests/helpers/mod.rs | 12 +-
 .../chainhook-cli/src/service/tests/mod.rs | 696 ++++++++----------
 .../src/service/tests/observer_tests.rs | 38 +-
 components/chainhook-cli/src/storage/mod.rs | 49 +-
 components/chainhook-sdk/src/indexer/mod.rs | 6 +-
 .../src/indexer/stacks/blocks_pool.rs | 26 +
 .../chainhook-sdk/src/indexer/stacks/tests.rs | 113 +--
 .../indexer/tests/helpers/stacks_shapes.rs | 90 ++-
 .../chainhook-sdk/src/indexer/tests/mod.rs | 12 +-
 components/chainhook-sdk/src/observer/mod.rs | 10 +-
 14 files changed, 851 insertions(+), 533 deletions(-)

diff --git a/components/chainhook-cli/src/cli/mod.rs b/components/chainhook-cli/src/cli/mod.rs index ecb6dc96f..3fa1b1a88 100644 --- a/components/chainhook-cli/src/cli/mod.rs +++ b/components/chainhook-cli/src/cli/mod.rs @@ -310,7 +310,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> { info!(ctx.expect_logger(), "Starting service...",); let mut service = Service::new(config, ctx); - return service.run(predicates).await; + return service.run(predicates, None).await; } }, Command::Config(subcmd) => match subcmd { diff --git a/components/chainhook-cli/src/service/mod.rs b/components/chainhook-cli/src/service/mod.rs index 5c524b568..1d1df8a08 100644 --- a/components/chainhook-cli/src/service/mod.rs +++ b/components/chainhook-cli/src/service/mod.rs @@ -6,7 +6,8 @@ use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv; use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server}; use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop}; use crate::storage::{ - confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, open_readwrite_stacks_db_conn, + confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, 
get_all_unconfirmed_blocks, + open_readonly_stacks_db_conn_with_retry, open_readwrite_stacks_db_conn, }; use chainhook_sdk::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification}; @@ -20,7 +21,7 @@ use chainhook_sdk::types::{Chain, StacksChainEvent}; use chainhook_sdk::utils::Context; use redis::{Commands, Connection}; -use std::sync::mpsc::channel; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::time::{SystemTime, UNIX_EPOCH}; use self::http_api::get_entry_from_predicates_db; @@ -38,6 +39,7 @@ impl Service { pub async fn run( &mut self, predicates_from_startup: Vec, + observer_commands_tx_rx: Option<(Sender, Receiver)>, ) -> Result<(), String> { let mut chainhook_config = ChainhookConfig::new(); @@ -149,7 +151,8 @@ impl Service { } } - let (observer_command_tx, observer_command_rx) = channel(); + let (observer_command_tx, observer_command_rx) = + observer_commands_tx_rx.unwrap_or(channel()); let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded(); // let (ordinal_indexer_command_tx, ordinal_indexer_command_rx) = channel(); @@ -211,6 +214,20 @@ impl Service { }); } + let ctx = self.ctx.clone(); + let stacks_db = + open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, &ctx)?; + let unconfirmed_blocks = match get_all_unconfirmed_blocks(&stacks_db, &ctx) { + Ok(blocks) => Some(blocks), + Err(e) => { + info!( + self.ctx.expect_logger(), + "Failed to get stacks blocks from db to seed block pool: {}", e + ); + None + } + }; + let observer_event_tx_moved = observer_event_tx.clone(); let moved_observer_command_tx = observer_command_tx.clone(); let _ = start_event_observer( @@ -219,6 +236,7 @@ impl Service { observer_command_rx, Some(observer_event_tx_moved), None, + unconfirmed_blocks, self.ctx.clone(), ); diff --git a/components/chainhook-cli/src/service/tests/helpers/mock_service.rs b/components/chainhook-cli/src/service/tests/helpers/mock_service.rs index 890f2ec63..55ae3e859 100644 --- a/components/chainhook-cli/src/service/tests/helpers/mock_service.rs +++ b/components/chainhook-cli/src/service/tests/helpers/mock_service.rs @@ -1,24 +1,21 @@ -use crate::config::Config; -use crate::config::EventSourceConfig; -use crate::config::LimitsConfig; -use crate::config::MonitoringConfig; -use crate::config::PathConfig; -use crate::config::PredicatesApi; -use crate::config::PredicatesApiConfig; -use crate::config::StorageConfig; -use crate::config::DEFAULT_REDIS_URI; -use crate::service::http_api::start_predicate_api_server; -use crate::service::PredicateStatus; -use crate::service::Service; -use chainhook_sdk::chainhooks::types::ChainhookFullSpecification; -use chainhook_sdk::indexer::IndexerConfig; -use chainhook_sdk::observer::ObserverCommand; -use chainhook_sdk::types::BitcoinBlockSignaling; -use chainhook_sdk::types::BitcoinNetwork; -use chainhook_sdk::types::Chain; -use chainhook_sdk::types::StacksNetwork; -use chainhook_sdk::types::StacksNodeConfig; -use chainhook_sdk::utils::Context; +use crate::config::{ + Config, EventSourceConfig, LimitsConfig, MonitoringConfig, PathConfig, PredicatesApi, + PredicatesApiConfig, StorageConfig, DEFAULT_REDIS_URI, +}; +use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv; +use crate::service::{ + http_api::start_predicate_api_server, update_predicate_spec, update_predicate_status, + PredicateStatus, Service, +}; +use chainhook_sdk::{ + chainhooks::types::{ + ChainhookFullSpecification, ChainhookSpecification, StacksChainhookFullSpecification, + }, + indexer::IndexerConfig, + 
observer::ObserverCommand, + types::{BitcoinBlockSignaling, BitcoinNetwork, Chain, StacksNetwork, StacksNodeConfig}, + utils::Context, +}; use redis::Commands; use reqwest::Method; use rocket::serde::json::Value as JsonValue; @@ -26,8 +23,15 @@ use rocket::Shutdown; use std::path::PathBuf; use std::process::Stdio; use std::process::{Child, Command}; +use std::sync::mpsc; use std::sync::mpsc::channel; use std::sync::mpsc::Receiver; +use std::sync::mpsc::Sender; + +use super::get_free_port; +use super::mock_bitcoin_rpc::mock_bitcoin_rpc; +use super::mock_stacks_node::create_tmp_working_dir; +use super::mock_stacks_node::write_stacks_blocks_to_tsv; pub async fn get_predicate_status(uuid: &str, port: u16) -> Result { let mut attempts = 0; @@ -328,14 +332,19 @@ pub fn get_chainhook_config( pub async fn start_chainhook_service( config: Config, - chainhook_port: u16, + ping_startup_port: u16, startup_predicates: Option>, ctx: &Context, -) -> Result<(), String> { +) -> Result, String> { let mut service = Service::new(config, ctx.clone()); + let (observer_command_tx, observer_command_rx) = mpsc::channel(); + let moved_observer_command_tx = observer_command_tx.clone(); let _ = hiro_system_kit::thread_named("Chainhook service") .spawn(move || { - let future = service.run(startup_predicates.unwrap_or(vec![])); + let future = service.run( + startup_predicates.unwrap_or(vec![]), + Some((moved_observer_command_tx, observer_command_rx)), + ); let _ = hiro_system_kit::nestable_block_on(future); }) .map_err(|e| { @@ -354,14 +363,216 @@ pub async fn start_chainhook_service( } if let Ok(_client) = reqwest::Client::new() - .get(format!("http://localhost:{}/ping", chainhook_port)) + .get(format!("http://localhost:{}/ping", ping_startup_port)) .send() .await { - break Ok(()); // Server is ready + break Ok(observer_command_tx); // Server is ready } tokio::time::sleep(std::time::Duration::from_secs(1)).await; attempts += 1; } } + +pub struct TestSetupResult { + pub redis_process: Child, + pub working_dir: String, + pub chainhook_service_port: u16, + pub redis_port: u16, + pub stacks_ingestion_port: u16, + pub stacks_rpc_port: u16, + pub bitcoin_rpc_port: u16, + pub prometheus_port: u16, + pub observer_command_tx: Sender, +} + +pub async fn setup_stacks_chainhook_test( + starting_chain_tip: u64, + redis_seed: Option<(StacksChainhookFullSpecification, PredicateStatus)>, + startup_predicates: Option>, +) -> TestSetupResult { + let ( + redis_port, + chainhook_service_port, + stacks_rpc_port, + stacks_ingestion_port, + bitcoin_rpc_port, + prometheus_port, + ) = setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}")); + + let mut redis_process = start_redis(redis_port) + .await + .unwrap_or_else(|e| panic!("test failed with error: {e}")); + flush_redis(redis_port); + + let logger = hiro_system_kit::log::setup_logger(); + let _guard = hiro_system_kit::log::setup_global_logger(logger.clone()); + let ctx = Context { + logger: Some(logger), + tracer: false, + }; + + if let Some((predicate, status)) = redis_seed { + let client = redis::Client::open(format!("redis://localhost:{redis_port}/")) + .unwrap_or_else(|e| { + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + let mut connection = client.get_connection().unwrap_or_else(|e| { + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + let stacks_spec = predicate + .into_selected_network_specification(&StacksNetwork::Devnet) + 
.unwrap_or_else(|e| { + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + + let spec = ChainhookSpecification::Stacks(stacks_spec); + update_predicate_spec(&spec.key(), &spec, &mut connection, &ctx); + update_predicate_status(&spec.key(), status, &mut connection, &ctx); + } + + let (working_dir, tsv_dir) = create_tmp_working_dir().unwrap_or_else(|e| { + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + + write_stacks_blocks_to_tsv(starting_chain_tip, &tsv_dir).unwrap_or_else(|e| { + std::fs::remove_dir_all(&working_dir).unwrap(); + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + + let mut config = get_chainhook_config( + redis_port, + chainhook_service_port, + stacks_rpc_port, + stacks_ingestion_port, + bitcoin_rpc_port, + &working_dir, + &tsv_dir, + Some(prometheus_port), + ); + + consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx) + .await + .unwrap_or_else(|e| { + std::fs::remove_dir_all(&working_dir).unwrap(); + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + + let observer_command_tx = + start_chainhook_service(config, chainhook_service_port, startup_predicates, &ctx) + .await + .unwrap_or_else(|e| { + std::fs::remove_dir_all(&working_dir).unwrap(); + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + TestSetupResult { + redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port, + stacks_rpc_port, + bitcoin_rpc_port, + prometheus_port, + observer_command_tx, + } +} + +pub async fn setup_bitcoin_chainhook_test(starting_chain_tip: u64) -> TestSetupResult { + let ( + redis_port, + chainhook_service_port, + stacks_rpc_port, + stacks_ingestion_port, + bitcoin_rpc_port, + prometheus_port, + ) = setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}")); + + let mut redis_process = start_redis(redis_port) + .await + .unwrap_or_else(|e| panic!("test failed with error: {e}")); + + flush_redis(redis_port); + let (working_dir, tsv_dir) = create_tmp_working_dir().unwrap_or_else(|e| { + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + + let logger = hiro_system_kit::log::setup_logger(); + let _guard = hiro_system_kit::log::setup_global_logger(logger.clone()); + let ctx = Context { + logger: Some(logger), + tracer: false, + }; + + let _ = hiro_system_kit::thread_named("Bitcoin rpc service") + .spawn(move || { + let future = mock_bitcoin_rpc(bitcoin_rpc_port, starting_chain_tip); + let _ = hiro_system_kit::nestable_block_on(future); + }) + .expect("unable to spawn thread"); + + let config = get_chainhook_config( + redis_port, + chainhook_service_port, + stacks_rpc_port, + stacks_ingestion_port, + bitcoin_rpc_port, + &working_dir, + &tsv_dir, + Some(prometheus_port), + ); + + let terminator_tx = start_chainhook_service(config, chainhook_service_port, None, &ctx) + .await + .unwrap_or_else(|e| { + std::fs::remove_dir_all(&working_dir).unwrap(); + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + TestSetupResult { + redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port, + stacks_rpc_port, + bitcoin_rpc_port, + prometheus_port, + observer_command_tx: terminator_tx, + } +} + +pub fn 
setup_chainhook_service_ports() -> Result<(u16, u16, u16, u16, u16, u16), String> { + let redis_port = get_free_port()?; + let chainhook_service_port = get_free_port()?; + let stacks_rpc_port = get_free_port()?; + let stacks_ingestion_port = get_free_port()?; + let bitcoin_rpc_port = get_free_port()?; + let prometheus_port = get_free_port()?; + Ok(( + redis_port, + chainhook_service_port, + stacks_rpc_port, + stacks_ingestion_port, + bitcoin_rpc_port, + prometheus_port, + )) +} diff --git a/components/chainhook-cli/src/service/tests/helpers/mock_stacks_node.rs b/components/chainhook-cli/src/service/tests/helpers/mock_stacks_node.rs index ef7c32a88..6d03534ef 100644 --- a/components/chainhook-cli/src/service/tests/helpers/mock_stacks_node.rs +++ b/components/chainhook-cli/src/service/tests/helpers/mock_stacks_node.rs @@ -8,7 +8,7 @@ use chainhook_sdk::types::{ STXTransferEventData, SmartContractEventData, StacksTransactionEventPayload, }; -use super::{branch_and_height_to_prefixed_hash, height_to_prefixed_hash}; +use super::{branch_and_height_to_prefixed_hash, make_block_hash}; pub const TEST_WORKING_DIR: &str = "src/service/tests/fixtures/tmp"; @@ -128,12 +128,19 @@ fn create_stacks_new_transaction(index: u64) -> NewTransaction { } } -pub fn create_stacks_new_block(height: u64, burn_block_height: u64) -> NewBlock { - let parent_height = if height == 0 { 0 } else { height - 1 }; - let parent_burn_block_height = if burn_block_height == 0 { - 0 - } else { - burn_block_height - 1 +pub fn create_stacks_new_block( + fork_id: u8, + height: u64, + parent_fork_id: u8, + burn_block_height: u64, +) -> NewBlock { + let parent_height = match height { + 0 => 0, + _ => height - 1, + }; + let parent_burn_block_height = match burn_block_height { + 0 => 0, + _ => burn_block_height - 1, }; let mut events = vec![]; @@ -238,16 +245,16 @@ pub fn create_stacks_new_block(height: u64, burn_block_height: u64) -> NewBlock )); NewBlock { block_height: height, - block_hash: height_to_prefixed_hash(height), - index_block_hash: height_to_prefixed_hash(height), + block_hash: make_block_hash(fork_id, height), + index_block_hash: make_block_hash(fork_id, height), burn_block_height: burn_block_height, - burn_block_hash: height_to_prefixed_hash(burn_block_height), - parent_block_hash: height_to_prefixed_hash(parent_height), - parent_index_block_hash: height_to_prefixed_hash(parent_height), + burn_block_hash: make_block_hash(0, burn_block_height), + parent_block_hash: make_block_hash(parent_fork_id, parent_height), + parent_index_block_hash: make_block_hash(parent_fork_id, parent_height), parent_microblock: "0x0000000000000000000000000000000000000000000000000000000000000000" .into(), parent_microblock_sequence: 0, - parent_burn_block_hash: height_to_prefixed_hash(parent_burn_block_height), + parent_burn_block_hash: make_block_hash(0, parent_burn_block_height), parent_burn_block_height: burn_block_height, parent_burn_block_timestamp: 0, transactions: (0..4).map(|i| create_stacks_new_transaction(i)).collect(), @@ -257,10 +264,12 @@ pub fn create_stacks_new_block(height: u64, burn_block_height: u64) -> NewBlock } fn create_stacks_block_received_record( + fork_id: u8, height: u64, + parent_fork_id: u8, burn_block_height: u64, ) -> Result { - let block = create_stacks_new_block(height, burn_block_height); + let block = create_stacks_new_block(fork_id, height, parent_fork_id, burn_block_height); let serialized_block = serde_json::to_string(&block) .map_err(|e| format!("failed to serialize stacks block: {}", e.to_string()))?; 
Ok(Record { @@ -281,7 +290,7 @@ pub fn write_stacks_blocks_to_tsv(block_count: u64, dir: &str) -> Result<(), Str .expect("unable to create csv writer"); for i in 1..block_count + 1 { writer - .serialize(create_stacks_block_received_record(i, i + 100)?) + .serialize(create_stacks_block_received_record(0, i, 0, i + 100)?) .map_err(|e| format!("failed to write tsv file: {}", e.to_string()))?; } Ok(()) @@ -289,10 +298,12 @@ pub fn write_stacks_blocks_to_tsv(block_count: u64, dir: &str) -> Result<(), Str pub async fn mine_stacks_block( port: u16, + fork_id: u8, height: u64, + parent_fork_id: u8, burn_block_height: u64, ) -> Result<(), String> { - let block = create_stacks_new_block(height, burn_block_height); + let block = create_stacks_new_block(fork_id, height, parent_fork_id, burn_block_height); let serialized_block = serde_json::to_string(&block).unwrap(); let client = reqwest::Client::new(); let _res = client diff --git a/components/chainhook-cli/src/service/tests/helpers/mod.rs b/components/chainhook-cli/src/service/tests/helpers/mod.rs index 76e186edc..0dd9b9c71 100644 --- a/components/chainhook-cli/src/service/tests/helpers/mod.rs +++ b/components/chainhook-cli/src/service/tests/helpers/mod.rs @@ -5,11 +5,13 @@ pub mod mock_bitcoin_rpc; pub mod mock_service; pub mod mock_stacks_node; -pub fn height_to_prefixed_hash(height: u64) -> String { - format!("0x{}", height_to_hash_str(height)) -} -fn height_to_hash_str(height: u64) -> String { - format!("{:0>64}", height.to_string()) +pub fn make_block_hash(fork_id: u8, block_height: u64) -> String { + #![cfg_attr(rustfmt, rustfmt_skip)] + let mut hash = vec![ + fork_id, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ]; + hash.append(&mut block_height.to_be_bytes().to_vec()); + hex::encode(&hash[..]) } pub fn branch_and_height_to_prefixed_hash(branch: Option, height: u64) -> String { diff --git a/components/chainhook-cli/src/service/tests/mod.rs b/components/chainhook-cli/src/service/tests/mod.rs index 5a9853774..8d44c4f88 100644 --- a/components/chainhook-cli/src/service/tests/mod.rs +++ b/components/chainhook-cli/src/service/tests/mod.rs @@ -1,7 +1,5 @@ -use chainhook_sdk::chainhooks::types::{ - ChainhookFullSpecification, ChainhookSpecification, StacksChainhookFullSpecification, -}; -use chainhook_sdk::types::{Chain, StacksNetwork}; +use chainhook_sdk::chainhooks::types::ChainhookFullSpecification; +use chainhook_sdk::types::Chain; use chainhook_sdk::utils::Context; use rocket::serde::json::Value as JsonValue; use rocket::Shutdown; @@ -16,26 +14,23 @@ use test_case::test_case; use chainhook_sdk::observer::ObserverCommand; use self::helpers::build_predicates::{build_bitcoin_payload, build_stacks_payload, DEFAULT_UUID}; -use self::helpers::mock_bitcoin_rpc::mock_bitcoin_rpc; use self::helpers::mock_service::{ call_deregister_predicate, filter_predicate_status_from_all_predicates, flush_redis, - start_chainhook_service, start_redis, -}; -use self::helpers::mock_stacks_node::{ - create_tmp_working_dir, mine_burn_block, mine_stacks_block, write_stacks_blocks_to_tsv, + start_chainhook_service, }; -use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv; +use self::helpers::mock_stacks_node::{mine_burn_block, mine_stacks_block}; +use crate::config::PredicatesApi; use crate::service::tests::helpers::build_predicates::get_random_uuid; -use crate::service::tests::helpers::get_free_port; use crate::service::tests::helpers::mock_service::{ - build_predicate_api_server, call_get_predicate, call_register_predicate, 
get_chainhook_config, - get_predicate_status, + build_predicate_api_server, call_get_predicate, call_ping, call_register_predicate, + get_chainhook_config, get_predicate_status, setup_bitcoin_chainhook_test, + setup_stacks_chainhook_test, TestSetupResult, }; use crate::service::tests::helpers::mock_stacks_node::create_burn_fork_at; use crate::service::{PredicateStatus, PredicateStatus::*, ScanningData, StreamingData}; +use crate::storage::{get_all_unconfirmed_blocks, open_readonly_stacks_db_conn}; use super::http_api::document_predicate_api_server; -use super::{update_predicate_spec, update_predicate_status}; pub mod helpers; mod observer_tests; @@ -346,23 +341,6 @@ fn _assert_interrupted_status((status, _, _): (PredicateStatus, Option, Opt } } -fn setup_chainhook_service_ports() -> Result<(u16, u16, u16, u16, u16, u16), String> { - let redis_port = get_free_port()?; - let chainhook_service_port = get_free_port()?; - let stacks_rpc_port = get_free_port()?; - let stacks_ingestion_port = get_free_port()?; - let bitcoin_rpc_port = get_free_port()?; - let prometheus_port = get_free_port()?; - Ok(( - redis_port, - chainhook_service_port, - stacks_rpc_port, - stacks_ingestion_port, - bitcoin_rpc_port, - prometheus_port, - )) -} - async fn await_new_scanning_status_complete( uuid: &str, chainhook_service_port: u16, @@ -381,110 +359,6 @@ async fn await_new_scanning_status_complete( } } } - -async fn setup_stacks_chainhook_test( - starting_chain_tip: u64, - redis_seed: Option<(StacksChainhookFullSpecification, PredicateStatus)>, - startup_predicates: Option>, -) -> (Child, String, u16, u16, u16, u16, u16) { - let ( - redis_port, - chainhook_service_port, - stacks_rpc_port, - stacks_ingestion_port, - bitcoin_rpc_port, - prometheus_port, - ) = setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}")); - - let mut redis_process = start_redis(redis_port) - .await - .unwrap_or_else(|e| panic!("test failed with error: {e}")); - flush_redis(redis_port); - - let logger = hiro_system_kit::log::setup_logger(); - let _guard = hiro_system_kit::log::setup_global_logger(logger.clone()); - let ctx = Context { - logger: Some(logger), - tracer: false, - }; - - if let Some((predicate, status)) = redis_seed { - let client = redis::Client::open(format!("redis://localhost:{redis_port}/")) - .unwrap_or_else(|e| { - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - let mut connection = client.get_connection().unwrap_or_else(|e| { - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - let stacks_spec = predicate - .into_selected_network_specification(&StacksNetwork::Devnet) - .unwrap_or_else(|e| { - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - - let spec = ChainhookSpecification::Stacks(stacks_spec); - update_predicate_spec(&spec.key(), &spec, &mut connection, &ctx); - update_predicate_status(&spec.key(), status, &mut connection, &ctx); - } - - let (working_dir, tsv_dir) = create_tmp_working_dir().unwrap_or_else(|e| { - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - - write_stacks_blocks_to_tsv(starting_chain_tip, &tsv_dir).unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - - let mut config = get_chainhook_config( - redis_port, - 
chainhook_service_port, - stacks_rpc_port, - stacks_ingestion_port, - bitcoin_rpc_port, - &working_dir, - &tsv_dir, - Some(prometheus_port), - ); - - consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx) - .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - - start_chainhook_service(config, chainhook_service_port, startup_predicates, &ctx) - .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - ( - redis_process, - working_dir, - chainhook_service_port, - redis_port, - stacks_ingestion_port, - bitcoin_rpc_port, - prometheus_port, - ) -} - #[test_case(5, 0, Some(1), Some(3), Some(3), Some(3) => using assert_confirmed_expiration_status; "predicate_end_block lower than starting_chain_tip ends with ConfirmedExpiration status")] #[test_case(5, 0, Some(1), None, Some(5), Some(5) => using assert_streaming_status; "no predicate_end_block ends with Streaming status")] #[test_case(3, 0, Some(1), Some(5), Some(3), Some(3) => using assert_streaming_status; "predicate_end_block greater than chain_tip ends with Streaming status")] @@ -502,15 +376,17 @@ async fn test_stacks_predicate_status_is_updated( expected_evaluations: Option, expected_occurrences: Option, ) -> (PredicateStatus, Option, Option) { - let ( + let TestSetupResult { mut redis_process, working_dir, chainhook_service_port, redis_port, stacks_ingestion_port, - _, - _, - ) = setup_stacks_chainhook_test(starting_chain_tip, None, None).await; + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port: _, + observer_command_tx: _, + } = setup_stacks_chainhook_test(starting_chain_tip, None, None).await; let uuid = &get_random_uuid(); let predicate = build_stacks_payload( @@ -522,130 +398,43 @@ async fn test_stacks_predicate_status_is_updated( ); let _ = call_register_predicate(&predicate, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); await_new_scanning_status_complete(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); for i in 1..blocks_to_mine + 1 { mine_stacks_block( stacks_ingestion_port, + 0, i + starting_chain_tip, + 0, i + starting_chain_tip + 100, ) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); } sleep(Duration::new(2, 0)); let result = get_predicate_status(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); let found_predicate_status = 
filter_predicate_status_from_all_predicates(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - assert_eq!(found_predicate_status, result); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); + assert_eq!(found_predicate_status, result); (result, expected_evaluations, expected_occurrences) } -async fn setup_bitcoin_chainhook_test( - starting_chain_tip: u64, -) -> (Child, String, u16, u16, u16, u16, u16) { - let ( - redis_port, - chainhook_service_port, - stacks_rpc_port, - stacks_ingestion_port, - bitcoin_rpc_port, - prometheus_port, - ) = setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}")); - - let mut redis_process = start_redis(redis_port) - .await - .unwrap_or_else(|e| panic!("test failed with error: {e}")); - - flush_redis(redis_port); - let (working_dir, tsv_dir) = create_tmp_working_dir().unwrap_or_else(|e| { - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - - let logger = hiro_system_kit::log::setup_logger(); - let _guard = hiro_system_kit::log::setup_global_logger(logger.clone()); - let ctx = Context { - logger: Some(logger), - tracer: false, - }; - - let _ = hiro_system_kit::thread_named("Bitcoin rpc service") - .spawn(move || { - let future = mock_bitcoin_rpc(bitcoin_rpc_port, starting_chain_tip); - let _ = hiro_system_kit::nestable_block_on(future); - }) - .expect("unable to spawn thread"); - - let config = get_chainhook_config( - redis_port, - chainhook_service_port, - stacks_rpc_port, - stacks_ingestion_port, - bitcoin_rpc_port, - &working_dir, - &tsv_dir, - Some(prometheus_port), - ); - - start_chainhook_service(config, chainhook_service_port, None, &ctx) - .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - ( - redis_process, - working_dir, - chainhook_service_port, - redis_port, - stacks_ingestion_port, - bitcoin_rpc_port, - prometheus_port, - ) -} - #[test_case(5, 1, Some(1), Some(3), Some(3), Some(3) => using assert_unconfirmed_expiration_status; "predicate_end_block lower than starting_chain_tip with predicate_end_block confirmations < CONFIRMED_SEGMENT_MINIMUM_LENGTH ends with UnconfirmedExpiration status")] #[test_case(10, 1, Some(1), Some(3), Some(3), Some(3) => using assert_confirmed_expiration_status; "predicate_end_block lower than starting_chain_tip with predicate_end_block confirmations >= CONFIRMED_SEGMENT_MINIMUM_LENGTH ends with ConfirmedExpiration status")] #[test_case(1, 3, Some(1), Some(3), Some(4), Some(3) => using assert_unconfirmed_expiration_status; "predicate_end_block greater than starting_chain_tip and mining blocks so that predicate_end_block confirmations < CONFIRMED_SEGMENT_MINIMUM_LENGTH ends with UnconfirmedExpiration status")] @@ -661,15 +450,17 @@ async fn test_bitcoin_predicate_status_is_updated( expected_evaluations: Option, expected_occurrences: Option, ) -> (PredicateStatus, Option, Option) { - let ( + let TestSetupResult { mut redis_process, working_dir, chainhook_service_port, redis_port, stacks_ingestion_port, + stacks_rpc_port: _, 
bitcoin_rpc_port, - _, - ) = setup_bitcoin_chainhook_test(starting_chain_tip).await; + prometheus_port: _, + observer_command_tx: _, + } = setup_bitcoin_chainhook_test(starting_chain_tip).await; let uuid = &get_random_uuid(); let predicate = build_bitcoin_payload( @@ -684,21 +475,13 @@ async fn test_bitcoin_predicate_status_is_updated( let _ = call_register_predicate(&predicate, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); await_new_scanning_status_complete(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); for i in 1..blocks_to_mine + 1 { mine_burn_block( @@ -708,36 +491,22 @@ async fn test_bitcoin_predicate_status_is_updated( i + starting_chain_tip, ) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); } sleep(Duration::new(2, 0)); let result = get_predicate_status(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); let found_predicate_status = filter_predicate_status_from_all_predicates(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - assert_eq!(found_predicate_status, result); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); + assert_eq!(found_predicate_status, result); (result, expected_evaluations, expected_occurrences) } @@ -758,17 +527,19 @@ async fn test_bitcoin_predicate_status_is_updated_with_reorg( fork_blocks_to_mine: u64, predicate_start_block: Option, predicate_end_block: Option, -) { +) -> Result<(), String> { let starting_chain_tip = 0; - let ( + let TestSetupResult { mut redis_process, working_dir, chainhook_service_port, redis_port, stacks_ingestion_port, + stacks_rpc_port: _, bitcoin_rpc_port, - _, - ) = setup_bitcoin_chainhook_test(starting_chain_tip).await; + prometheus_port: _, + observer_command_tx: _, + } = setup_bitcoin_chainhook_test(starting_chain_tip).await; let uuid = &get_random_uuid(); let predicate = build_bitcoin_payload( @@ -783,12 +554,7 @@ async fn test_bitcoin_predicate_status_is_updated_with_reorg( let _ = call_register_predicate(&predicate, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, 
redis_port, &mut redis_process))?; let genesis_branch_key = '0'; let first_block_mined_height = starting_chain_tip + 1; @@ -801,23 +567,13 @@ async fn test_bitcoin_predicate_status_is_updated_with_reorg( block_height, ) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; } sleep(Duration::new(2, 0)); let status = get_predicate_status(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; assert_streaming_status((status, None, None)); let branch_key = '1'; @@ -831,12 +587,7 @@ async fn test_bitcoin_predicate_status_is_updated_with_reorg( fork_point, ) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; let reorg_point = last_block_mined_height + 1; let first_fork_block_mined_height = first_fork_block_mined_height + 1; @@ -850,22 +601,12 @@ async fn test_bitcoin_predicate_status_is_updated_with_reorg( block_height, ) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; if block_height == reorg_point { sleep(Duration::new(2, 0)); let status = get_predicate_status(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; assert_streaming_status((status, None, None)); } } @@ -873,27 +614,29 @@ async fn test_bitcoin_predicate_status_is_updated_with_reorg( sleep(Duration::new(2, 0)); let status = get_predicate_status(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; + cleanup(&working_dir, redis_port, &mut redis_process); assert_confirmed_expiration_status((status, None, None)); - - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + Ok(()) } #[test_case(Chain::Stacks; "for stacks chain")] #[test_case(Chain::Bitcoin; "for bitcoin chain")] #[tokio::test] #[cfg_attr(not(feature = "redis_tests"), ignore)] -async fn test_deregister_predicate(chain: Chain) { - let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, _) = match &chain - { +async fn test_deregister_predicate(chain: Chain) -> Result<(), String> { + let TestSetupResult { + mut redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port: _, + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port: _, + observer_command_tx: _, + } = match &chain { Chain::Stacks => 
setup_stacks_chainhook_test(3, None, None).await, Chain::Bitcoin => setup_bitcoin_chainhook_test(3).await, }; @@ -919,49 +662,27 @@ async fn test_deregister_predicate(chain: Chain) { let _ = call_register_predicate(&predicate, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; let result = call_get_predicate(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; assert_eq!(result.get("status"), Some(&json!(200))); let result = call_deregister_predicate(&chain, uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; assert_eq!(result.get("status"), Some(&json!(200))); let mut attempts = 0; loop { let result = call_get_predicate(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; if result.get("status") == Some(&json!(404)) { break; } else if attempts == 3 { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); panic!("predicate was not successfully derigistered"); } else { attempts += 1; @@ -969,9 +690,8 @@ async fn test_deregister_predicate(chain: Chain) { } } - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); + Ok(()) } #[test_case(New, 6 => using assert_confirmed_expiration_status; "preloaded predicate with new status should get scanned until completion")] @@ -1013,38 +733,37 @@ async fn test_restarting_with_saved_predicates( let predicate = serde_json::from_value(predicate).expect("failed to set up stacks chanhook spec for test"); - let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, _) = - setup_stacks_chainhook_test(starting_chain_tip, Some((predicate, starting_status)), None) - .await; + let TestSetupResult { + mut redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port: _, + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port: _, + observer_command_tx: _, + } = setup_stacks_chainhook_test(starting_chain_tip, Some((predicate, starting_status)), None) + .await; await_new_scanning_status_complete(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); sleep(Duration::new(2, 0)); let result = get_predicate_status(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - 
std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); (result, None, None) } #[tokio::test] #[cfg_attr(not(feature = "redis_tests"), ignore)] -async fn it_allows_specifying_startup_predicate() { +async fn it_allows_specifying_startup_predicate() -> Result<(), String> { let uuid = &get_random_uuid(); let predicate = build_stacks_payload( Some("devnet"), @@ -1056,37 +775,35 @@ async fn it_allows_specifying_startup_predicate() { let predicate = serde_json::from_value(predicate).expect("failed to set up stacks chanhook spec for test"); let startup_predicate = ChainhookFullSpecification::Stacks(predicate); - let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, _) = - setup_stacks_chainhook_test(3, None, Some(vec![startup_predicate])).await; + let TestSetupResult { + mut redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port: _, + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port: _, + observer_command_tx: _, + } = setup_stacks_chainhook_test(3, None, Some(vec![startup_predicate])).await; await_new_scanning_status_complete(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; sleep(Duration::new(2, 0)); let result = get_predicate_status(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); assert_confirmed_expiration_status((result, None, None)); + Ok(()) } #[tokio::test] #[cfg_attr(not(feature = "redis_tests"), ignore)] -async fn register_predicate_responds_409_if_uuid_in_use() { +async fn register_predicate_responds_409_if_uuid_in_use() -> Result<(), String> { let uuid = &get_random_uuid(); let predicate = build_stacks_payload( Some("devnet"), @@ -1099,22 +816,25 @@ async fn register_predicate_responds_409_if_uuid_in_use() { .expect("failed to set up stacks chanhook spec for test"); let startup_predicate = ChainhookFullSpecification::Stacks(stacks_spec); - let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, _) = - setup_stacks_chainhook_test(3, None, Some(vec![startup_predicate])).await; + let TestSetupResult { + mut redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port: _, + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port: _, + observer_command_tx: _, + } = setup_stacks_chainhook_test(3, None, Some(vec![startup_predicate])).await; let result = call_register_predicate(&predicate, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - 
panic!("test failed with error: {e}"); - }); - assert_eq!(result.get("status"), Some(&json!(409))); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); + assert_eq!(result.get("status"), Some(&json!(409))); + Ok(()) } #[test] @@ -1130,3 +850,161 @@ fn it_generates_open_api_spec() { "breaking change detected: open api spec has been updated" ) } + +#[tokio::test] +#[cfg_attr(not(feature = "redis_tests"), ignore)] +async fn it_seeds_block_pool_on_startup() -> Result<(), String> { + let starting_chain_tip = 3; + let TestSetupResult { + mut redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port, + stacks_rpc_port, + bitcoin_rpc_port, + prometheus_port: _, + observer_command_tx, + } = setup_stacks_chainhook_test(starting_chain_tip, None, None).await; + + let blocks_to_mine = 4; + for i in 1..blocks_to_mine + 1 { + mine_stacks_block( + stacks_ingestion_port, + 0, + i + starting_chain_tip, + 0, + i + starting_chain_tip + 100, + ) + .await + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; + } + // we need these blocks to propagate through new stacks block events and save to the db, so give it some time + sleep(Duration::new(1, 0)); + + let logger = hiro_system_kit::log::setup_logger(); + let _guard = hiro_system_kit::log::setup_global_logger(logger.clone()); + let ctx = Context { + logger: Some(logger), + tracer: false, + }; + let db_path = { + let mut destination_path = PathBuf::new(); + destination_path.push(&working_dir); + destination_path + }; + let stacks_db = open_readonly_stacks_db_conn(&db_path, &ctx).expect("unable to read stacks_db"); + // validate that all blocks we just mined are saved as unconfirmed blocks in the database + let unconfirmed_blocks = get_all_unconfirmed_blocks(&stacks_db, &ctx) + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; + let mut unconfirmed_height = starting_chain_tip + 1; + assert_eq!( + blocks_to_mine, + unconfirmed_blocks.len() as u64, + "Number of blocks left unconfirmed in db is not what expected. Expected: {}, Actual: {}", + blocks_to_mine, + unconfirmed_blocks.len() + ); + for block in unconfirmed_blocks.iter() { + assert_eq!( + unconfirmed_height, block.block_identifier.index, + "Unexpected unconfirmed block height. Expected: {}, Actual: {}", + unconfirmed_height, block.block_identifier.index + ); + unconfirmed_height += 1; + } + // terminate chainhook service + let _ = observer_command_tx.send(ObserverCommand::Terminate); + sleep(Duration::new(1, 0)); + let tsv_dir = format!("./{working_dir}/stacks_blocks.tsv"); + let mut config = get_chainhook_config( + redis_port, + chainhook_service_port, + stacks_rpc_port, + stacks_ingestion_port, + bitcoin_rpc_port, + &working_dir, + &tsv_dir, + None, + ); + // the API is still running, so don't restart it + config.http_api = PredicatesApi::Off; + let _ = start_chainhook_service(config, stacks_ingestion_port, None, &ctx).await; + // validate that all of the unconfirmed blocks we just saved are still available after a restart + let unconfirmed_blocks = get_all_unconfirmed_blocks(&stacks_db, &ctx).unwrap(); + let mut unconfirmed_height = starting_chain_tip + 1; + assert_eq!( + blocks_to_mine, + unconfirmed_blocks.len() as u64, + "Number of blocks left unconfirmed in db is not what expected. 
Expected: {}, Actual: {}", + blocks_to_mine, + unconfirmed_blocks.len() + ); + for block in unconfirmed_blocks.iter() { + assert_eq!( + unconfirmed_height, block.block_identifier.index, + "Unexpected unconfirmed block height. Expected: {}, Actual: {}", + unconfirmed_height, block.block_identifier.index + ); + unconfirmed_height += 1; + } + // mine a block on that same fork + let next_block_height = blocks_to_mine + starting_chain_tip + 1; + mine_stacks_block( + stacks_ingestion_port, + 0, + next_block_height, + 0, + next_block_height + 100, + ) + .await + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; + + // mine the same block number we just mined, but on a different fork + mine_stacks_block( + stacks_ingestion_port, + 1, + next_block_height, + 0, + next_block_height + 100, + ) + .await + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; + + sleep(Duration::new(1, 0)); + // confirm that there was a reorg + let metrics = call_ping(stacks_ingestion_port) + .await + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; + let stacks_last_reorg_data = metrics.get("stacks").unwrap().get("last_reorg").unwrap(); + let applied_blocks = stacks_last_reorg_data + .get("applied_blocks") + .unwrap() + .as_u64() + .unwrap(); + let rolled_back_blocks = stacks_last_reorg_data + .get("rolled_back_blocks") + .unwrap() + .as_u64() + .unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); + assert_eq!(applied_blocks, 1); + assert_eq!(rolled_back_blocks, 1); + Ok(()) +} + +fn cleanup_err( + error: String, + working_dir: &str, + redis_port: u16, + redis_process: &mut Child, +) -> String { + cleanup(working_dir, redis_port, redis_process); + format!("test failed with error: {error}") +} + +fn cleanup(working_dir: &str, redis_port: u16, redis_process: &mut Child) { + std::fs::remove_dir_all(&working_dir).unwrap(); + flush_redis(redis_port); + redis_process.kill().unwrap(); +} diff --git a/components/chainhook-cli/src/service/tests/observer_tests.rs b/components/chainhook-cli/src/service/tests/observer_tests.rs index f094febd1..ae00f3cc2 100644 --- a/components/chainhook-cli/src/service/tests/observer_tests.rs +++ b/components/chainhook-cli/src/service/tests/observer_tests.rs @@ -14,6 +14,7 @@ use crate::service::tests::{ build_predicates::build_stacks_payload, mock_service::{ call_observer_svc, call_ping, call_prometheus, call_register_predicate, flush_redis, + TestSetupResult, }, }, setup_bitcoin_chainhook_test, setup_stacks_chainhook_test, @@ -26,15 +27,17 @@ use super::helpers::{ #[tokio::test] #[cfg_attr(not(feature = "redis_tests"), ignore)] async fn ping_endpoint_returns_metrics() { - let ( + let TestSetupResult { mut redis_process, working_dir, chainhook_service_port, redis_port, stacks_ingestion_port, - _, - _, - ) = setup_stacks_chainhook_test(1, None, None).await; + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port: _, + observer_command_tx: _, + } = setup_stacks_chainhook_test(1, None, None).await; let uuid = &get_random_uuid(); let predicate = build_stacks_payload(Some("devnet"), None, None, None, Some(uuid)); @@ -68,8 +71,17 @@ async fn ping_endpoint_returns_metrics() { #[tokio::test] #[cfg_attr(not(feature = "redis_tests"), ignore)] async fn prometheus_endpoint_returns_encoded_metrics() { - let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, prometheus_port) = - setup_stacks_chainhook_test(1, None, None).await; + let TestSetupResult { + mut redis_process, + working_dir, + 
chainhook_service_port, + redis_port, + stacks_ingestion_port: _, + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port, + observer_command_tx: _, + } = setup_stacks_chainhook_test(1, None, None).await; let uuid = &get_random_uuid(); let predicate = build_stacks_payload(Some("devnet"), None, None, None, Some(uuid)); @@ -128,8 +140,17 @@ async fn await_observer_started(port: u16) { #[tokio::test] #[cfg_attr(not(feature = "redis_tests"), ignore)] async fn bitcoin_rpc_requests_are_forwarded(endpoint: &str, body: Value) { - let (mut redis_process, working_dir, _, redis_port, stacks_ingestion_port, _, _) = - setup_bitcoin_chainhook_test(1).await; + let TestSetupResult { + mut redis_process, + working_dir, + chainhook_service_port: _, + redis_port, + stacks_ingestion_port, + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port: _, + observer_command_tx: _, + } = setup_bitcoin_chainhook_test(1).await; await_observer_started(stacks_ingestion_port).await; @@ -158,6 +179,7 @@ async fn start_and_ping_event_observer(config: EventObserverConfig, ingestion_po observer_commands_rx, None, None, + None, ctx, ) .unwrap(); diff --git a/components/chainhook-cli/src/storage/mod.rs b/components/chainhook-cli/src/storage/mod.rs index c403a83cb..dc6a945d3 100644 --- a/components/chainhook-cli/src/storage/mod.rs +++ b/components/chainhook-cli/src/storage/mod.rs @@ -2,7 +2,13 @@ use std::path::PathBuf; use chainhook_sdk::types::{BlockIdentifier, StacksBlockData, StacksBlockUpdate}; use chainhook_sdk::utils::Context; -use rocksdb::{Options, DB}; +use rocksdb::{Direction, IteratorMode, Options, DB}; + +const UNCONFIRMED_KEY_PREFIX: &[u8; 2] = b"~:"; +const CONFIRMED_KEY_PREFIX: &[u8; 2] = b"b:"; +const KEY_SUFFIX: &[u8; 2] = b":d"; +const LAST_UNCONFIRMED_KEY_PREFIX: &[u8; 3] = b"m:~"; +const LAST_CONFIRMED_KEY_PREFIX: &[u8; 3] = b"m:t"; fn get_db_default_options() -> Options { let mut opts = Options::default(); @@ -87,26 +93,26 @@ pub fn open_readwrite_stacks_db_conn(base_dir: &PathBuf, _ctx: &Context) -> Resu fn get_block_key(block_identifier: &BlockIdentifier) -> [u8; 12] { let mut key = [0u8; 12]; - key[..2].copy_from_slice(b"b:"); + key[..2].copy_from_slice(CONFIRMED_KEY_PREFIX); key[2..10].copy_from_slice(&block_identifier.index.to_be_bytes()); - key[10..].copy_from_slice(b":d"); + key[10..].copy_from_slice(KEY_SUFFIX); key } fn get_unconfirmed_block_key(block_identifier: &BlockIdentifier) -> [u8; 12] { let mut key = [0u8; 12]; - key[..2].copy_from_slice(b"~:"); + key[..2].copy_from_slice(UNCONFIRMED_KEY_PREFIX); key[2..10].copy_from_slice(&block_identifier.index.to_be_bytes()); - key[10..].copy_from_slice(b":d"); + key[10..].copy_from_slice(KEY_SUFFIX); key } fn get_last_confirmed_insert_key() -> [u8; 3] { - *b"m:t" + *LAST_CONFIRMED_KEY_PREFIX } fn get_last_unconfirmed_insert_key() -> [u8; 3] { - *b"m:~" + *LAST_UNCONFIRMED_KEY_PREFIX } pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB, _ctx: &Context) { @@ -164,6 +170,35 @@ pub fn get_last_unconfirmed_block_height_inserted(stacks_db: &DB, _ctx: &Context }) } +pub fn get_all_unconfirmed_blocks( + stacks_db: &DB, + _ctx: &Context, +) -> Result, String> { + let unconfirmed_key_prefix = UNCONFIRMED_KEY_PREFIX; + let mut blocks = vec![]; + let iter = stacks_db.iterator(IteratorMode::From( + unconfirmed_key_prefix, + Direction::Forward, + )); + for item in iter { + match item { + Ok((k, v)) => { + if k.starts_with(unconfirmed_key_prefix) { + let spec: StacksBlockData = serde_json::from_slice(&v[..]).map_err(|e| { + 
format!("unable to deserialize Stacks block {}", e.to_string()) + })?; + blocks.push(spec); + } else { + // we're past the set of keys we're looking for, so we've found all unconfirmed + return Ok(blocks); + } + } + Err(e) => return Err(format!("failed to get all unconfirmed blocks: {e}")), + }; + } + Ok(blocks) +} + pub fn get_last_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option { stacks_db .get(get_last_confirmed_insert_key()) diff --git a/components/chainhook-sdk/src/indexer/mod.rs b/components/chainhook-sdk/src/indexer/mod.rs index fbf580b20..9b0c97e92 100644 --- a/components/chainhook-sdk/src/indexer/mod.rs +++ b/components/chainhook-sdk/src/indexer/mod.rs @@ -6,7 +6,7 @@ use crate::utils::{AbstractBlock, Context}; use chainhook_types::{ BitcoinBlockSignaling, BitcoinNetwork, BlockHeader, BlockIdentifier, BlockchainEvent, - StacksChainEvent, StacksNetwork, StacksNodeConfig, + StacksBlockData, StacksChainEvent, StacksNetwork, StacksNodeConfig, }; use hiro_system_kit::slog; use rocket::serde::json::Value as JsonValue; @@ -92,6 +92,10 @@ impl Indexer { } } + pub fn seed_stacks_block_pool(&mut self, blocks: Vec, ctx: &Context) { + self.stacks_blocks_pool.seed_block_pool(blocks, ctx); + } + pub fn handle_bitcoin_header( &mut self, header: BlockHeader, diff --git a/components/chainhook-sdk/src/indexer/stacks/blocks_pool.rs b/components/chainhook-sdk/src/indexer/stacks/blocks_pool.rs index 4e52c7bfb..1d4ee9001 100644 --- a/components/chainhook-sdk/src/indexer/stacks/blocks_pool.rs +++ b/components/chainhook-sdk/src/indexer/stacks/blocks_pool.rs @@ -43,6 +43,32 @@ impl StacksBlockPool { } } + pub fn seed_block_pool(&mut self, blocks: Vec, ctx: &Context) { + ctx.try_log(|logger| { + slog::info!(logger, "Seeding block pool with {} blocks", blocks.len()) + }); + for block in blocks { + let existing_entry = self.block_store.get(&block.block_identifier.clone()); + if existing_entry.is_some() { + ctx.try_log(|logger| { + slog::info!( + logger, + "Seeding block pool: Stacks {} has already been processed; skipping", + block.block_identifier + ) + }); + continue; + } + + match self.process_block(block, ctx) { + Ok(_) => {} + Err(e) => { + ctx.try_log(|logger| slog::info!(logger, "Error seeding block pool: {}", e)); + } + } + } + } + pub fn process_block( &mut self, block: StacksBlockData, diff --git a/components/chainhook-sdk/src/indexer/stacks/tests.rs b/components/chainhook-sdk/src/indexer/stacks/tests.rs index 94f7eb8d7..d4d78f227 100644 --- a/components/chainhook-sdk/src/indexer/stacks/tests.rs +++ b/components/chainhook-sdk/src/indexer/stacks/tests.rs @@ -15,262 +15,271 @@ use test_case::test_case; #[test] fn test_stacks_vector_001() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_001()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_001(), None)); } #[test] fn test_stacks_vector_002() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_002()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_002(), None)); } #[test] fn test_stacks_vector_003() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_003()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_003(), None)); } #[test] fn test_stacks_vector_004() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_004()); + 
process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_004(), None)); } #[test] fn test_stacks_vector_005() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_005()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_005(), None)); } #[test] fn test_stacks_vector_006() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_006()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_006(), None)); } #[test] fn test_stacks_vector_007() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_007()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_007(), None)); } #[test] fn test_stacks_vector_008() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_008()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_008(), None)); } #[test] fn test_stacks_vector_009() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_009()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_009(), None)); } #[test] fn test_stacks_vector_010() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_010()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_010(), None)); } #[test] fn test_stacks_vector_011() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_011()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_011(), None)); } #[test] fn test_stacks_vector_012() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_012()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_012(), None)); } #[test] fn test_stacks_vector_013() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_013()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_013(), None)); } #[test] fn test_stacks_vector_014() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_014()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_014(), None)); } #[test] fn test_stacks_vector_015() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_015()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_015(), None)); } #[test] fn test_stacks_vector_016() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_016()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_016(), None)); } #[test] fn test_stacks_vector_017() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_017()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_017(), None)); } #[test] fn test_stacks_vector_018() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_018()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_018(), None)); } #[test] fn test_stacks_vector_019() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_019()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_019(), None)); } #[test] fn test_stacks_vector_020() 
{ - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_020()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_020(), None)); } #[test] fn test_stacks_vector_021() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_021()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_021(), None)); } #[test] fn test_stacks_vector_022() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_022()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_022(), None)); } #[test] fn test_stacks_vector_023() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_023()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_023(), None)); } #[test] fn test_stacks_vector_024() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_024()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_024(), None)); } #[test] fn test_stacks_vector_025() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_025()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_025(), None)); } #[test] fn test_stacks_vector_026() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_026()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_026(), None)); } #[test] fn test_stacks_vector_027() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_027()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_027(), None)); } #[test] fn test_stacks_vector_028() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_028()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_028(), None)); } #[test] fn test_stacks_vector_029() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_029()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_029(), None)); } #[test] fn test_stacks_vector_030() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_030()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_030(), None)); } #[test] fn test_stacks_vector_031() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_031()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_031(), None)); } #[test] fn test_stacks_vector_032() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_032()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_032(), None)); } #[test] fn test_stacks_vector_033() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_033()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_033(), None)); } #[test] fn test_stacks_vector_034() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_034()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_034(), None)); } #[test] fn test_stacks_vector_035() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_035()); + 
process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_035(), None)); } #[test] fn test_stacks_vector_036() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_036()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_036(), None)); } #[test] fn test_stacks_vector_037() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_037()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_037(), None)); } #[test] fn test_stacks_vector_038() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_038()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_038(), None)); } #[test] fn test_stacks_vector_039() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_039()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_039(), None)); } #[test] fn test_stacks_vector_040() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_040()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_040(), None)); } // #[test] // fn test_stacks_vector_041() { -// process_stacks_blocks_and_check_expectations(helpers::shapes::get_vector_041()); +// process_stacks_blocks_and_check_expectations((helpers::shapes::get_vector_041(), None)); // } #[test] fn test_stacks_vector_042() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_042()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_042(), None)); } #[test] fn test_stacks_vector_043() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_043()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_043(), None)); } #[test] fn test_stacks_vector_044() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_044()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_044(), None)); } #[test] fn test_stacks_vector_045() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_045()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_045(), None)); } #[test] fn test_stacks_vector_046() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_046()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_046(), None)); } #[test] fn test_stacks_vector_047() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_047()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_047(), None)); } #[test] fn test_stacks_vector_048() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_048()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_048(), None)); } #[test] fn test_stacks_vector_049() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_049()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_049(), None)); } #[test] fn test_stacks_vector_050() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_050()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_050(), None)); } #[test] fn test_stacks_vector_051() { 
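Every pre-existing vector above now goes through the harness as a `(steps, None)` tuple, because the harness takes an optional block-pool seed alongside the steps (see the `indexer/tests/mod.rs` hunk further down). Vectors that want pre-seeded state, such as 053 and 054, return the full tuple themselves. A sketch of the two call shapes (the test names here are illustrative):

```rust
#[test]
fn stacks_vector_without_seed() {
    // Unseeded vectors keep their original shape and are wrapped with `None`.
    process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_001(), None));
}

#[test]
fn stacks_vector_with_seed() {
    // Seeded vectors already return `(steps, Some(unconfirmed_blocks))`,
    // so they are passed through unchanged.
    process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_053());
}
```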
- process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_051()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_051(), None)); } #[test] fn test_stacks_vector_052() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_052()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_052(), None)); +} + +#[test] +fn test_stacks_vector_053() { + process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_053()); +} +#[test] +fn test_stacks_vector_054() { + process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_054()); } #[test_case(StacksTransactionEventPayload::STXTransferEvent(STXTransferEventData { diff --git a/components/chainhook-sdk/src/indexer/tests/helpers/stacks_shapes.rs b/components/chainhook-sdk/src/indexer/tests/helpers/stacks_shapes.rs index 316849128..1a7f60073 100644 --- a/components/chainhook-sdk/src/indexer/tests/helpers/stacks_shapes.rs +++ b/components/chainhook-sdk/src/indexer/tests/helpers/stacks_shapes.rs @@ -2,7 +2,7 @@ use crate::utils::Context; use super::{super::StacksChainEventExpectation, BlockEvent}; use super::{microblocks, stacks_blocks}; -use chainhook_types::StacksChainEvent; +use chainhook_types::{StacksBlockData, StacksChainEvent}; use hiro_system_kit::slog; pub fn expect_no_chain_update() -> StacksChainEventExpectation { @@ -3739,3 +3739,91 @@ pub fn get_vector_052() -> Vec<(BlockEvent, StacksChainEventExpectation)> { ), ] } + +/// Vector 053: Generate the following blocks +/// +/// A1(0) - B1(1) - C1(3) +/// \ B2(2) +/// +/// +pub fn get_vector_053() -> ( + Vec<(BlockEvent, StacksChainEventExpectation)>, + Option>, +) { + ( + vec![ + ( + stacks_blocks::B1(None), + expect_chain_updated_with_block(stacks_blocks::B1(None), vec![]), + ), + ( + stacks_blocks::B2(None), + expect_chain_updated_with_block_reorg( + vec![stacks_blocks::B1(None)], + vec![stacks_blocks::B2(None)], + vec![], + ), + ), + ( + stacks_blocks::C1(None), + expect_chain_updated_with_block_reorg( + vec![stacks_blocks::B2(None)], + vec![stacks_blocks::B1(None), stacks_blocks::C1(None)], + vec![], + ), + ), + ], + Some(vec![get_block_from_block_event(stacks_blocks::A1(None))]), + ) +} +/// Vector 054: Generate the following blocks +/// +/// A1(0) - B1(0) - C1(1) - D1(4) +/// \ B2(2) - C2(3) +/// +/// +pub fn get_vector_054() -> ( + Vec<(BlockEvent, StacksChainEventExpectation)>, + Option>, +) { + ( + vec![ + ( + stacks_blocks::C1(None), + expect_chain_updated_with_block(stacks_blocks::C1(None), vec![]), + ), + (stacks_blocks::B2(None), expect_no_chain_update()), + ( + stacks_blocks::C2(None), + expect_chain_updated_with_block_reorg( + vec![stacks_blocks::B1(None), stacks_blocks::C1(None)], + vec![stacks_blocks::B2(None), stacks_blocks::C2(None)], + vec![], + ), + ), + ( + stacks_blocks::D1(None), + expect_chain_updated_with_block_reorg( + vec![stacks_blocks::B2(None), stacks_blocks::C2(None)], + vec![ + stacks_blocks::B1(None), + stacks_blocks::C1(None), + stacks_blocks::D1(None), + ], + vec![], + ), + ), + ], + Some(vec![ + get_block_from_block_event(stacks_blocks::A1(None)), + get_block_from_block_event(stacks_blocks::B1(None)), + ]), + ) +} + +fn get_block_from_block_event(block_event: BlockEvent) -> StacksBlockData { + match block_event { + BlockEvent::Block(block) => block, + _ => unreachable!(), + } +} diff --git a/components/chainhook-sdk/src/indexer/tests/mod.rs b/components/chainhook-sdk/src/indexer/tests/mod.rs index 
9781a4c14..4705aafc6 100644
--- a/components/chainhook-sdk/src/indexer/tests/mod.rs
+++ b/components/chainhook-sdk/src/indexer/tests/mod.rs
@@ -3,14 +3,22 @@ use crate::utils::{AbstractBlock, Context};
 
 use self::helpers::BlockEvent;
 use super::{fork_scratch_pad::ForkScratchPad, StacksBlockPool};
-use chainhook_types::{BitcoinBlockData, BlockchainEvent, StacksChainEvent};
+use chainhook_types::{BitcoinBlockData, BlockchainEvent, StacksBlockData, StacksChainEvent};
 
 pub type StacksChainEventExpectation = Box<dyn Fn(Option<StacksChainEvent>) -> ()>;
 
 pub fn process_stacks_blocks_and_check_expectations(
-    steps: Vec<(BlockEvent, StacksChainEventExpectation)>,
+    (steps, block_pool_seed): (
+        Vec<(BlockEvent, StacksChainEventExpectation)>,
+        Option<Vec<StacksBlockData>>,
+    ),
 ) {
     let mut blocks_processor = StacksBlockPool::new();
+
+    if let Some(block_pool_seed) = block_pool_seed {
+        blocks_processor.seed_block_pool(block_pool_seed, &Context::empty());
+    }
+
     for (block_event, check_chain_event_expectations) in steps.into_iter() {
         match block_event {
             BlockEvent::Block(block) => {
diff --git a/components/chainhook-sdk/src/observer/mod.rs b/components/chainhook-sdk/src/observer/mod.rs
index b27e3b292..cc1108b68 100644
--- a/components/chainhook-sdk/src/observer/mod.rs
+++ b/components/chainhook-sdk/src/observer/mod.rs
@@ -27,7 +27,7 @@ use bitcoincore_rpc::{Auth, Client, RpcApi};
 use chainhook_types::{
     BitcoinBlockData, BitcoinBlockSignaling, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData,
     BitcoinChainUpdatedWithReorgData, BitcoinNetwork, BlockIdentifier, BlockchainEvent,
-    StacksChainEvent, StacksNetwork, StacksNodeConfig, TransactionIdentifier,
+    StacksBlockData, StacksChainEvent, StacksNetwork, StacksNodeConfig, TransactionIdentifier,
 };
 use hiro_system_kit;
 use hiro_system_kit::slog;
@@ -426,6 +426,7 @@ pub fn start_event_observer(
     observer_commands_rx: Receiver<ObserverCommand>,
     observer_events_tx: Option<crossbeam_channel::Sender<ObserverEvent>>,
     observer_sidecar: Option<ObserverSidecar>,
+    stacks_block_pool_seed: Option<Vec<StacksBlockData>>,
     ctx: Context,
 ) -> Result<(), Box<dyn Error>> {
     match config.bitcoin_block_signaling {
@@ -460,6 +461,7 @@ pub fn start_event_observer(
                     observer_commands_rx,
                     observer_events_tx,
                     observer_sidecar,
+                    stacks_block_pool_seed,
                     context_cloned,
                 );
                 let _ = hiro_system_kit::nestable_block_on(future);
@@ -542,6 +544,7 @@ pub async fn start_stacks_event_observer(
     observer_commands_rx: Receiver<ObserverCommand>,
     observer_events_tx: Option<crossbeam_channel::Sender<ObserverEvent>>,
     observer_sidecar: Option<ObserverSidecar>,
+    stacks_block_pool_seed: Option<Vec<StacksBlockData>>,
     ctx: Context,
 ) -> Result<(), Box<dyn Error>> {
     let indexer_config = IndexerConfig {
@@ -553,7 +556,10 @@ pub async fn start_stacks_event_observer(
         bitcoin_block_signaling: config.bitcoin_block_signaling.clone(),
     };
 
-    let indexer = Indexer::new(indexer_config.clone());
+    let mut indexer = Indexer::new(indexer_config.clone());
+    if let Some(stacks_block_pool_seed) = stacks_block_pool_seed {
+        indexer.seed_stacks_block_pool(stacks_block_pool_seed, &ctx);
+    }
 
     let log_level = if config.display_logs {
         if cfg!(feature = "cli") {
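The indexer seeding in `start_stacks_event_observer` above is what lets a restarted observer resolve a fork it only ever saw half of. At the block-pool level, the scenario vector 053 encodes looks roughly like the sketch below; the `Result<Option<StacksChainEvent>, _>` shape of `process_block` and the module paths are assumptions based on how the test harness above consumes it, and `replay_fork_after_restart` is illustrative, not SDK API:

```rust
use chainhook_sdk::indexer::StacksBlockPool;
use chainhook_sdk::utils::Context;
use chainhook_types::{StacksBlockData, StacksChainEvent};

/// Mirrors the shape of vector 053: the pool is seeded with the parent block
/// (as if it had just been read back from the unconfirmed-blocks store after a
/// restart), then the two competing children and the fork-resolving grandchild
/// are streamed in.
fn replay_fork_after_restart(
    parent: StacksBlockData,       // A1, recovered from storage
    child_a: StacksBlockData,      // B1, first branch
    child_b: StacksBlockData,      // B2, competing branch
    grandchild_a: StacksBlockData, // C1, extends B1 and wins the fork
    ctx: &Context,
) -> Vec<Option<StacksChainEvent>> {
    let mut pool = StacksBlockPool::new();
    pool.seed_block_pool(vec![parent], ctx);

    // Without the seed, the pool would have no ancestor to attach B1/B2 to and
    // could not arbitrate the reorg; with it, each call yields the expected
    // "updated with block" / "reorg" chain events.
    [child_a, child_b, grandchild_a]
        .into_iter()
        .map(|block| {
            pool.process_block(block, ctx)
                .expect("block should be processed")
        })
        .collect()
}
```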