Skip to content

Commit

Permalink
Merge pull request #482 from hirosystems/develop
Browse files Browse the repository at this point in the history
chore(release): publish v1.2.1
  • Loading branch information
MicaiahReid authored Jan 30, 2024
2 parents fc29be7 + 83af58b commit 2eca8e3
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 13 deletions.
1 change: 1 addition & 0 deletions components/chainhook-cli/src/archive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Cont
std::process::exit(1);
}
}
info!(ctx.expect_logger(), "Successfully downloaded tsv file");
config.add_local_stacks_tsv_source(&tsv_file_path);
}
true
Expand Down
38 changes: 31 additions & 7 deletions components/chainhook-cli/src/scan/stacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
storage::{
get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted,
get_stacks_block_at_block_height, insert_entry_in_stacks_blocks, is_stacks_block_present,
open_readwrite_stacks_db_conn,
open_readonly_stacks_db_conn_with_retry, open_readwrite_stacks_db_conn,
},
};
use chainhook_sdk::types::{BlockIdentifier, Chain};
Expand Down Expand Up @@ -62,14 +62,18 @@ pub enum RecordKind {

pub async fn get_canonical_fork_from_tsv(
config: &mut Config,
start_block: Option<u64>,
ctx: &Context,
) -> Result<VecDeque<(BlockIdentifier, BlockIdentifier, String)>, String> {
let seed_tsv_path = config.expected_local_stacks_tsv_file().clone();

let (record_tx, record_rx) = std::sync::mpsc::channel();

let start_block = 0;

let mut start_block = start_block.unwrap_or(0);
info!(
ctx.expect_logger(),
"Parsing tsv file to determine canoncial fork"
);
let parsing_handle = hiro_system_kit::thread_named("Stacks chainstate CSV parsing")
.spawn(move || {
let mut reader_builder = csv::ReaderBuilder::default()
Expand All @@ -95,6 +99,7 @@ pub async fn get_canonical_fork_from_tsv(
})
.expect("unable to spawn thread");

let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
let canonical_fork = {
let mut cursor = BlockIdentifier::default();
let mut dump = HashMap::new();
Expand All @@ -114,7 +119,16 @@ pub async fn get_canonical_fork_from_tsv(
};

if start_block > block_identifier.index {
continue;
// don't insert blocks that are already in the db,
// but do fill any gaps in our data
if is_stacks_block_present(&block_identifier, 0, &stacks_db)
|| block_identifier.index == 0
{
continue;
} else {
start_block = block_identifier.index;
info!(ctx.expect_logger(), "Found missing block ({start_block}) during tsv parsing; will insert into db",);
}
}

if block_identifier.index > cursor.index {
Expand All @@ -140,6 +154,10 @@ pub async fn get_canonical_fork_from_tsv(
};
let _ = parsing_handle.join();

info!(
ctx.expect_logger(),
"Finished parsing tsv file to determine canonical fork"
);
Ok(canonical_fork)
}

Expand Down Expand Up @@ -442,7 +460,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(

let _ = download_stacks_dataset_if_required(config, ctx).await;

let mut canonical_fork = get_canonical_fork_from_tsv(config, ctx).await?;
let mut canonical_fork = get_canonical_fork_from_tsv(config, None, ctx).await?;

let mut indexer = Indexer::new(config.network.clone());

Expand Down Expand Up @@ -538,13 +556,19 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(

let _ = download_stacks_dataset_if_required(config, ctx).await;

let mut canonical_fork = get_canonical_fork_from_tsv(config, ctx).await?;
let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
let confirmed_tip = get_last_block_height_inserted(&stacks_db, &ctx);
let mut canonical_fork = get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?;

let mut indexer = Indexer::new(config.network.clone());
let mut blocks_inserted = 0;
let mut blocks_read = 0;
let blocks_to_insert = canonical_fork.len();
let stacks_db_rw = open_readwrite_stacks_db_conn(&config.expected_cache_path(), ctx)?;
info!(
ctx.expect_logger(),
"Begining import of {} Stacks blocks into rocks db", blocks_to_insert
);
for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
blocks_read += 1;

Expand Down Expand Up @@ -573,7 +597,7 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(
if blocks_inserted % 2500 == 0 {
info!(
ctx.expect_logger(),
"Importing Stacks blocks: {}/{}", blocks_read, blocks_to_insert
"Importing Stacks blocks into rocks db: {}/{}", blocks_read, blocks_to_insert
);
let _ = stacks_db_rw.flush();
}
Expand Down
39 changes: 33 additions & 6 deletions components/chainhook-cli/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,30 @@ fn get_default_stacks_db_file_path(base_dir: &PathBuf) -> PathBuf {
destination_path
}

pub fn open_readonly_stacks_db_conn_with_retry(
base_dir: &PathBuf,
retry: u8,
ctx: &Context,
) -> Result<DB, String> {
let mut attempt = 0;
loop {
match open_readonly_stacks_db_conn(base_dir, ctx) {
Ok(conn) => return Ok(conn),
Err(e) => {
debug!(
ctx.expect_logger(),
"Failed to open stadcks.rocksdb. Trying again in a few seconds."
);
attempt += 1;
std::thread::sleep(std::time::Duration::from_secs(2));
if attempt > retry {
return Err(e);
}
}
}
}
}

pub fn open_readonly_stacks_db_conn(base_dir: &PathBuf, ctx: &Context) -> Result<DB, String> {
let path = get_default_stacks_db_file_path(&base_dir);
let opts = get_db_default_options();
Expand Down Expand Up @@ -91,12 +115,15 @@ pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB,
stacks_db_rw
.put(&key, &block_bytes.to_string().as_bytes())
.expect("unable to insert blocks");
stacks_db_rw
.put(
get_last_confirmed_insert_key(),
block.block_identifier.index.to_be_bytes(),
)
.expect("unable to insert metadata");
let previous_last_inserted = get_last_block_height_inserted(stacks_db_rw, _ctx).unwrap_or(0);
if block.block_identifier.index > previous_last_inserted {
stacks_db_rw
.put(
get_last_confirmed_insert_key(),
block.block_identifier.index.to_be_bytes(),
)
.expect("unable to insert metadata");
}
}

pub fn insert_unconfirmed_entry_in_stacks_blocks(
Expand Down

0 comments on commit 2eca8e3

Please sign in to comment.