1
1
use crate :: archive:: download_ordinals_dataset_if_required;
2
- use crate :: config:: Config ;
2
+ use crate :: config:: { Config , PredicatesApi } ;
3
+ use crate :: service:: {
4
+ open_readwrite_predicates_db_conn_or_panic, update_predicate_status, PredicateStatus ,
5
+ ScanningData ,
6
+ } ;
3
7
use chainhook_sdk:: bitcoincore_rpc:: RpcApi ;
4
8
use chainhook_sdk:: bitcoincore_rpc:: { Auth , Client } ;
5
9
use chainhook_sdk:: chainhooks:: bitcoin:: {
@@ -8,10 +12,9 @@ use chainhook_sdk::chainhooks::bitcoin::{
8
12
} ;
9
13
use chainhook_sdk:: chainhooks:: types:: { BitcoinChainhookSpecification , BitcoinPredicateType } ;
10
14
use chainhook_sdk:: hord:: db:: {
11
- fetch_and_cache_blocks_in_hord_db, find_all_inscriptions, find_last_block_inserted,
12
- find_lazy_block_at_block_height, open_readonly_hord_db_conn,
13
- open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
14
- open_readwrite_hord_db_conn_rocks_db,
15
+ fetch_and_cache_blocks_in_hord_db, find_all_inscriptions_in_block, find_last_block_inserted,
16
+ find_lazy_block_at_block_height, get_any_entry_in_ordinal_activities,
17
+ open_readonly_hord_db_conn, open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db,
15
18
} ;
16
19
use chainhook_sdk:: hord:: {
17
20
get_inscriptions_revealed_in_block,
@@ -71,48 +74,27 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
71
74
72
75
// Are we dealing with an ordinals-based predicate?
73
76
// If so, we could use the ordinal storage to provide a set of hints.
74
- let mut inscriptions_cache = BTreeMap :: new ( ) ;
75
- let mut is_predicate_evaluating_ordinals = false ;
76
- let mut hord_blocks_requires_update = false ;
77
+ let mut inscriptions_db_conn = None ;
77
78
78
79
if let BitcoinPredicateType :: OrdinalsProtocol ( _) = & predicate_spec. predicate {
79
- is_predicate_evaluating_ordinals = true ;
80
- if let Ok ( inscriptions_db_conn) =
81
- open_readonly_hord_db_conn ( & config. expected_cache_path ( ) , & ctx)
82
- {
83
- inscriptions_cache = find_all_inscriptions ( & inscriptions_db_conn) ;
84
- // Will we have to update the blocks table?
85
- if let Ok ( blocks_db) =
86
- open_readonly_hord_db_conn_rocks_db ( & config. expected_cache_path ( ) , & ctx)
87
- {
88
- if find_lazy_block_at_block_height ( end_block as u32 , 3 , & blocks_db) . is_none ( ) {
89
- hord_blocks_requires_update = true ;
90
- }
91
- }
92
- }
93
- }
80
+ let blocks_db_rw =
81
+ open_readwrite_hord_db_conn_rocks_db ( & config. expected_cache_path ( ) , ctx) ?;
94
82
95
- // Do we need a seeded hord db?
96
- if is_predicate_evaluating_ordinals && inscriptions_cache. is_empty ( ) {
97
- // Do we need to update the blocks table first?
98
- if hord_blocks_requires_update {
83
+ if find_lazy_block_at_block_height ( end_block as u32 , 3 , & blocks_db_rw) . is_none ( ) {
99
84
// Count how many entries in the table
100
85
// Compute the right interval
101
86
// Start the build local storage routine
102
87
103
88
// TODO: make sure that we have a contiguous chain
104
89
// check_compacted_blocks_chain_integrity(&hord_db_conn);
105
90
106
- let blocks_db_rw =
107
- open_readwrite_hord_db_conn_rocks_db ( & config. expected_cache_path ( ) , ctx) ?;
108
-
109
91
let start_block = find_last_block_inserted ( & blocks_db_rw) as u64 ;
110
92
if start_block < end_block {
111
93
warn ! (
112
- ctx. expect_logger( ) ,
113
- "Database hord.sqlite appears to be outdated regarding the window of blocks provided. Syncing {} missing blocks" ,
114
- ( end_block - start_block)
115
- ) ;
94
+ ctx. expect_logger( ) ,
95
+ "Database hord.sqlite appears to be outdated regarding the window of blocks provided. Syncing {} missing blocks" ,
96
+ ( end_block - start_block)
97
+ ) ;
116
98
117
99
let inscriptions_db_conn_rw =
118
100
open_readwrite_hord_db_conn ( & config. expected_cache_path ( ) , ctx) ?;
@@ -126,10 +108,13 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
126
108
& ctx,
127
109
)
128
110
. await ?;
129
-
130
- inscriptions_cache = find_all_inscriptions ( & inscriptions_db_conn_rw) ;
131
111
}
132
112
}
113
+
114
+ inscriptions_db_conn = Some ( open_readonly_hord_db_conn (
115
+ & config. expected_cache_path ( ) ,
116
+ ctx,
117
+ ) ?) ;
133
118
}
134
119
135
120
info ! (
@@ -139,53 +124,60 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
139
124
140
125
let mut blocks_scanned = 0 ;
141
126
let mut actions_triggered = 0 ;
127
+ let mut occurrences_found = 0u64 ;
142
128
let mut err_count = 0 ;
143
129
144
130
let event_observer_config = config. get_event_observer_config ( ) ;
145
131
let bitcoin_config = event_observer_config. get_bitcoin_config ( ) ;
146
132
let mut traversals = HashMap :: new ( ) ;
147
- if is_predicate_evaluating_ordinals {
148
- let hord_db_conn = open_readonly_hord_db_conn ( & config. expected_cache_path ( ) , ctx) ?;
149
133
150
- let mut storage = Storage :: Memory ( BTreeMap :: new ( ) ) ;
151
- let mut cursor = start_block. saturating_sub ( 1 ) ;
152
- while cursor <= end_block {
153
- cursor += 1 ;
134
+ let mut storage = Storage :: Memory ( BTreeMap :: new ( ) ) ;
135
+ let mut cursor = start_block. saturating_sub ( 1 ) ;
136
+
137
+ while cursor <= end_block {
138
+ cursor += 1 ;
139
+ blocks_scanned += 1 ;
140
+
141
+ let block_hash = retrieve_block_hash_with_retry ( & cursor, & bitcoin_config, ctx) . await ?;
142
+ let block_breakdown =
143
+ download_and_parse_block_with_retry ( & block_hash, & bitcoin_config, ctx) . await ?;
144
+ let mut block = match indexer:: bitcoin:: standardize_bitcoin_block (
145
+ block_breakdown,
146
+ & event_observer_config. bitcoin_network ,
147
+ ctx,
148
+ ) {
149
+ Ok ( data) => data,
150
+ Err ( e) => {
151
+ warn ! (
152
+ ctx. expect_logger( ) ,
153
+ "Unable to standardize block#{} {}: {}" , cursor, block_hash, e
154
+ ) ;
155
+ continue ;
156
+ }
157
+ } ;
158
+
159
+ if let Some ( ref inscriptions_db_conn) = inscriptions_db_conn {
160
+ if !get_any_entry_in_ordinal_activities ( & cursor, & inscriptions_db_conn, & ctx) { }
154
161
155
162
// Evaluating every single block is required for also keeping track of transfers.
156
- let local_traverals = match inscriptions_cache. remove ( & cursor) {
163
+ let local_traverals = match find_all_inscriptions_in_block (
164
+ & cursor,
165
+ & inscriptions_db_conn,
166
+ )
167
+ . remove ( & cursor)
168
+ {
157
169
Some ( entry) => entry,
158
170
None => vec ! [ ] ,
159
171
} ;
160
172
for ( transaction_identifier, traversal_result) in local_traverals. into_iter ( ) {
161
173
traversals. insert ( transaction_identifier, traversal_result) ;
162
174
}
163
175
164
- blocks_scanned += 1 ;
165
-
166
- let block_hash = retrieve_block_hash_with_retry ( & cursor, & bitcoin_config, ctx) . await ?;
167
- let block_breakdown =
168
- download_and_parse_block_with_retry ( & block_hash, & bitcoin_config, ctx) . await ?;
169
- let mut block = match indexer:: bitcoin:: standardize_bitcoin_block (
170
- block_breakdown,
171
- & event_observer_config. bitcoin_network ,
172
- ctx,
173
- ) {
174
- Ok ( data) => data,
175
- Err ( e) => {
176
- warn ! (
177
- ctx. expect_logger( ) ,
178
- "Unable to standardize block#{} {}: {}" , cursor, block_hash, e
179
- ) ;
180
- continue ;
181
- }
182
- } ;
183
-
184
176
let _ = update_storage_and_augment_bitcoin_block_with_inscription_reveal_data (
185
177
& mut block,
186
178
& mut storage,
187
179
& traversals,
188
- & hord_db_conn ,
180
+ & inscriptions_db_conn ,
189
181
& ctx,
190
182
) ?;
191
183
@@ -200,116 +192,78 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
200
192
. map ( |d| d. inscription_number . to_string ( ) )
201
193
. collect :: < Vec < String > > ( ) ;
202
194
203
- let chain_event =
204
- BitcoinChainEvent :: ChainUpdatedWithBlocks ( BitcoinChainUpdatedWithBlocksData {
205
- new_blocks : vec ! [ block] ,
206
- confirmed_blocks : vec ! [ ] ,
207
- } ) ;
208
-
209
- let ( predicates_triggered, _predicates_evaluated) =
210
- evaluate_bitcoin_chainhooks_on_chain_event (
211
- & chain_event,
212
- vec ! [ & predicate_spec] ,
213
- ctx,
214
- ) ;
215
-
216
195
info ! (
217
196
ctx. expect_logger( ) ,
218
197
"Processing block #{} through {} predicate (inscriptions revealed: [{}])" ,
219
198
cursor,
220
199
predicate_spec. uuid,
221
200
inscriptions_revealed. join( ", " )
222
201
) ;
202
+ }
223
203
224
- match execute_predicates_action ( predicates_triggered, & event_observer_config, & ctx)
225
- . await
226
- {
227
- Ok ( actions) => actions_triggered += actions,
228
- Err ( _) => err_count += 1 ,
229
- }
204
+ let chain_event =
205
+ BitcoinChainEvent :: ChainUpdatedWithBlocks ( BitcoinChainUpdatedWithBlocksData {
206
+ new_blocks : vec ! [ block] ,
207
+ confirmed_blocks : vec ! [ ] ,
208
+ } ) ;
230
209
231
- if err_count >= 3 {
232
- return Err ( format ! ( "Scan aborted (consecutive action errors >= 3)" ) ) ;
233
- }
210
+ let ( predicates_triggered , _predicates_evaluated ) =
211
+ evaluate_bitcoin_chainhooks_on_chain_event ( & chain_event , vec ! [ & predicate_spec ] , ctx ) ;
212
+ occurrences_found += predicates_triggered . len ( ) as u64 ;
234
213
235
- if cursor == end_block && floating_end_block {
236
- end_block = match bitcoin_rpc. get_blockchain_info ( ) {
237
- Ok ( result) => result. blocks - 1 ,
238
- Err ( _e) => {
239
- continue ;
240
- }
241
- } ;
214
+ if let PredicatesApi :: On ( ref api_config) = config. http_api {
215
+ if blocks_scanned % 50 == 0 {
216
+ let status = PredicateStatus :: Scanning ( ScanningData {
217
+ start_block,
218
+ end_block,
219
+ cursor,
220
+ occurrences_found,
221
+ } ) ;
222
+ let mut predicates_db_conn =
223
+ open_readwrite_predicates_db_conn_or_panic ( api_config, & ctx) ;
224
+ update_predicate_status (
225
+ & predicate_spec. key ( ) ,
226
+ status,
227
+ & mut predicates_db_conn,
228
+ & ctx,
229
+ )
242
230
}
243
231
}
244
- } else {
245
- let use_scan_to_seed_hord_db = true ;
246
232
247
- if use_scan_to_seed_hord_db {
248
- // Start ingestion pipeline
233
+ match execute_predicates_action ( predicates_triggered, & event_observer_config, & ctx) . await {
234
+ Ok ( actions) => actions_triggered += actions,
235
+ Err ( _) => err_count += 1 ,
249
236
}
250
237
251
- let mut cursor = start_block. saturating_sub ( 1 ) ;
252
- while cursor <= end_block {
253
- cursor += 1 ;
254
- blocks_scanned += 1 ;
255
- let block_hash = retrieve_block_hash_with_retry ( & cursor, & bitcoin_config, ctx) . await ?;
256
- let block_breakdown =
257
- download_and_parse_block_with_retry ( & block_hash, & bitcoin_config, ctx) . await ?;
258
-
259
- let block = match indexer:: bitcoin:: standardize_bitcoin_block (
260
- block_breakdown,
261
- & event_observer_config. bitcoin_network ,
262
- ctx,
263
- ) {
264
- Ok ( data) => data,
265
- Err ( e) => {
266
- warn ! (
267
- ctx. expect_logger( ) ,
268
- "Unable to standardize block#{} {}: {}" , cursor, block_hash, e
269
- ) ;
238
+ if err_count >= 3 {
239
+ return Err ( format ! ( "Scan aborted (consecutive action errors >= 3)" ) ) ;
240
+ }
241
+
242
+ if cursor == end_block && floating_end_block {
243
+ end_block = match bitcoin_rpc. get_blockchain_info ( ) {
244
+ Ok ( result) => result. blocks - 1 ,
245
+ Err ( _e) => {
270
246
continue ;
271
247
}
272
248
} ;
273
-
274
- let chain_event =
275
- BitcoinChainEvent :: ChainUpdatedWithBlocks ( BitcoinChainUpdatedWithBlocksData {
276
- new_blocks : vec ! [ block] ,
277
- confirmed_blocks : vec ! [ ] ,
278
- } ) ;
279
-
280
- let ( predicates_triggered, _predicates_evaluated) =
281
- evaluate_bitcoin_chainhooks_on_chain_event (
282
- & chain_event,
283
- vec ! [ & predicate_spec] ,
284
- ctx,
285
- ) ;
286
-
287
- match execute_predicates_action ( predicates_triggered, & event_observer_config, & ctx)
288
- . await
289
- {
290
- Ok ( actions) => actions_triggered += actions,
291
- Err ( _) => err_count += 1 ,
292
- }
293
-
294
- if err_count >= 3 {
295
- return Err ( format ! ( "Scan aborted (consecutive action errors >= 3)" ) ) ;
296
- }
297
-
298
- if cursor == end_block && floating_end_block {
299
- end_block = match bitcoin_rpc. get_blockchain_info ( ) {
300
- Ok ( result) => result. blocks - 1 ,
301
- Err ( _e) => {
302
- continue ;
303
- }
304
- } ;
305
- }
306
249
}
307
250
}
308
251
info ! (
309
252
ctx. expect_logger( ) ,
310
253
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
311
254
) ;
312
255
256
+ if let PredicatesApi :: On ( ref api_config) = config. http_api {
257
+ let status = PredicateStatus :: Scanning ( ScanningData {
258
+ start_block,
259
+ end_block,
260
+ cursor,
261
+ occurrences_found,
262
+ } ) ;
263
+ let mut predicates_db_conn = open_readwrite_predicates_db_conn_or_panic ( api_config, & ctx) ;
264
+ update_predicate_status ( & predicate_spec. key ( ) , status, & mut predicates_db_conn, & ctx)
265
+ }
266
+
313
267
Ok ( ( ) )
314
268
}
315
269
0 commit comments