Skip to content

Commit 05b6d5c

Browse files
author
Ludo Galabru
committed
feat: revisit threading model
1 parent 31d9980 commit 05b6d5c

File tree

5 files changed

+136
-120
lines changed

5 files changed

+136
-120
lines changed

components/hord-cli/src/config/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ impl Config {
113113
HordConfig {
114114
network_thread_max: self.limits.max_number_of_networking_threads,
115115
ingestion_thread_max: self.limits.max_number_of_processing_threads,
116+
ingestion_thread_queue_size: 4,
116117
cache_size: self.limits.max_caching_memory_size_mb,
117118
db_path: self.expected_cache_path(),
118119
first_inscription_height: match self.network.bitcoin_network {

components/hord-cli/src/db/mod.rs

+84-74
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ use std::{
22
collections::{BTreeMap, HashMap, VecDeque},
33
hash::BuildHasherDefault,
44
path::PathBuf,
5-
sync::{mpsc::Sender, Arc}, thread::sleep, time::Duration,
5+
sync::{mpsc::Sender, Arc},
6+
thread::sleep,
7+
time::Duration,
68
};
79

810
use chainhook_sdk::{
@@ -760,12 +762,12 @@ pub fn find_all_inscriptions_in_block(
760762
block_height: &u64,
761763
inscriptions_db_conn: &Connection,
762764
ctx: &Context,
763-
) -> Vec<(TransactionIdentifier, TraversalResult)> {
765+
) -> BTreeMap<(TransactionIdentifier, usize), TraversalResult> {
764766
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
765767
let mut stmt = inscriptions_db_conn
766768
.prepare("SELECT inscription_number, ordinal_number, inscription_id FROM inscriptions where block_height = ? ORDER BY inscription_number ASC")
767769
.unwrap();
768-
let mut results = vec![];
770+
let mut results = BTreeMap::new();
769771
let mut rows = stmt.query(args).unwrap();
770772

771773
let transfers_data = find_all_transfers_in_block(block_height, inscriptions_db_conn, ctx);
@@ -789,7 +791,10 @@ pub fn find_all_inscriptions_in_block(
789791
transaction_identifier_inscription: transaction_identifier_inscription.clone(),
790792
transfer_data: transfer_data,
791793
};
792-
results.push((transaction_identifier_inscription, traversal));
794+
results.insert(
795+
(transaction_identifier_inscription, inscription_input_index),
796+
traversal,
797+
);
793798
}
794799
return results;
795800
}
@@ -1009,8 +1014,7 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
10091014
let moved_ctx = moved_ctx.clone();
10101015
let moved_http_client = http_client.clone();
10111016
retrieve_block_data_pool.execute(move || {
1012-
moved_ctx
1013-
.try_log(|logger| debug!(logger, "Fetching block #{block_height}"));
1017+
moved_ctx.try_log(|logger| debug!(logger, "Fetching block #{block_height}"));
10141018
let future = download_block_with_retry(
10151019
&moved_http_client,
10161020
&block_hash,
@@ -2280,8 +2284,7 @@ pub async fn rebuild_rocks_db(
22802284

22812285
let number_of_blocks_to_process = end_block - start_block + 1;
22822286

2283-
let compress_block_data_pool = ThreadPool::new(hord_config.ingestion_thread_max);
2284-
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::unbounded();
2287+
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::bounded(50);
22852288
let http_client = build_http_client();
22862289

22872290
let moved_config = bitcoin_config.clone();
@@ -2292,12 +2295,12 @@ pub async fn rebuild_rocks_db(
22922295

22932296
let mut block_heights = VecDeque::from((start_block..=end_block).collect::<Vec<u64>>());
22942297

2295-
for _ in 0..8 {
2298+
for _ in 0..hord_config.network_thread_max {
22962299
if let Some(block_height) = block_heights.pop_front() {
22972300
let config = moved_config.clone();
22982301
let ctx = moved_ctx.clone();
22992302
let http_client = moved_http_client.clone();
2300-
sleep(Duration::from_millis(500));
2303+
sleep(Duration::from_millis(200));
23012304
set.spawn(try_download_block_bytes_with_retry(
23022305
http_client,
23032306
block_height,
@@ -2312,40 +2315,39 @@ pub async fn rebuild_rocks_db(
23122315

23132316
let mut tx_thread_pool = vec![];
23142317
let mut rx_thread_pool = vec![];
2318+
let mut thread_pool_handles = vec![];
23152319

23162320
for _ in 0..hord_config.ingestion_thread_max {
2317-
let (tx, rx) = bounded::<Option<Vec<u8>>>(8);
2321+
let (tx, rx) = bounded::<Option<Vec<u8>>>(hord_config.ingestion_thread_queue_size);
23182322
tx_thread_pool.push(tx);
23192323
rx_thread_pool.push(rx);
23202324
}
23212325

2322-
let compression_thread = hiro_system_kit::thread_named("Block data compression")
2323-
.spawn(move || {
2324-
for rx in rx_thread_pool.into_iter() {
2325-
let block_compressed_tx_moved = block_compressed_tx.clone();
2326-
let moved_ctx: Context = moved_ctx.clone();
2327-
let moved_bitcoin_network = moved_bitcoin_network.clone();
2328-
compress_block_data_pool.execute(move || {
2329-
while let Ok(Some(block_bytes)) = rx.recv() {
2330-
let raw_block_data =
2331-
parse_downloaded_block(block_bytes).expect("unable to parse block");
2332-
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
2333-
.expect("unable to compress block");
2334-
let block_data = hord::parse_ordinals_and_standardize_block(
2335-
raw_block_data,
2336-
&moved_bitcoin_network,
2337-
&moved_ctx,
2338-
)
2339-
.expect("unable to deserialize block");
2326+
for rx in rx_thread_pool.into_iter() {
2327+
let block_compressed_tx_moved = block_compressed_tx.clone();
2328+
let moved_ctx: Context = moved_ctx.clone();
2329+
let moved_bitcoin_network = moved_bitcoin_network.clone();
2330+
2331+
let handle = hiro_system_kit::thread_named("Block data compression")
2332+
.spawn(move || {
2333+
while let Ok(Some(block_bytes)) = rx.recv() {
2334+
let raw_block_data =
2335+
parse_downloaded_block(block_bytes).expect("unable to parse block");
2336+
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
2337+
.expect("unable to compress block");
2338+
let block_data = hord::parse_ordinals_and_standardize_block(
2339+
raw_block_data,
2340+
&moved_bitcoin_network,
2341+
&moved_ctx,
2342+
)
2343+
.expect("unable to deserialize block");
23402344

2341-
let _ =
2342-
block_compressed_tx_moved.send(Some((block_data, compressed_block)));
2343-
}
2344-
});
2345-
}
2346-
let _ = compress_block_data_pool.join();
2347-
})
2348-
.expect("unable to spawn thread");
2345+
let _ = block_compressed_tx_moved.send(Some((block_data, compressed_block)));
2346+
}
2347+
})
2348+
.expect("unable to spawn thread");
2349+
thread_pool_handles.push(handle);
2350+
}
23492351

23502352
let cloned_ctx = ctx.clone();
23512353

@@ -2354,52 +2356,56 @@ pub async fn rebuild_rocks_db(
23542356
let mut inbox = HashMap::new();
23552357
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
23562358
let mut blocks_processed = 0;
2357-
while let Ok(Some((block, compacted_block))) =
2358-
block_compressed_rx.recv()
2359-
{
2360-
blocks_processed += 1;
2361-
let block_index = block.block_identifier.index;
2362-
2363-
// In the context of ordinals, we're constrained to process blocks sequentially
2364-
// Blocks are processed by a threadpool and could be coming out of order.
2365-
// Inbox block for later if the current block is not the one we should be
2366-
// processing.
2367-
if block_index >= start_sequencing_blocks_at_height {
2368-
inbox.insert(block_index, (block, compacted_block));
2369-
let mut chunk = Vec::new();
2370-
while let Some((block, compacted_block)) = inbox.remove(&inbox_cursor) {
2371-
cloned_ctx.try_log(|logger| {
2372-
info!(
2373-
logger,
2374-
"Dequeuing block #{inbox_cursor} for processing (# blocks inboxed: {})",
2375-
inbox.len()
2376-
)
2377-
});
2378-
chunk.push((block, compacted_block));
2379-
inbox_cursor += 1;
2380-
}
2381-
if chunk.is_empty() {
2382-
// Early return / wait for next block
2383-
cloned_ctx.try_log(|logger| {
2384-
info!(logger, "Inboxing compacted block #{block_index}")
2385-
});
2386-
continue;
2359+
2360+
loop {
2361+
// Dequeue all the blocks available
2362+
let mut new_blocks = vec![];
2363+
while let Ok(Some((block, compacted_block))) = block_compressed_rx.try_recv()
2364+
{
2365+
blocks_processed += 1;
2366+
new_blocks.push((block, compacted_block))
2367+
}
2368+
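// Route each dequeued block: heights at or above start_sequencing_blocks_at_height go to the inbox, earlier ones are set aside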
2369+
let mut ooo_processing = vec![];
2370+
for (block, compacted_block) in new_blocks.into_iter() {
2371+
let block_index = block.block_identifier.index;
2372+
if block_index >= start_sequencing_blocks_at_height {
2373+
inbox.insert(block_index, (block, compacted_block));
23872374
} else {
2388-
if let Some(ref tx) = blocks_post_processor {
2389-
let _ = tx.send(chunk);
2390-
}
2375+
ooo_processing.push((block, compacted_block));
2376+
// TODO: handle blocks below the sequencing start height; ooo_processing is filled here but never consumed
23912377
}
23922378
}
23932379

2394-
if blocks_processed == number_of_blocks_to_process {
2380+
// In-order processing: collect the longest run of consecutive blocks available, starting at inbox_cursor
2381+
let mut chunk = Vec::new();
2382+
while let Some((block, compacted_block)) = inbox.remove(&inbox_cursor) {
23952383
cloned_ctx.try_log(|logger| {
23962384
info!(
23972385
logger,
2398-
"Local block storage successfully seeded with #{blocks_processed} blocks"
2386+
"Adding block #{inbox_cursor} to next sequence (# blocks inboxed: {})",
2387+
inbox.len()
23992388
)
24002389
});
2401-
break;
2390+
chunk.push((block, compacted_block));
2391+
inbox_cursor += 1;
2392+
}
2393+
if !chunk.is_empty() {
2394+
if let Some(ref tx) = blocks_post_processor {
2395+
let _ = tx.send(chunk);
2396+
}
2397+
} else {
2398+
if blocks_processed == number_of_blocks_to_process {
2399+
cloned_ctx.try_log(|logger| {
2400+
info!(
2401+
logger,
2402+
"Local block storage successfully seeded with #{blocks_processed} blocks"
2403+
)
2404+
});
2405+
break;
2406+
}
24022407
}
2408+
sleep(Duration::from_secs(3));
24032409
}
24042410
()
24052411
})
@@ -2429,7 +2435,11 @@ pub async fn rebuild_rocks_db(
24292435
for tx in tx_thread_pool.iter() {
24302436
let _ = tx.send(None);
24312437
}
2432-
let _ = compression_thread.join();
2438+
2439+
for handle in thread_pool_handles.into_iter() {
2440+
let _ = handle.join();
2441+
}
2442+
24332443
let _ = storage_thread.join();
24342444
let _ = set.shutdown();
24352445

0 commit comments

Comments
 (0)