Skip to content

Commit 168162e

Browse files
author
Ludo Galabru
committed
feat: batch ingestion, improve cleaning
1 parent 41ecace commit 168162e

File tree

2 files changed

+42
-22
lines changed

2 files changed

+42
-22
lines changed

components/hord-cli/src/db/mod.rs

+41-21
Original file line numberDiff line numberDiff line change
@@ -2318,7 +2318,7 @@ pub async fn rebuild_rocks_db(
23182318
rx_thread_pool.push(rx);
23192319
}
23202320

2321-
let _ = hiro_system_kit::thread_named("Block data compression")
2321+
let compression_thread = hiro_system_kit::thread_named("Block data compression")
23222322
.spawn(move || {
23232323
for rx in rx_thread_pool.into_iter() {
23242324
let block_compressed_tx_moved = block_compressed_tx.clone();
@@ -2328,7 +2328,7 @@ pub async fn rebuild_rocks_db(
23282328
while let Ok(Some(block_bytes)) = rx.recv() {
23292329
let raw_block_data = parse_downloaded_block(block_bytes).unwrap();
23302330
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
2331-
.expect("unable to serialize block");
2331+
.expect("unable to compress block");
23322332
let block_data = hord::parse_ordinals_and_standardize_block(
23332333
raw_block_data,
23342334
&moved_bitcoin_network,
@@ -2347,7 +2347,7 @@ pub async fn rebuild_rocks_db(
23472347

23482348
let cloned_ctx = ctx.clone();
23492349

2350-
let _storage_thread = hiro_system_kit::thread_named("Ordered blocks dispatcher")
2350+
let storage_thread = hiro_system_kit::thread_named("Ordered blocks dispatcher")
23512351
.spawn(move || {
23522352
let mut inbox = HashMap::new();
23532353
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
@@ -2367,7 +2367,7 @@ pub async fn rebuild_rocks_db(
23672367
let mut chunk = Vec::new();
23682368
while let Some((block, compacted_block)) = inbox.remove(&inbox_cursor) {
23692369
cloned_ctx.try_log(|logger| {
2370-
slog::info!(
2370+
info!(
23712371
logger,
23722372
"Dequeuing block #{inbox_cursor} for processing (# blocks inboxed: {})",
23732373
inbox.len()
@@ -2379,7 +2379,7 @@ pub async fn rebuild_rocks_db(
23792379
if chunk.is_empty() {
23802380
// Early return / wait for next block
23812381
cloned_ctx.try_log(|logger| {
2382-
slog::info!(logger, "Inboxing compacted block #{block_index}")
2382+
info!(logger, "Inboxing compacted block #{block_index}")
23832383
});
23842384
continue;
23852385
} else {
@@ -2391,27 +2391,12 @@ pub async fn rebuild_rocks_db(
23912391

23922392
if blocks_processed == number_of_blocks_to_process {
23932393
cloned_ctx.try_log(|logger| {
2394-
slog::info!(
2394+
info!(
23952395
logger,
23962396
"Local block storage successfully seeded with #{blocks_processed} blocks"
23972397
)
23982398
});
23992399
break;
2400-
// match guard.report().build() {
2401-
// Ok(report) => {
2402-
// ctx.try_log(|logger| {
2403-
// slog::info!(logger, "Generating report");
2404-
// });
2405-
2406-
// let file = std::fs::File::create("hord-perf.svg").unwrap();
2407-
// report.flamegraph(file).unwrap();
2408-
// }
2409-
// Err(e) => {
2410-
// ctx.try_log(|logger| {
2411-
// slog::error!(logger, "Reporting failed: {}", e.to_string());
2412-
// });
2413-
// }
2414-
// }
24152400
}
24162401
}
24172402
()
@@ -2437,6 +2422,41 @@ pub async fn rebuild_rocks_db(
24372422
thread_index = (thread_index + 1) % hord_config.ingestion_thread_max;
24382423
}
24392424

2425+
ctx.try_log(|logger| {
2426+
info!(
2427+
logger,
2428+
"Gargbage collecting will start"
2429+
)
2430+
});
2431+
2432+
for tx in tx_thread_pool.iter() {
2433+
let _ = tx.send(None);
2434+
}
2435+
let _ = compression_thread.join();
2436+
let _ = storage_thread.join();
24402437
let _ = set.shutdown();
2438+
2439+
ctx.try_log(|logger| {
2440+
info!(
2441+
logger,
2442+
"Gargbage collecting did finish"
2443+
)
2444+
});
2445+
2446+
// match guard.report().build() {
2447+
// Ok(report) => {
2448+
// ctx.try_log(|logger| {
2449+
// slog::info!(logger, "Generating report");
2450+
// });
2451+
// let file = std::fs::File::create("hord-perf.svg").unwrap();
2452+
// report.flamegraph(file).unwrap();
2453+
// }
2454+
// Err(e) => {
2455+
// ctx.try_log(|logger| {
2456+
// slog::error!(logger, "Reporting failed: {}", e.to_string());
2457+
// });
2458+
// }
2459+
// }
2460+
24412461
Ok(())
24422462
}

components/hord-cli/src/service/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ impl Service {
143143
rebuild_rocks_db(
144144
&self.config,
145145
start_block,
146-
end_block,
146+
end_block.min(start_block + 256),
147147
hord_config.first_inscription_height,
148148
Some(tx.clone()),
149149
&self.ctx,

0 commit comments

Comments
 (0)