1
1
use std:: { path:: PathBuf , time:: Duration } ;
2
2
3
- use chainhook_types:: { BlockIdentifier , OrdinalInscriptionRevealData , TransactionIdentifier } ;
3
+ use chainhook_types:: {
4
+ BitcoinBlockData , BlockIdentifier , OrdinalInscriptionRevealData , TransactionIdentifier ,
5
+ } ;
4
6
use hiro_system_kit:: slog;
5
7
use rand:: RngCore ;
6
8
use rusqlite:: { Connection , OpenFlags , ToSql } ;
@@ -14,15 +16,26 @@ use crate::{
14
16
utils:: Context ,
15
17
} ;
16
18
17
/// Returns the full path of the ordinals SQLite database file located
/// inside the given cache directory `base_dir`.
fn get_default_ordinals_db_file_path(base_dir: &PathBuf) -> PathBuf {
    // `join` clones the base path and appends the file name in one step.
    base_dir.join("bitcoin_block_traversal.sqlite")
}
22
24
23
- pub fn open_readonly_ordinals_db_conn ( base_dir : & PathBuf ) -> Result < Connection , String > {
25
+ pub fn open_readonly_ordinals_db_conn (
26
+ base_dir : & PathBuf ,
27
+ ctx : & Context ,
28
+ ) -> Result < Connection , String > {
24
29
let path = get_default_ordinals_db_file_path ( & base_dir) ;
25
- let conn = open_existing_readonly_db ( & path) ;
30
+ let conn = open_existing_readonly_db ( & path, ctx) ;
31
+ Ok ( conn)
32
+ }
33
+
34
+ pub fn open_readwrite_ordinals_db_conn (
35
+ base_dir : & PathBuf ,
36
+ ctx : & Context ,
37
+ ) -> Result < Connection , String > {
38
+ let conn = create_or_open_readwrite_db ( & base_dir, ctx) ;
26
39
Ok ( conn)
27
40
}
28
41
@@ -65,12 +78,13 @@ pub fn initialize_ordinal_state_storage(path: &PathBuf, ctx: &Context) -> Connec
65
78
conn
66
79
}
67
80
68
- fn create_or_open_readwrite_db ( path : & PathBuf , ctx : & Context ) -> Connection {
69
- let open_flags = match std:: fs:: metadata ( path) {
81
+ fn create_or_open_readwrite_db ( cache_path : & PathBuf , ctx : & Context ) -> Connection {
82
+ let path = get_default_ordinals_db_file_path ( & cache_path) ;
83
+ let open_flags = match std:: fs:: metadata ( & path) {
70
84
Err ( e) => {
71
85
if e. kind ( ) == std:: io:: ErrorKind :: NotFound {
72
86
// need to create
73
- if let Some ( dirp) = PathBuf :: from ( path) . parent ( ) {
87
+ if let Some ( dirp) = PathBuf :: from ( & path) . parent ( ) {
74
88
std:: fs:: create_dir_all ( dirp) . unwrap_or_else ( |e| {
75
89
ctx. try_log ( |logger| slog:: error!( logger, "{}" , e. to_string( ) ) ) ;
76
90
} ) ;
@@ -86,15 +100,23 @@ fn create_or_open_readwrite_db(path: &PathBuf, ctx: &Context) -> Connection {
86
100
}
87
101
} ;
88
102
89
- let conn = Connection :: open_with_flags ( path, open_flags) . unwrap ( ) ;
103
+ let conn = loop {
104
+ match Connection :: open_with_flags ( & path, open_flags) {
105
+ Ok ( conn) => break conn,
106
+ Err ( e) => {
107
+ ctx. try_log ( |logger| slog:: error!( logger, "{}" , e. to_string( ) ) ) ;
108
+ }
109
+ } ;
110
+ std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) ;
111
+ } ;
90
112
// db.profile(Some(trace_profile));
91
113
// db.busy_handler(Some(tx_busy_handler))?;
92
114
conn. pragma_update ( None , "journal_mode" , & "WAL" ) . unwrap ( ) ;
93
115
conn. pragma_update ( None , "synchronous" , & "NORMAL" ) . unwrap ( ) ;
94
116
conn
95
117
}
96
118
97
- fn open_existing_readonly_db ( path : & PathBuf ) -> Connection {
119
+ fn open_existing_readonly_db ( path : & PathBuf , ctx : & Context ) -> Connection {
98
120
let open_flags = match std:: fs:: metadata ( path) {
99
121
Err ( e) => {
100
122
if e. kind ( ) == std:: io:: ErrorKind :: NotFound {
@@ -109,8 +131,16 @@ fn open_existing_readonly_db(path: &PathBuf) -> Connection {
109
131
}
110
132
} ;
111
133
112
- let conn = Connection :: open_with_flags ( path, open_flags) . unwrap ( ) ;
113
- conn
134
+ let conn = loop {
135
+ match Connection :: open_with_flags ( path, open_flags) {
136
+ Ok ( conn) => break conn,
137
+ Err ( e) => {
138
+ ctx. try_log ( |logger| slog:: error!( logger, "{}" , e. to_string( ) ) ) ;
139
+ }
140
+ } ;
141
+ std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) ;
142
+ } ;
143
+ return conn;
114
144
}
115
145
116
146
#[ derive( Debug , Serialize , Deserialize ) ]
@@ -155,6 +185,39 @@ impl CompactedBlock {
155
185
CompactedBlock ( ( ( coinbase_txid, coinbase_value) , txs) )
156
186
}
157
187
188
+ pub fn from_standardized_block ( block : & BitcoinBlockData ) -> CompactedBlock {
189
+ let mut txs = vec ! [ ] ;
190
+ let mut coinbase_value = 0 ;
191
+ let coinbase_txid = {
192
+ let txid =
193
+ hex:: decode ( & block. transactions [ 0 ] . transaction_identifier . hash [ 2 ..] ) . unwrap ( ) ;
194
+ [ txid[ 0 ] , txid[ 1 ] , txid[ 2 ] , txid[ 3 ] ]
195
+ } ;
196
+ for coinbase_output in block. transactions [ 0 ] . metadata . outputs . iter ( ) {
197
+ coinbase_value += coinbase_output. value ;
198
+ }
199
+ for tx in block. transactions . iter ( ) . skip ( 1 ) {
200
+ let mut inputs = vec ! [ ] ;
201
+ for input in tx. metadata . inputs . iter ( ) {
202
+ let txin = hex:: decode ( & input. previous_output . txid [ 2 ..] ) . unwrap ( ) ;
203
+
204
+ inputs. push ( (
205
+ [ txin[ 0 ] , txin[ 1 ] , txin[ 2 ] , txin[ 3 ] ] ,
206
+ input. previous_output . block_height as u32 ,
207
+ input. previous_output . vout as u16 ,
208
+ input. previous_output . value ,
209
+ ) ) ;
210
+ }
211
+ let mut outputs = vec ! [ ] ;
212
+ for output in tx. metadata . outputs . iter ( ) {
213
+ outputs. push ( output. value ) ;
214
+ }
215
+ let txid = hex:: decode ( & tx. transaction_identifier . hash [ 2 ..] ) . unwrap ( ) ;
216
+ txs. push ( ( [ txid[ 0 ] , txid[ 1 ] , txid[ 2 ] , txid[ 3 ] ] , inputs, outputs) ) ;
217
+ }
218
+ CompactedBlock ( ( ( coinbase_txid, coinbase_value) , txs) )
219
+ }
220
+
158
221
pub fn from_hex_bytes ( bytes : & str ) -> CompactedBlock {
159
222
let bytes = hex:: decode ( & bytes) . unwrap ( ) ;
160
223
let value = ciborium:: de:: from_reader ( & bytes[ ..] ) . unwrap ( ) ;
@@ -327,7 +390,7 @@ pub fn write_compacted_block_to_index(
327
390
328
391
pub async fn build_bitcoin_traversal_local_storage (
329
392
bitcoin_config : & BitcoinConfig ,
330
- cache_path : & PathBuf ,
393
+ storage_conn : & Connection ,
331
394
start_block : u64 ,
332
395
end_block : u64 ,
333
396
ctx : & Context ,
@@ -351,62 +414,68 @@ pub async fn build_bitcoin_traversal_local_storage(
351
414
let future = retrieve_block_hash ( & config, & block_height) ;
352
415
match hiro_system_kit:: nestable_block_on ( future) {
353
416
Ok ( block_hash) => {
354
- err_count = 0 ;
355
- block_hash_tx. send ( Some ( ( block_cursor, block_hash) ) ) ;
417
+ let _ = block_hash_tx. send ( Some ( ( block_cursor, block_hash) ) ) ;
356
418
break ;
357
419
}
358
420
Err ( e) => {
359
421
err_count += 1 ;
360
422
let delay = ( err_count + ( rng. next_u64 ( ) % 3 ) ) * 1000 ;
361
- println ! ( "retry hash:fetch in {delay}" ) ;
362
423
std:: thread:: sleep ( std:: time:: Duration :: from_millis ( delay) ) ;
363
424
}
364
425
}
365
426
}
366
427
} ) ;
367
428
}
368
429
369
- let db_file = get_default_ordinals_db_file_path ( & cache_path) ;
370
430
let bitcoin_config = bitcoin_config. clone ( ) ;
371
431
let moved_ctx = ctx. clone ( ) ;
372
- let handle = hiro_system_kit:: thread_named ( "Block data retrieval" ) . spawn ( move || {
432
+ let block_data_tx_moved = block_data_tx. clone ( ) ;
433
+ let handle_1 = hiro_system_kit:: thread_named ( "Block data retrieval" ) . spawn ( move || {
373
434
while let Ok ( Some ( ( block_height, block_hash) ) ) = block_hash_rx. recv ( ) {
374
- println ! ( "fetch {block_height}:{block_hash}" ) ;
375
435
let moved_bitcoin_config = bitcoin_config. clone ( ) ;
376
- let block_data_tx = block_data_tx . clone ( ) ;
436
+ let block_data_tx = block_data_tx_moved . clone ( ) ;
377
437
let moved_ctx = moved_ctx. clone ( ) ;
378
438
retrieve_block_data_pool. execute ( move || {
439
+ moved_ctx. try_log ( |logger| slog:: info!( logger, "Fetching block #{block_height}" ) ) ;
379
440
let future = retrieve_full_block_breakdown_with_retry (
380
441
& moved_bitcoin_config,
381
442
& block_hash,
382
443
& moved_ctx,
383
444
) ;
384
445
let block_data = hiro_system_kit:: nestable_block_on ( future) . unwrap ( ) ;
385
- block_data_tx. send ( Some ( block_data) ) ;
446
+ let _ = block_data_tx. send ( Some ( block_data) ) ;
386
447
} ) ;
387
- retrieve_block_data_pool. join ( )
448
+ let res = retrieve_block_data_pool. join ( ) ;
449
+ res
388
450
}
389
- } ) ;
451
+ } ) . expect ( "unable to spawn thread" ) ;
390
452
391
- let handle = hiro_system_kit:: thread_named ( "Block data compression" ) . spawn ( move || {
453
+ let handle_2 = hiro_system_kit:: thread_named ( "Block data compression" ) . spawn ( move || {
392
454
while let Ok ( Some ( block_data) ) = block_data_rx. recv ( ) {
393
- println ! ( "store {}:{}" , block_data. height, block_data. hash) ;
394
- let block_compressed_tx = block_compressed_tx. clone ( ) ;
455
+ let block_compressed_tx_moved = block_compressed_tx. clone ( ) ;
395
456
compress_block_data_pool. execute ( move || {
396
457
let compressed_block = CompactedBlock :: from_full_block ( & block_data) ;
397
- block_compressed_tx. send ( Some ( ( block_data. height as u32 , compressed_block) ) ) ;
458
+ let _ = block_compressed_tx_moved
459
+ . send ( Some ( ( block_data. height as u32 , compressed_block) ) ) ;
398
460
} ) ;
399
461
400
- compress_block_data_pool. join ( )
462
+ let res = compress_block_data_pool. join ( ) ;
463
+ // let _ = block_compressed_tx.send(None);
464
+ res
401
465
}
402
- } ) ;
403
-
404
- let conn = initialize_ordinal_state_storage ( & db_file, & ctx) ;
466
+ } ) . expect ( "unable to spawn thread" ) ;
405
467
468
+ let mut blocks_stored = 0 ;
406
469
while let Ok ( Some ( ( block_height, compacted_block) ) ) = block_compressed_rx. recv ( ) {
407
- ctx. try_log ( |logger| slog:: debug!( logger, "Storing block #{block_height}" ) ) ;
408
-
409
- write_compacted_block_to_index ( block_height, & compacted_block, & conn, & ctx) ;
470
+ ctx. try_log ( |logger| slog:: info!( logger, "Storing block #{block_height}" ) ) ;
471
+ write_compacted_block_to_index ( block_height, & compacted_block, & storage_conn, & ctx) ;
472
+ blocks_stored+= 1 ;
473
+ if blocks_stored == end_block - start_block {
474
+ let _ = block_data_tx. send ( None ) ;
475
+ let _ = block_hash_tx. send ( None ) ;
476
+ ctx. try_log ( |logger| slog:: info!( logger, "Local ordinals storage successfully seeded with #{blocks_stored} blocks" ) ) ;
477
+ return Ok ( ( ) )
478
+ }
410
479
}
411
480
412
481
retrieve_block_hash_pool. join ( ) ;
@@ -429,7 +498,12 @@ pub fn retrieve_satoshi_point_using_local_storage(
429
498
let mut tx_cursor = ( txid, 0 ) ;
430
499
431
500
loop {
432
- let res = retrieve_compacted_block_from_index ( ordinal_block_number, & storage_conn) . unwrap ( ) ;
501
+ let res = match retrieve_compacted_block_from_index ( ordinal_block_number, & storage_conn) {
502
+ Some ( res) => res,
503
+ None => {
504
+ return Err ( format ! ( "unable to retrieve block ##{ordinal_block_number}" ) ) ;
505
+ }
506
+ } ;
433
507
434
508
ctx. try_log ( |logger| {
435
509
slog:: debug!(
@@ -527,11 +601,3 @@ pub fn retrieve_satoshi_point_using_local_storage(
527
601
Ok ( ( ordinal_block_number. into ( ) , ordinal_offset) )
528
602
}
529
603
530
- // pub async fn scan_bitcoin_chain_for_ordinal_inscriptions(
531
- // subscribers: Vec<HookAction>,
532
- // first_inscription_height: u64,
533
- // config: &Config,
534
- // ctx: &Context,
535
- // ) -> Result<(), String> {
536
- // Ok(())
537
- // }
0 commit comments