@@ -59,193 +59,197 @@ pub fn parallelize_inscription_data_computations(
59
59
60
60
let has_transactions_to_process = !transactions_ids. is_empty ( ) || !l1_cache_hits. is_empty ( ) ;
61
61
62
- let thread_max = hord_config. ingestion_thread_max * 2 ;
62
+ let thread_max = hord_config. ingestion_thread_max ;
63
63
64
+ // Nothing to do? early return
64
65
if has_transactions_to_process {
65
- let expected_traversals = transactions_ids. len ( ) + l1_cache_hits. len ( ) ;
66
- let ( traversal_tx, traversal_rx) = channel ( ) ;
67
-
68
- let mut tx_thread_pool = vec ! [ ] ;
69
- let mut thread_pool_handles = vec ! [ ] ;
70
-
71
- for thread_index in 0 ..thread_max {
72
- let ( tx, rx) = channel ( ) ;
73
- tx_thread_pool. push ( tx) ;
74
-
75
- let moved_traversal_tx = traversal_tx. clone ( ) ;
76
- let moved_ctx = inner_ctx. clone ( ) ;
77
- let moved_hord_db_path = hord_config. db_path . clone ( ) ;
78
- let local_cache = cache_l2. clone ( ) ;
79
-
80
- let handle = hiro_system_kit:: thread_named ( "Worker" )
81
- . spawn ( move || {
82
- while let Ok ( Some ( (
83
- transaction_id,
84
- block_identifier,
66
+ return Ok ( false )
67
+ }
68
+
69
+ let expected_traversals = transactions_ids. len ( ) + l1_cache_hits. len ( ) ;
70
+ let ( traversal_tx, traversal_rx) = channel ( ) ;
71
+
72
+ let mut tx_thread_pool = vec ! [ ] ;
73
+ let mut thread_pool_handles = vec ! [ ] ;
74
+
75
+ for thread_index in 0 ..thread_max {
76
+ let ( tx, rx) = channel ( ) ;
77
+ tx_thread_pool. push ( tx) ;
78
+
79
+ let moved_traversal_tx = traversal_tx. clone ( ) ;
80
+ let moved_ctx = inner_ctx. clone ( ) ;
81
+ let moved_hord_db_path = hord_config. db_path . clone ( ) ;
82
+ let local_cache = cache_l2. clone ( ) ;
83
+
84
+ let handle = hiro_system_kit:: thread_named ( "Worker" )
85
+ . spawn ( move || {
86
+ while let Ok ( Some ( (
87
+ transaction_id,
88
+ block_identifier,
89
+ input_index,
90
+ prioritary,
91
+ ) ) ) = rx. recv ( )
92
+ {
93
+ let traversal: Result < TraversalResult , String > = compute_satoshi_number (
94
+ & moved_hord_db_path,
95
+ & block_identifier,
96
+ & transaction_id,
85
97
input_index,
86
- prioritary,
87
- ) ) ) = rx. recv ( )
88
- {
89
- let traversal: Result < TraversalResult , String > = compute_satoshi_number (
90
- & moved_hord_db_path,
91
- & block_identifier,
92
- & transaction_id,
93
- input_index,
94
- 0 ,
95
- & local_cache,
96
- & moved_ctx,
97
- ) ;
98
- let _ = moved_traversal_tx. send ( ( traversal, prioritary, thread_index) ) ;
99
- }
100
- } )
101
- . expect ( "unable to spawn thread" ) ;
102
- thread_pool_handles. push ( handle) ;
103
- }
98
+ 0 ,
99
+ & local_cache,
100
+ & moved_ctx,
101
+ ) ;
102
+ let _ = moved_traversal_tx. send ( ( traversal, prioritary, thread_index) ) ;
103
+ }
104
+ } )
105
+ . expect ( "unable to spawn thread" ) ;
106
+ thread_pool_handles. push ( handle) ;
107
+ }
104
108
105
- // Empty cache
106
- let mut thread_index = 0 ;
107
- for key in l1_cache_hits. iter ( ) {
108
- if let Some ( entry) = cache_l1. remove ( key) {
109
- let _ = traversal_tx. send ( ( Ok ( entry) , true , thread_index) ) ;
110
- thread_index = ( thread_index + 1 ) % thread_max;
111
- }
109
+ // Empty cache
110
+ let mut thread_index = 0 ;
111
+ for key in l1_cache_hits. iter ( ) {
112
+ if let Some ( entry) = cache_l1. remove ( key) {
113
+ let _ = traversal_tx. send ( ( Ok ( entry) , true , thread_index) ) ;
114
+ thread_index = ( thread_index + 1 ) % thread_max;
112
115
}
116
+ }
113
117
114
- ctx. try_log ( |logger| {
115
- info ! (
116
- logger,
117
- "Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {}, L1 cache len: {}, L2 cache len: {})" ,
118
- block. block_identifier. index,
119
- transactions_ids. len( ) ,
120
- l1_cache_hits. len( ) ,
121
- next_blocks. len( ) ,
122
- cache_l1. len( ) ,
123
- cache_l2. len( ) ,
124
- )
125
- } ) ;
118
+ ctx. try_log ( |logger| {
119
+ info ! (
120
+ logger,
121
+ "Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {}, L1 cache len: {}, L2 cache len: {})" ,
122
+ block. block_identifier. index,
123
+ transactions_ids. len( ) ,
124
+ l1_cache_hits. len( ) ,
125
+ next_blocks. len( ) ,
126
+ cache_l1. len( ) ,
127
+ cache_l2. len( ) ,
128
+ )
129
+ } ) ;
130
+
131
+ let mut rng = thread_rng ( ) ;
132
+ transactions_ids. shuffle ( & mut rng) ;
133
+ let mut priority_queue = VecDeque :: new ( ) ;
134
+ let mut warmup_queue = VecDeque :: new ( ) ;
135
+
136
+ for ( transaction_id, input_index) in transactions_ids. into_iter ( ) {
137
+ priority_queue. push_back ( (
138
+ transaction_id,
139
+ block. block_identifier . clone ( ) ,
140
+ input_index,
141
+ true ,
142
+ ) ) ;
143
+ }
126
144
127
- let mut rng = thread_rng ( ) ;
128
- transactions_ids. shuffle ( & mut rng) ;
129
- let mut priority_queue = VecDeque :: new ( ) ;
130
- let mut warmup_queue = VecDeque :: new ( ) ;
131
-
132
- for ( transaction_id, input_index) in transactions_ids. into_iter ( ) {
133
- priority_queue. push_back ( (
134
- transaction_id,
135
- block. block_identifier . clone ( ) ,
136
- input_index,
137
- true ,
138
- ) ) ;
139
- }
145
+ // Feed each workers with 2 workitems each
146
+ for thread_index in 0 ..thread_max {
147
+ let _ = tx_thread_pool[ thread_index] . send ( priority_queue. pop_front ( ) ) ;
148
+ }
149
+ for thread_index in 0 ..thread_max {
150
+ let _ = tx_thread_pool[ thread_index] . send ( priority_queue. pop_front ( ) ) ;
151
+ }
140
152
141
- // Feed each workers with 2 workitems each
142
- for thread_index in 0 ..thread_max {
143
- let _ = tx_thread_pool[ thread_index] . send ( priority_queue. pop_front ( ) ) ;
153
+ let mut next_block_iter = next_blocks. iter ( ) ;
154
+ let mut traversals_received = 0 ;
155
+ while let Ok ( ( traversal_result, prioritary, thread_index) ) = traversal_rx. recv ( ) {
156
+ if prioritary {
157
+ traversals_received += 1 ;
144
158
}
145
- for thread_index in 0 ..thread_max {
146
- let _ = tx_thread_pool[ thread_index] . send ( priority_queue. pop_front ( ) ) ;
159
+ match traversal_result {
160
+ Ok ( traversal) => {
161
+ inner_ctx. try_log ( |logger| {
162
+ info ! (
163
+ logger,
164
+ "Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index})." ,
165
+ traversal. ordinal_number, traversal. get_ordinal_coinbase_height( ) , traversal. get_ordinal_coinbase_offset( ) , traversal. transfers
166
+ )
167
+ } ) ;
168
+ cache_l1. insert (
169
+ (
170
+ traversal. transaction_identifier_inscription . clone ( ) ,
171
+ traversal. inscription_input_index ,
172
+ ) ,
173
+ traversal,
174
+ ) ;
175
+ }
176
+ Err ( e) => {
177
+ ctx. try_log ( |logger| {
178
+ error ! ( logger, "Unable to compute inscription's Satoshi: {e}" , )
179
+ } ) ;
180
+ }
181
+ }
182
+ if traversals_received == expected_traversals {
183
+ break ;
147
184
}
148
185
149
- let mut next_block_iter = next_blocks. iter ( ) ;
150
- let mut traversals_received = 0 ;
151
- while let Ok ( ( traversal_result, prioritary, thread_index) ) = traversal_rx. recv ( ) {
152
- if prioritary {
153
- traversals_received += 1 ;
154
- }
155
- match traversal_result {
156
- Ok ( traversal) => {
157
- inner_ctx. try_log ( |logger| {
158
- info ! (
159
- logger,
160
- "Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index})." ,
161
- traversal. ordinal_number, traversal. get_ordinal_coinbase_height( ) , traversal. get_ordinal_coinbase_offset( ) , traversal. transfers
162
- )
163
- } ) ;
164
- cache_l1. insert (
165
- (
166
- traversal. transaction_identifier_inscription . clone ( ) ,
167
- traversal. inscription_input_index ,
168
- ) ,
169
- traversal,
186
+ if let Some ( w) = priority_queue. pop_front ( ) {
187
+ let _ = tx_thread_pool[ thread_index] . send ( Some ( w) ) ;
188
+ } else {
189
+ if let Some ( w) = warmup_queue. pop_front ( ) {
190
+ let _ = tx_thread_pool[ thread_index] . send ( Some ( w) ) ;
191
+ } else {
192
+ if let Some ( next_block) = next_block_iter. next ( ) {
193
+ let ( mut transactions_ids, _) = get_transactions_to_process (
194
+ next_block,
195
+ cache_l1,
196
+ inscriptions_db_tx,
197
+ ctx,
170
198
) ;
171
- }
172
- Err ( e) => {
199
+
173
200
ctx. try_log ( |logger| {
174
- error ! ( logger, "Unable to compute inscription's Satoshi: {e}" , )
201
+ info ! (
202
+ logger,
203
+ "Number of inscriptions in block #{} to pre-process: {}" ,
204
+ block. block_identifier. index,
205
+ transactions_ids. len( )
206
+ )
175
207
} ) ;
176
- }
177
- }
178
- if traversals_received == expected_traversals {
179
- break ;
180
- }
181
208
182
- if let Some ( w) = priority_queue. pop_front ( ) {
183
- let _ = tx_thread_pool[ thread_index] . send ( Some ( w) ) ;
184
- } else {
185
- if let Some ( w) = warmup_queue. pop_front ( ) {
186
- let _ = tx_thread_pool[ thread_index] . send ( Some ( w) ) ;
187
- } else {
188
- if let Some ( next_block) = next_block_iter. next ( ) {
189
- let ( mut transactions_ids, _) = get_transactions_to_process (
190
- next_block,
191
- cache_l1,
192
- inscriptions_db_tx,
193
- ctx,
194
- ) ;
195
-
196
- ctx. try_log ( |logger| {
197
- info ! (
198
- logger,
199
- "Number of inscriptions in block #{} to pre-process: {}" ,
200
- block. block_identifier. index,
201
- transactions_ids. len( )
202
- )
203
- } ) ;
204
-
205
- transactions_ids. shuffle ( & mut rng) ;
206
- for ( transaction_id, input_index) in transactions_ids. into_iter ( ) {
207
- warmup_queue. push_back ( (
208
- transaction_id,
209
- next_block. block_identifier . clone ( ) ,
210
- input_index,
211
- false ,
212
- ) ) ;
213
- }
214
- let _ = tx_thread_pool[ thread_index] . send ( warmup_queue. pop_front ( ) ) ;
209
+ transactions_ids. shuffle ( & mut rng) ;
210
+ for ( transaction_id, input_index) in transactions_ids. into_iter ( ) {
211
+ warmup_queue. push_back ( (
212
+ transaction_id,
213
+ next_block. block_identifier . clone ( ) ,
214
+ input_index,
215
+ false ,
216
+ ) ) ;
215
217
}
218
+ let _ = tx_thread_pool[ thread_index] . send ( warmup_queue. pop_front ( ) ) ;
216
219
}
217
220
}
218
221
}
222
+ }
219
223
220
- for tx in tx_thread_pool. iter ( ) {
221
- // Empty the queue
222
- if let Ok ( ( traversal_result, prioritary, thread_index) ) = traversal_rx. try_recv ( ) {
223
- if let Ok ( traversal) = traversal_result {
224
- inner_ctx. try_log ( |logger| {
225
- info ! (
226
- logger,
227
- "Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index})." ,
228
- traversal. ordinal_number, traversal. get_ordinal_coinbase_height( ) , traversal. get_ordinal_coinbase_offset( ) , traversal. transfers
229
- )
230
- } ) ;
231
- cache_l1. insert (
232
- (
233
- traversal. transaction_identifier_inscription . clone ( ) ,
234
- traversal. inscription_input_index ,
235
- ) ,
236
- traversal,
237
- ) ;
238
- }
224
+ for tx in tx_thread_pool. iter ( ) {
225
+ // Empty the queue
226
+ if let Ok ( ( traversal_result, prioritary, thread_index) ) = traversal_rx. try_recv ( ) {
227
+ if let Ok ( traversal) = traversal_result {
228
+ inner_ctx. try_log ( |logger| {
229
+ info ! (
230
+ logger,
231
+ "Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index})." ,
232
+ traversal. ordinal_number, traversal. get_ordinal_coinbase_height( ) , traversal. get_ordinal_coinbase_offset( ) , traversal. transfers
233
+ )
234
+ } ) ;
235
+ cache_l1. insert (
236
+ (
237
+ traversal. transaction_identifier_inscription . clone ( ) ,
238
+ traversal. inscription_input_index ,
239
+ ) ,
240
+ traversal,
241
+ ) ;
239
242
}
240
- let _ = tx. send ( None ) ;
241
243
}
242
-
243
- let _ = hiro_system_kit:: thread_named ( "Garbage collection" ) . spawn ( move || {
244
- for handle in thread_pool_handles. into_iter ( ) {
245
- let _ = handle. join ( ) ;
246
- }
247
- } ) ;
244
+ let _ = tx. send ( None ) ;
248
245
}
246
+
247
+ let _ = hiro_system_kit:: thread_named ( "Garbage collection" ) . spawn ( move || {
248
+ for handle in thread_pool_handles. into_iter ( ) {
249
+ let _ = handle. join ( ) ;
250
+ }
251
+ } ) ;
252
+
249
253
Ok ( has_transactions_to_process)
250
254
}
251
255
0 commit comments