Skip to content

Commit fba5c89

Browse files
author
Ludo Galabru
committed
feat: multithread traversals
1 parent 65afd77 commit fba5c89

File tree

5 files changed

+106
-59
lines changed

5 files changed

+106
-59
lines changed

components/chainhook-cli/src/cli/mod.rs

+10-9
Original file line numberDiff line numberDiff line change
@@ -486,16 +486,16 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
486486
hash: "".into(),
487487
};
488488

489-
let (block_height, offset, ordinal_number, hops) =
490-
retrieve_satoshi_point_using_local_storage(
491-
&hord_db_conn,
492-
&block_identifier,
493-
&transaction_identifier,
494-
&ctx,
495-
)?;
489+
let traversal = retrieve_satoshi_point_using_local_storage(
490+
&hord_db_conn,
491+
&block_identifier,
492+
&transaction_identifier,
493+
&ctx,
494+
)?;
496495
info!(
497496
ctx.expect_logger(),
498-
"Satoshi #{ordinal_number} was minted in block #{block_height} at offset {offset} and was transferred {hops} times.",
497+
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times.",
498+
traversal.ordinal_number, traversal.ordinal_block_number, traversal.ordinal_offset, traversal.transfers
499499
);
500500
}
501501
FindCommand::Inscription(cmd) => {
@@ -588,8 +588,9 @@ pub async fn perform_hord_db_update(
588588
&rw_hord_db_conn,
589589
start_block,
590590
end_block,
591-
&ctx,
592591
network_threads,
592+
&config.expected_cache_path(),
593+
&ctx,
593594
)
594595
.await?;
595596

components/chainhook-cli/src/scan/bitcoin.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,9 @@ pub async fn scan_bitcoin_chain_with_predicate(
114114
&rw_hord_db_conn,
115115
start_block,
116116
end_block,
117-
&ctx,
118117
8,
118+
&config.expected_cache_path(),
119+
&ctx,
119120
)
120121
.await?;
121122

components/chainhook-event-observer/src/hord/db/mod.rs

+16-8
Original file line numberDiff line numberDiff line change
@@ -557,8 +557,9 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
557557
rw_hord_db_conn: &Connection,
558558
start_block: u64,
559559
end_block: u64,
560-
ctx: &Context,
561560
network_thread: usize,
561+
hord_db_path: &PathBuf,
562+
ctx: &Context,
562563
) -> Result<(), String> {
563564
let number_of_blocks_to_process = end_block - start_block + 1;
564565
let retrieve_block_hash_pool = ThreadPool::new(network_thread);
@@ -673,8 +674,9 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
673674
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
674675
&mut new_block,
675676
&rw_hord_db_conn,
676-
&ctx,
677677
false,
678+
&hord_db_path,
679+
&ctx,
678680
) {
679681
ctx.try_log(|logger| {
680682
slog::error!(logger, "Unable to augment bitcoin block with hord_db: {e}",)
@@ -702,12 +704,19 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
702704
Ok(())
703705
}
704706

707+
pub struct TraversalResult {
708+
pub ordinal_block_number: u64,
709+
pub ordinal_offset: u64,
710+
pub ordinal_number: u64,
711+
pub transfers: u32,
712+
}
713+
705714
pub fn retrieve_satoshi_point_using_local_storage(
706715
hord_db_conn: &Connection,
707716
block_identifier: &BlockIdentifier,
708717
transaction_identifier: &TransactionIdentifier,
709718
ctx: &Context,
710-
) -> Result<(u64, u64, u64, u32), String> {
719+
) -> Result<TraversalResult, String> {
711720
ctx.try_log(|logger| {
712721
slog::info!(
713722
logger,
@@ -781,7 +790,6 @@ pub fn retrieve_satoshi_point_using_local_storage(
781790
if sats_in >= total_out {
782791
ordinal_offset = total_out - (sats_in - txin_value);
783792
ordinal_block_number = block_height;
784-
// println!("{h}: {blockhash} -> {} [in:{} , out: {}] {}/{vout} (input #{in_index}) {compounded_offset}", transaction.txid, transaction.vin.len(), transaction.vout.len(), txid);
785793
tx_cursor = (txin, vout as usize);
786794
break;
787795
}
@@ -848,10 +856,10 @@ pub fn retrieve_satoshi_point_using_local_storage(
848856
let height = Height(ordinal_block_number.into());
849857
let ordinal_number = height.starting_sat().0 + ordinal_offset;
850858

851-
Ok((
852-
ordinal_block_number.into(),
859+
Ok(TraversalResult {
860+
ordinal_block_number: ordinal_block_number.into(),
853861
ordinal_offset,
854862
ordinal_number,
855-
hops,
856-
))
863+
transfers: hops,
864+
})
857865
}

components/chainhook-event-observer/src/hord/mod.rs

+74-39
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,15 @@ pub mod ord;
44

55
use bitcoincore_rpc::bitcoin::hashes::hex::FromHex;
66
use bitcoincore_rpc::bitcoin::{Address, Network, Script};
7-
use chainhook_types::{BitcoinBlockData, OrdinalInscriptionTransferData, OrdinalOperation};
7+
use chainhook_types::{
8+
BitcoinBlockData, OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier,
9+
};
810
use hiro_system_kit::slog;
911
use rusqlite::Connection;
10-
use std::collections::VecDeque;
12+
use std::collections::{HashMap, VecDeque};
13+
use std::path::PathBuf;
14+
use std::sync::mpsc::channel;
15+
use threadpool::ThreadPool;
1116

1217
use crate::{
1318
hord::{
@@ -22,7 +27,10 @@ use crate::{
2227
utils::Context,
2328
};
2429

25-
use self::db::{remove_entry_from_blocks, remove_entry_from_inscriptions};
30+
use self::db::{
31+
open_readonly_hord_db_conn, remove_entry_from_blocks, remove_entry_from_inscriptions,
32+
TraversalResult,
33+
};
2634

2735
pub fn revert_hord_db_with_augmented_bitcoin_block(
2836
block: &BitcoinBlockData,
@@ -64,8 +72,9 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
6472
pub fn update_hord_db_and_augment_bitcoin_block(
6573
new_block: &mut BitcoinBlockData,
6674
rw_hord_db_conn: &Connection,
67-
ctx: &Context,
6875
write_block: bool,
76+
hord_db_path: &PathBuf,
77+
ctx: &Context,
6978
) -> Result<(), String> {
7079
if write_block {
7180
ctx.try_log(|logger| {
@@ -92,60 +101,86 @@ pub fn update_hord_db_and_augment_bitcoin_block(
92101
.clone();
93102
let first_sat_post_subsidy = Height(new_block.block_identifier.index).starting_sat().0;
94103

104+
let mut transactions_ids = vec![];
105+
for new_tx in new_block.transactions.iter_mut().skip(1) {
106+
// Have a new inscription been revealed, if so, are looking at a re-inscription
107+
for ordinal_event in
108+
new_tx.metadata.ordinal_operations.iter_mut()
109+
{
110+
if let OrdinalOperation::InscriptionRevealed(_) = ordinal_event {
111+
transactions_ids.push(new_tx.transaction_identifier.clone());
112+
}
113+
}
114+
}
115+
let expected_traversals = transactions_ids.len();
116+
let (traversal_tx, traversal_rx) = channel::<(TransactionIdentifier, TraversalResult)>();
117+
let traversal_data_pool = ThreadPool::new(10);
118+
119+
for transaction_id in transactions_ids.into_iter() {
120+
let moved_traversal_tx = traversal_tx.clone();
121+
let moved_ctx = ctx.clone();
122+
let block_identifier = new_block.block_identifier.clone();
123+
let hord_db_path = hord_db_path.clone();
124+
traversal_data_pool.execute(move || {
125+
let hord_db_conn = open_readonly_hord_db_conn(&hord_db_path, &moved_ctx).unwrap();
126+
let traversal = retrieve_satoshi_point_using_local_storage(
127+
&hord_db_conn,
128+
&block_identifier,
129+
&transaction_id,
130+
&moved_ctx,
131+
)
132+
.unwrap();
133+
let _ = moved_traversal_tx.send((transaction_id, traversal));
134+
});
135+
}
136+
137+
let mut traversals = HashMap::new();
138+
let mut traversals_received = 0;
139+
while let Ok((transaction_identifier, traversal_result)) = traversal_rx.recv() {
140+
traversals_received += 1;
141+
traversals.insert(transaction_identifier, traversal_result);
142+
if traversals_received == expected_traversals {
143+
break;
144+
}
145+
}
146+
95147
for new_tx in new_block.transactions.iter_mut().skip(1) {
96148
let mut ordinals_events_indexes_to_discard = VecDeque::new();
97149
// Have a new inscription been revealed, if so, are looking at a re-inscription
98150
for (ordinal_event_index, ordinal_event) in
99151
new_tx.metadata.ordinal_operations.iter_mut().enumerate()
100152
{
101153
if let OrdinalOperation::InscriptionRevealed(inscription) = ordinal_event {
102-
let (
103-
ordinal_block_height,
104-
ordinal_offset,
105-
ordinal_number,
106-
transfers_pre_inscription,
107-
) = {
108-
// Are we looking at a re-inscription?
109-
let res = retrieve_satoshi_point_using_local_storage(
110-
&rw_hord_db_conn,
111-
&new_block.block_identifier,
112-
&new_tx.transaction_identifier,
113-
&ctx,
114-
);
115-
116-
match res {
117-
Ok(res) => res,
118-
Err(e) => {
119-
ctx.try_log(|logger| {
120-
slog::error!(
121-
logger,
122-
"unable to retrieve satoshi point: {}",
123-
e.to_string()
124-
);
125-
});
126-
continue;
127-
}
154+
let traversal = match traversals.get(&new_tx.transaction_identifier) {
155+
Some(traversal) => traversal,
156+
None => {
157+
ctx.try_log(|logger| {
158+
slog::error!(logger, "unable to retrieve satoshi point",);
159+
});
160+
continue;
128161
}
129162
};
130163

131-
if let Some(_entry) =
132-
find_inscription_with_ordinal_number(&ordinal_number, &rw_hord_db_conn, &ctx)
133-
{
164+
if let Some(_entry) = find_inscription_with_ordinal_number(
165+
&traversal.ordinal_number,
166+
&rw_hord_db_conn,
167+
&ctx,
168+
) {
134169
ctx.try_log(|logger| {
135170
slog::warn!(
136171
logger,
137172
"Transaction {} in block {} is overriding an existing inscription {}",
138173
new_tx.transaction_identifier.hash,
139174
new_block.block_identifier.index,
140-
ordinal_number
175+
traversal.ordinal_number
141176
);
142177
});
143178
ordinals_events_indexes_to_discard.push_front(ordinal_event_index);
144179
} else {
145-
inscription.ordinal_offset = ordinal_offset;
146-
inscription.ordinal_block_height = ordinal_block_height;
147-
inscription.ordinal_number = ordinal_number;
148-
inscription.transfers_pre_inscription = transfers_pre_inscription;
180+
inscription.ordinal_offset = traversal.ordinal_offset;
181+
inscription.ordinal_block_height = traversal.ordinal_block_number;
182+
inscription.ordinal_number = traversal.ordinal_number;
183+
inscription.transfers_pre_inscription = traversal.transfers;
149184
inscription.inscription_number =
150185
match find_latest_inscription_number(&rw_hord_db_conn, &ctx) {
151186
Ok(inscription_number) => inscription_number,
@@ -167,7 +202,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
167202
new_tx.transaction_identifier.hash,
168203
new_block.block_identifier.index,
169204
inscription.content_type,
170-
ordinal_number
205+
traversal.ordinal_number
171206
);
172207
});
173208

components/chainhook-event-observer/src/observer/mod.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -553,8 +553,9 @@ pub async fn start_observer_commands_handler(
553553
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
554554
block,
555555
&rw_hord_db_conn,
556-
&ctx,
557556
true,
557+
&config.get_cache_path_buf(),
558+
&ctx,
558559
) {
559560
ctx.try_log(|logger| {
560561
slog::error!(
@@ -663,8 +664,9 @@ pub async fn start_observer_commands_handler(
663664
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
664665
block,
665666
&rw_hord_db_conn,
666-
&ctx,
667667
true,
668+
&config.get_cache_path_buf(),
669+
&ctx,
668670
) {
669671
ctx.try_log(|logger| {
670672
slog::error!(

0 commit comments

Comments
 (0)