1
- use std:: path:: PathBuf ;
1
+ use std:: {
2
+ collections:: HashMap ,
3
+ path:: PathBuf ,
4
+ sync:: mpsc:: { channel, Sender } ,
5
+ } ;
2
6
3
7
use chainhook_types:: {
4
8
BitcoinBlockData , BlockIdentifier , OrdinalInscriptionRevealData , TransactionIdentifier ,
@@ -11,13 +15,13 @@ use threadpool::ThreadPool;
11
15
use crate :: {
12
16
indexer:: bitcoin:: {
13
17
retrieve_block_hash_with_retry, retrieve_full_block_breakdown_with_retry,
14
- BitcoinBlockFullBreakdown ,
18
+ standardize_bitcoin_block , BitcoinBlockFullBreakdown ,
15
19
} ,
16
20
observer:: BitcoinConfig ,
17
21
utils:: Context ,
18
22
} ;
19
23
20
- use super :: ord:: height:: Height ;
24
+ use super :: { ord:: height:: Height , update_hord_db_and_augment_bitcoin_block } ;
21
25
22
26
fn get_default_hord_db_file_path ( base_dir : & PathBuf ) -> PathBuf {
23
27
let mut destination_path = base_dir. clone ( ) ;
@@ -414,13 +418,98 @@ pub fn remove_entry_from_inscriptions(
414
418
}
415
419
}
416
420
417
- pub async fn build_bitcoin_traversal_local_storage (
421
+ pub async fn update_hord_db (
422
+ bitcoin_config : & BitcoinConfig ,
423
+ hord_db_path : & PathBuf ,
424
+ hord_db_conn : & Connection ,
425
+ start_block : u64 ,
426
+ end_block : u64 ,
427
+ _ctx : & Context ,
428
+ network_thread : usize ,
429
+ ) -> Result < ( ) , String > {
430
+ let ( block_tx, block_rx) = channel :: < BitcoinBlockFullBreakdown > ( ) ;
431
+ let first_inscription_block_height = 767430 ;
432
+ let ctx = _ctx. clone ( ) ;
433
+ let network = bitcoin_config. network . clone ( ) ;
434
+ let hord_db_path = hord_db_path. clone ( ) ;
435
+ let handle = hiro_system_kit:: thread_named ( "Inscriptions indexing" )
436
+ . spawn ( move || {
437
+ let mut cursor = first_inscription_block_height;
438
+ let mut inbox = HashMap :: new ( ) ;
439
+
440
+ while let Ok ( raw_block) = block_rx. recv ( ) {
441
+ // Early return, only considering blocks after 1st inscription
442
+ if raw_block. height < first_inscription_block_height {
443
+ continue ;
444
+ }
445
+ let block_height = raw_block. height ;
446
+ inbox. insert ( raw_block. height , raw_block) ;
447
+
448
+ // In the context of ordinals, we're constrained to process blocks sequentially
449
+ // Blocks are processed by a threadpool and could be coming out of order.
450
+ // Inbox block for later if the current block is not the one we should be
451
+ // processing.
452
+ if block_height != cursor {
453
+ continue ;
454
+ }
455
+
456
+ // Is the action of processing a block allows us
457
+ // to process more blocks present in the inbox?
458
+ while let Some ( next_block) = inbox. remove ( & cursor) {
459
+ let mut new_block = match standardize_bitcoin_block ( next_block, & network, & ctx)
460
+ {
461
+ Ok ( block) => block,
462
+ Err ( e) => {
463
+ ctx. try_log ( |logger| {
464
+ slog:: error!( logger, "Unable to standardize bitcoin block: {e}" , )
465
+ } ) ;
466
+ return ;
467
+ }
468
+ } ;
469
+
470
+ if let Err ( e) = update_hord_db_and_augment_bitcoin_block (
471
+ & mut new_block,
472
+ & hord_db_path,
473
+ & ctx,
474
+ ) {
475
+ ctx. try_log ( |logger| {
476
+ slog:: error!(
477
+ logger,
478
+ "Unable to augment bitcoin block with hord_db: {e}" ,
479
+ )
480
+ } ) ;
481
+ return ;
482
+ }
483
+ cursor += 1 ;
484
+ }
485
+ }
486
+ } )
487
+ . expect ( "unable to detach thread" ) ;
488
+
489
+ fetch_and_cache_blocks_in_hord_db (
490
+ bitcoin_config,
491
+ hord_db_conn,
492
+ start_block,
493
+ end_block,
494
+ & _ctx,
495
+ network_thread,
496
+ Some ( block_tx) ,
497
+ )
498
+ . await ?;
499
+
500
+ let _ = handle. join ( ) ;
501
+
502
+ Ok ( ( ) )
503
+ }
504
+
505
+ pub async fn fetch_and_cache_blocks_in_hord_db (
418
506
bitcoin_config : & BitcoinConfig ,
419
507
hord_db_conn : & Connection ,
420
508
start_block : u64 ,
421
509
end_block : u64 ,
422
510
ctx : & Context ,
423
511
network_thread : usize ,
512
+ block_tx : Option < Sender < BitcoinBlockFullBreakdown > > ,
424
513
) -> Result < ( ) , String > {
425
514
let retrieve_block_hash_pool = ThreadPool :: new ( network_thread) ;
426
515
let ( block_hash_tx, block_hash_rx) = crossbeam_channel:: unbounded ( ) ;
@@ -471,10 +560,14 @@ pub async fn build_bitcoin_traversal_local_storage(
471
560
. spawn ( move || {
472
561
while let Ok ( Some ( block_data) ) = block_data_rx. recv ( ) {
473
562
let block_compressed_tx_moved = block_compressed_tx. clone ( ) ;
563
+ let block_tx = block_tx. clone ( ) ;
474
564
compress_block_data_pool. execute ( move || {
475
565
let compressed_block = CompactedBlock :: from_full_block ( & block_data) ;
476
- let _ = block_compressed_tx_moved
477
- . send ( Some ( ( block_data. height as u32 , compressed_block) ) ) ;
566
+ let block_index = block_data. height as u32 ;
567
+ if let Some ( block_tx) = block_tx {
568
+ let _ = block_tx. send ( block_data) ;
569
+ }
570
+ let _ = block_compressed_tx_moved. send ( Some ( ( block_index, compressed_block) ) ) ;
478
571
} ) ;
479
572
480
573
let res = compress_block_data_pool. join ( ) ;
0 commit comments