From 83af58bfdbbdcb5d310a8bcd0a6079325bac2804 Mon Sep 17 00:00:00 2001
From: Micaiah Reid
Date: Tue, 30 Jan 2024 11:33:28 -0500
Subject: [PATCH] fix: reduce memory usage in archive file ingestion (#480)

Previously, when parsing an archive tsv to determine the canonical fork,
we would always start at block 0, even if there was a rocksdb instance
already containing Stacks chaindata; the entire fork was loaded into
memory and inserted. Now, this process starts at the latest confirmed
chain tip, which greatly reduces the memory footprint on subsequent
startups of the Chainhook service.

Note: an initial run of the Chainhook service can still use significant
memory.
---
 components/chainhook-cli/src/archive/mod.rs |  1 +
 components/chainhook-cli/src/scan/stacks.rs | 38 ++++++++++++++++----
 components/chainhook-cli/src/storage/mod.rs | 39 +++++++++++++++++----
 3 files changed, 65 insertions(+), 13 deletions(-)

diff --git a/components/chainhook-cli/src/archive/mod.rs b/components/chainhook-cli/src/archive/mod.rs
index 5a2429556..81eee7911 100644
--- a/components/chainhook-cli/src/archive/mod.rs
+++ b/components/chainhook-cli/src/archive/mod.rs
@@ -175,6 +175,7 @@ pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Cont
                     std::process::exit(1);
                 }
             }
+            info!(ctx.expect_logger(), "Successfully downloaded tsv file");
             config.add_local_stacks_tsv_source(&tsv_file_path);
         }
         true
diff --git a/components/chainhook-cli/src/scan/stacks.rs b/components/chainhook-cli/src/scan/stacks.rs
index 1341f73c4..2bc6f33f5 100644
--- a/components/chainhook-cli/src/scan/stacks.rs
+++ b/components/chainhook-cli/src/scan/stacks.rs
@@ -10,7 +10,7 @@ use crate::{
     storage::{
         get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted,
         get_stacks_block_at_block_height, insert_entry_in_stacks_blocks, is_stacks_block_present,
-        open_readwrite_stacks_db_conn,
+        open_readonly_stacks_db_conn_with_retry, open_readwrite_stacks_db_conn,
     },
 };
 use chainhook_sdk::types::{BlockIdentifier, Chain};
@@ -62,14 +62,18 @@ pub enum RecordKind {
 
 pub async fn get_canonical_fork_from_tsv(
     config: &mut Config,
+    start_block: Option<u64>,
     ctx: &Context,
 ) -> Result<VecDeque<(BlockIdentifier, BlockIdentifier, String)>, String> {
     let seed_tsv_path = config.expected_local_stacks_tsv_file().clone();
 
     let (record_tx, record_rx) = std::sync::mpsc::channel();
 
-    let start_block = 0;
-
+    let mut start_block = start_block.unwrap_or(0);
+    info!(
+        ctx.expect_logger(),
+        "Parsing tsv file to determine canonical fork"
+    );
     let parsing_handle = hiro_system_kit::thread_named("Stacks chainstate CSV parsing")
         .spawn(move || {
             let mut reader_builder = csv::ReaderBuilder::default()
@@ -95,6 +99,7 @@ pub async fn get_canonical_fork_from_tsv(
         })
         .expect("unable to spawn thread");
 
+    let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
     let canonical_fork = {
         let mut cursor = BlockIdentifier::default();
         let mut dump = HashMap::new();
@@ -114,7 +119,16 @@ pub async fn get_canonical_fork_from_tsv(
             };
 
             if start_block > block_identifier.index {
-                continue;
+                // don't insert blocks that are already in the db,
+                // but do fill any gaps in our data
+                if is_stacks_block_present(&block_identifier, 0, &stacks_db)
+                    || block_identifier.index == 0
+                {
+                    continue;
+                } else {
+                    start_block = block_identifier.index;
+                    info!(ctx.expect_logger(), "Found missing block ({start_block}) during tsv parsing; will insert into db",);
+                }
             }
 
             if block_identifier.index > cursor.index {
@@ -140,6 +154,10 @@ pub async fn get_canonical_fork_from_tsv(
     };
     let _ = parsing_handle.join();
 
+    info!(
+        ctx.expect_logger(),
+        "Finished parsing tsv file to determine canonical fork"
+    );
     Ok(canonical_fork)
 }
 
@@ -442,7 +460,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
 
     let _ = download_stacks_dataset_if_required(config, ctx).await;
 
-    let mut canonical_fork = get_canonical_fork_from_tsv(config, ctx).await?;
+    let mut canonical_fork = get_canonical_fork_from_tsv(config, None, ctx).await?;
 
     let mut indexer = Indexer::new(config.network.clone());
 
@@ -538,13 +556,19 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(
 
     let _ = download_stacks_dataset_if_required(config, ctx).await;
 
-    let mut canonical_fork = get_canonical_fork_from_tsv(config, ctx).await?;
+    let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
+    let confirmed_tip = get_last_block_height_inserted(&stacks_db, &ctx);
+    let mut canonical_fork = get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?;
 
     let mut indexer = Indexer::new(config.network.clone());
     let mut blocks_inserted = 0;
     let mut blocks_read = 0;
     let blocks_to_insert = canonical_fork.len();
     let stacks_db_rw = open_readwrite_stacks_db_conn(&config.expected_cache_path(), ctx)?;
+    info!(
+        ctx.expect_logger(),
+        "Beginning import of {} Stacks blocks into rocks db", blocks_to_insert
+    );
 
     for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
         blocks_read += 1;
@@ -573,7 +597,7 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(
         if blocks_inserted % 2500 == 0 {
             info!(
                 ctx.expect_logger(),
-                "Importing Stacks blocks: {}/{}", blocks_read, blocks_to_insert
+                "Importing Stacks blocks into rocks db: {}/{}", blocks_read, blocks_to_insert
             );
             let _ = stacks_db_rw.flush();
         }
diff --git a/components/chainhook-cli/src/storage/mod.rs b/components/chainhook-cli/src/storage/mod.rs
index 3c72fdb3f..c403a83cb 100644
--- a/components/chainhook-cli/src/storage/mod.rs
+++ b/components/chainhook-cli/src/storage/mod.rs
@@ -28,6 +28,30 @@ fn get_default_stacks_db_file_path(base_dir: &PathBuf) -> PathBuf {
     destination_path
 }
 
+pub fn open_readonly_stacks_db_conn_with_retry(
+    base_dir: &PathBuf,
+    retry: u8,
+    ctx: &Context,
+) -> Result<DB, String> {
+    let mut attempt = 0;
+    loop {
+        match open_readonly_stacks_db_conn(base_dir, ctx) {
+            Ok(conn) => return Ok(conn),
+            Err(e) => {
+                debug!(
+                    ctx.expect_logger(),
+                    "Failed to open stacks.rocksdb. Trying again in a few seconds."
+                );
+                attempt += 1;
+                std::thread::sleep(std::time::Duration::from_secs(2));
+                if attempt > retry {
+                    return Err(e);
+                }
+            }
+        }
+    }
+}
+
 pub fn open_readonly_stacks_db_conn(base_dir: &PathBuf, ctx: &Context) -> Result<DB, String> {
     let path = get_default_stacks_db_file_path(&base_dir);
     let opts = get_db_default_options();
@@ -91,12 +115,15 @@ pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB,
     stacks_db_rw
         .put(&key, &block_bytes.to_string().as_bytes())
         .expect("unable to insert blocks");
-    stacks_db_rw
-        .put(
-            get_last_confirmed_insert_key(),
-            block.block_identifier.index.to_be_bytes(),
-        )
-        .expect("unable to insert metadata");
+    let previous_last_inserted = get_last_block_height_inserted(stacks_db_rw, _ctx).unwrap_or(0);
+    if block.block_identifier.index > previous_last_inserted {
+        stacks_db_rw
+            .put(
+                get_last_confirmed_insert_key(),
+                block.block_identifier.index.to_be_bytes(),
+            )
+            .expect("unable to insert metadata");
+    }
 }
 
 pub fn insert_unconfirmed_entry_in_stacks_blocks(
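
The heart of the change is the skip-or-fill decision in get_canonical_fork_from_tsv:
tsv records below the previously confirmed chain tip are skipped unless they are
missing from the local store, so only new blocks (plus any gaps) accumulate in
memory. Below is a minimal, self-contained sketch of that decision, with bare
block heights and a HashSet standing in for BlockIdentifier values and the
rocksdb-backed is_stacks_block_present lookup; blocks_to_ingest is an
illustrative name, not part of the codebase.

    use std::collections::HashSet;

    // `records` plays the role of the tsv stream; `store` the rocksdb block index.
    fn blocks_to_ingest(records: &[u64], store: &HashSet<u64>, start_block: Option<u64>) -> Vec<u64> {
        let mut start_block = start_block.unwrap_or(0);
        let mut out = Vec::new();
        for &height in records {
            if start_block > height {
                // Skip blocks already ingested on a previous run...
                if store.contains(&height) || height == 0 {
                    continue;
                }
                // ...but when a gap is found, lower the cursor so the missing
                // block and everything after it is (re-)inserted.
                start_block = height;
            }
            out.push(height);
        }
        out
    }

    fn main() {
        let store: HashSet<u64> = [1, 2, 4, 5].into_iter().collect(); // block 3 is missing
        let records: Vec<u64> = (1..=7).collect();
        // Resuming at confirmed tip 5 re-ingests the gap at 3 and everything above it.
        assert_eq!(blocks_to_ingest(&records, &store, Some(5)), vec![3, 4, 5, 6, 7]);
    }

As in the patch, once a gap is found the later blocks are re-inserted rather than
re-checked, trading a few redundant writes for one rocksdb lookup per skipped block.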
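open_readonly_stacks_db_conn_with_retry exists because
consolidate_local_stacks_chainstate_using_csv now opens a read-only connection at
a point where a read-write connection may still hold the rocksdb lock. Here is a
self-contained sketch of the same bounded-retry shape, where open_db and Db are
hypothetical stand-ins for the rocksdb opener and handle:

    use std::thread::sleep;
    use std::time::Duration;

    struct Db; // stand-in for rocksdb::DB

    // Stand-in for `open_readonly_stacks_db_conn`; here it always fails so the
    // retry budget is exercised.
    fn open_db() -> Result<Db, String> {
        Err("could not open stacks.rocksdb".to_string())
    }

    fn open_with_retry(retry: u8) -> Result<Db, String> {
        let mut attempt = 0;
        loop {
            match open_db() {
                Ok(conn) => return Ok(conn),
                Err(e) => {
                    attempt += 1;
                    if attempt > retry {
                        // Budget exhausted: surface the last error.
                        return Err(e);
                    }
                    sleep(Duration::from_secs(2));
                }
            }
        }
    }

    fn main() {
        // retry = 3 allows 4 attempts in total (1 initial + 3 retries).
        assert!(open_with_retry(3).is_err());
    }

One small difference: this sketch checks the budget before sleeping, so it does
not pause once more after the final failure; the patch sleeps first, which is
harmless on a startup path.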
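Finally, the guard added to insert_entry_in_stacks_blocks follows from the gap
filling above: a backfilled old block can now be written after newer ones, so the
last-inserted-height marker must only ever move forward. Reduced to a pure
function (update_tip is an illustrative name only):

    // Returns the value the last-inserted-height marker should hold after
    // writing a block at `inserted_height`.
    fn update_tip(current_tip: Option<u64>, inserted_height: u64) -> u64 {
        let previous = current_tip.unwrap_or(0);
        if inserted_height > previous {
            inserted_height // a new chain tip: advance the marker
        } else {
            previous // a backfilled block: leave the marker alone
        }
    }

    fn main() {
        assert_eq!(update_tip(Some(100), 3), 100); // backfilling block 3 keeps the tip at 100
        assert_eq!(update_tip(Some(100), 101), 101);
        assert_eq!(update_tip(None, 7), 7);
    }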