Skip to content

Commit 1922fd9

Browse files
author
Ludo Galabru
committed
feat: simplify + improve coordination
1 parent 0233dc5 commit 1922fd9

File tree

10 files changed

+154
-134
lines changed

10 files changed

+154
-134
lines changed

components/hord-cli/src/cli/mod.rs

+10-12
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::config::generator::generate_config;
22
use crate::config::Config;
33
use crate::core::pipeline::download_and_pipeline_blocks;
4-
use crate::core::pipeline::processors::block_ingestion::start_block_ingestion_processor;
4+
use crate::core::pipeline::processors::block_archiving::start_block_archiving_processor;
55
use crate::core::pipeline::processors::start_inscription_indexing_processor;
66
use crate::core::protocol::inscription_parsing::parse_ordinals_and_standardize_block;
77
use crate::download::download_ordinals_dataset_if_required;
@@ -12,8 +12,9 @@ use crate::db::{
1212
delete_data_in_hord_db, find_all_inscription_transfers, find_all_inscriptions_in_block,
1313
find_all_transfers_in_block, find_inscription_with_id, find_last_block_inserted,
1414
find_latest_inscription_block_height, find_lazy_block_at_block_height,
15-
open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
16-
open_readwrite_hord_db_conn_rocks_db, initialize_hord_db, get_default_hord_db_file_path,
15+
get_default_hord_db_file_path, initialize_hord_db, open_readonly_hord_db_conn,
16+
open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
17+
open_readwrite_hord_db_conn_rocks_db,
1718
};
1819
use chainhook_sdk::bitcoincore_rpc::{Auth, Client, RpcApi};
1920
use chainhook_sdk::chainhooks::types::HttpHook;
@@ -263,7 +264,7 @@ struct StartCommand {
263264
enum HordDbCommand {
264265
/// Initialize a new hord db
265266
#[clap(name = "new", bin_name = "new")]
266-
New(SyncHordDbCommand),
267+
New(SyncHordDbCommand),
267268
/// Catch-up hord db
268269
#[clap(name = "sync", bin_name = "sync")]
269270
Sync(SyncHordDbCommand),
@@ -482,8 +483,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
482483
let mut total_inscriptions = 0;
483484
let mut total_transfers = 0;
484485

485-
let inscriptions_db_conn =
486-
initialize_hord_db(&config.expected_cache_path(), &ctx);
486+
let inscriptions_db_conn = initialize_hord_db(&config.expected_cache_path(), &ctx);
487487
while let Some(block_height) = block_range.pop_front() {
488488
let inscriptions =
489489
find_all_inscriptions_in_block(&block_height, &inscriptions_db_conn, &ctx);
@@ -529,7 +529,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
529529
if total_transfers == 0 && total_inscriptions == 0 {
530530
let db_file_path = get_default_hord_db_file_path(&config.expected_cache_path());
531531
warn!(ctx.expect_logger(), "No data available. Check the validity of the range being scanned and the validity of your local database {}", db_file_path.display());
532-
}
532+
}
533533
}
534534
}
535535
Command::Scan(ScanCommand::Inscription(cmd)) => {
@@ -637,28 +637,27 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
637637
Command::Db(HordDbCommand::New(cmd)) => {
638638
let config = Config::default(false, false, false, &cmd.config_path)?;
639639
initialize_hord_db(&config.expected_cache_path(), &ctx);
640-
},
640+
}
641641
Command::Db(HordDbCommand::Sync(cmd)) => {
642642
let config = Config::default(false, false, false, &cmd.config_path)?;
643643
initialize_hord_db(&config.expected_cache_path(), &ctx);
644644
let service = Service::new(config, ctx.clone());
645645
service.update_state(None).await?;
646-
},
646+
}
647647
Command::Db(HordDbCommand::Repair(subcmd)) => match subcmd {
648648
RepairCommand::Blocks(cmd) => {
649649
let config = Config::default(false, false, false, &cmd.config_path)?;
650650
let mut hord_config = config.get_hord_config();
651651
hord_config.network_thread_max = cmd.network_threads;
652652

653-
let block_ingestion_processor = start_block_ingestion_processor(&config, ctx, None);
653+
let block_ingestion_processor = start_block_archiving_processor(&config, ctx, None);
654654

655655
download_and_pipeline_blocks(
656656
&config,
657657
cmd.start_block,
658658
cmd.end_block,
659659
hord_config.first_inscription_height,
660660
Some(&block_ingestion_processor),
661-
Some(&block_ingestion_processor),
662661
10_000,
663662
&ctx,
664663
)
@@ -677,7 +676,6 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
677676
cmd.start_block,
678677
cmd.end_block,
679678
hord_config.first_inscription_height,
680-
None,
681679
Some(&inscription_indexing_processor),
682680
10_000,
683681
&ctx,

components/hord-cli/src/config/mod.rs

+8-6
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,9 @@ impl Config {
155155
let bootstrap = match config_file.bootstrap {
156156
Some(bootstrap) => match bootstrap.download_url {
157157
Some(ref url) => BootstrapConfig::Download(url.to_string()),
158-
None => BootstrapConfig::Build
159-
}
160-
None => BootstrapConfig::Build
158+
None => BootstrapConfig::Build,
159+
},
160+
None => BootstrapConfig::Build,
161161
};
162162

163163
let config = Config {
@@ -243,7 +243,7 @@ impl Config {
243243
pub fn should_bootstrap_through_download(&self) -> bool {
244244
match &self.bootstrap {
245245
BootstrapConfig::Build => false,
246-
BootstrapConfig::Download(_) => true
246+
BootstrapConfig::Download(_) => true,
247247
}
248248
}
249249

@@ -267,7 +267,7 @@ impl Config {
267267
fn expected_remote_ordinals_sqlite_base_url(&self) -> &str {
268268
match &self.bootstrap {
269269
BootstrapConfig::Build => unreachable!(),
270-
BootstrapConfig::Download(url) => &url
270+
BootstrapConfig::Download(url) => &url,
271271
}
272272
}
273273

@@ -367,7 +367,9 @@ impl Config {
367367
working_dir: default_cache_path(),
368368
},
369369
http_api: PredicatesApi::Off,
370-
bootstrap: BootstrapConfig::Download(DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string()),
370+
bootstrap: BootstrapConfig::Download(
371+
DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string(),
372+
),
371373
limits: LimitsConfig {
372374
max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION,
373375
max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE,

components/hord-cli/src/core/pipeline/mod.rs

+74-54
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ pub enum PostProcessorCommand {
2525
}
2626

2727
pub enum PostProcessorEvent {
28-
EmptyQueue,
28+
Started,
29+
Terminated,
30+
Expired,
2931
}
3032

3133
pub struct PostProcessorController {
@@ -39,8 +41,7 @@ pub async fn download_and_pipeline_blocks(
3941
start_block: u64,
4042
end_block: u64,
4143
start_sequencing_blocks_at_height: u64,
42-
blocks_post_processor_pre_sequence: Option<&PostProcessorController>,
43-
blocks_post_processor_post_sequence: Option<&PostProcessorController>,
44+
blocks_post_processor: Option<&PostProcessorController>,
4445
speed: usize,
4546
ctx: &Context,
4647
) -> Result<(), String> {
@@ -101,7 +102,7 @@ pub async fn download_and_pipeline_blocks(
101102
rx_thread_pool.push(rx);
102103
}
103104

104-
for rx in rx_thread_pool.into_iter() {
105+
for (thread_index, rx) in rx_thread_pool.into_iter().enumerate() {
105106
let block_compressed_tx_moved = block_compressed_tx.clone();
106107
let moved_ctx: Context = moved_ctx.clone();
107108
let moved_bitcoin_network = moved_bitcoin_network.clone();
@@ -131,18 +132,16 @@ pub async fn download_and_pipeline_blocks(
131132
compressed_block,
132133
)));
133134
}
135+
moved_ctx
136+
.try_log(|logger| debug!(logger, "Exiting processing thread {thread_index}"));
134137
})
135138
.expect("unable to spawn thread");
136139
thread_pool_handles.push(handle);
137140
}
138141

139142
let cloned_ctx = ctx.clone();
140143

141-
let blocks_post_processor_post_sequence_commands_tx = blocks_post_processor_post_sequence
142-
.as_ref()
143-
.and_then(|p| Some(p.commands_tx.clone()));
144-
145-
let blocks_post_processor_pre_sequence_commands_tx = blocks_post_processor_pre_sequence
144+
let blocks_post_processor_commands_tx = blocks_post_processor
146145
.as_ref()
147146
.and_then(|p| Some(p.commands_tx.clone()));
148147

@@ -151,19 +150,52 @@ pub async fn download_and_pipeline_blocks(
151150
let mut inbox = HashMap::new();
152151
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
153152
let mut blocks_processed = 0;
154-
let mut pre_seq_processor_started = false;
155-
let mut post_seq_processor_started = false;
153+
let mut processor_started = false;
154+
let mut stop_runloop = false;
156155

157156
loop {
157+
if stop_runloop {
158+
cloned_ctx.try_log(|logger| {
159+
info!(
160+
logger,
161+
"#{blocks_processed} blocks successfully sent to processor"
162+
)
163+
});
164+
break;
165+
}
166+
158167
// Dequeue all the blocks available
159168
let mut new_blocks = vec![];
160-
while let Ok(Some((block_height, block, compacted_block))) =
161-
block_compressed_rx.try_recv()
162-
{
163-
blocks_processed += 1;
164-
new_blocks.push((block_height, block, compacted_block));
165-
if new_blocks.len() >= 10_000 {
166-
break;
169+
while let Ok(message) = block_compressed_rx.try_recv() {
170+
match message {
171+
Some((block_height, block, compacted_block)) => {
172+
blocks_processed += 1;
173+
new_blocks.push((block_height, block, compacted_block));
174+
// Max batch size: 10_000 blocks
175+
if new_blocks.len() >= 10_000 {
176+
break;
177+
}
178+
}
179+
None => {
180+
stop_runloop = true;
181+
}
182+
}
183+
}
184+
185+
if blocks_processed == number_of_blocks_to_process {
186+
stop_runloop = true;
187+
}
188+
189+
// Early "continue"
190+
if new_blocks.is_empty() {
191+
sleep(Duration::from_millis(500));
192+
continue;
193+
}
194+
195+
if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
196+
if !processor_started {
197+
processor_started = true;
198+
let _ = blocks_tx.send(PostProcessorCommand::Start);
167199
}
168200
}
169201

@@ -176,18 +208,15 @@ pub async fn download_and_pipeline_blocks(
176208
}
177209
}
178210

179-
if !ooo_compacted_blocks.is_empty() {
180-
if let Some(ref blocks_tx) = blocks_post_processor_pre_sequence_commands_tx {
181-
if !pre_seq_processor_started {
182-
pre_seq_processor_started = true;
183-
let _ = blocks_tx.send(PostProcessorCommand::Start);
184-
}
185-
211+
// Early "continue"
212+
if inbox.is_empty() {
213+
if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
186214
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(
187215
ooo_compacted_blocks,
188216
vec![],
189217
));
190218
}
219+
continue;
191220
}
192221

193222
// In order processing: construct the longest sequence of known blocks
@@ -199,24 +228,8 @@ pub async fn download_and_pipeline_blocks(
199228
inbox_cursor += 1;
200229
}
201230

202-
if blocks.is_empty() {
203-
if blocks_processed == number_of_blocks_to_process {
204-
cloned_ctx.try_log(|logger| {
205-
info!(
206-
logger,
207-
"#{blocks_processed} blocks successfully sent to processor"
208-
)
209-
});
210-
break;
211-
} else {
212-
sleep(Duration::from_secs(1));
213-
}
214-
} else {
215-
if let Some(ref blocks_tx) = blocks_post_processor_post_sequence_commands_tx {
216-
if !post_seq_processor_started {
217-
post_seq_processor_started = true;
218-
let _ = blocks_tx.send(PostProcessorCommand::Start);
219-
}
231+
if !blocks.is_empty() {
232+
if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
220233
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(
221234
compacted_blocks,
222235
blocks,
@@ -248,7 +261,7 @@ pub async fn download_and_pipeline_blocks(
248261
}
249262

250263
ctx.try_log(|logger| {
251-
info!(
264+
debug!(
252265
logger,
253266
"Pipeline successfully fed with sequence of blocks ({} to {})", start_block, end_block
254267
)
@@ -258,26 +271,33 @@ pub async fn download_and_pipeline_blocks(
258271
let _ = tx.send(None);
259272
}
260273

274+
ctx.try_log(|logger| debug!(logger, "Enqueued pipeline termination commands"));
275+
261276
for handle in thread_pool_handles.into_iter() {
262277
let _ = handle.join();
263278
}
264279

265-
if let Some(post_processor) = blocks_post_processor_pre_sequence {
266-
loop {
267-
if let Ok(PostProcessorEvent::EmptyQueue) = post_processor.events_rx.recv() {
268-
break;
269-
}
270-
}
271-
}
280+
ctx.try_log(|logger| debug!(logger, "Pipeline successfully terminated"));
272281

273-
if let Some(post_processor) = blocks_post_processor_post_sequence {
282+
if let Some(post_processor) = blocks_post_processor {
283+
if let Ok(PostProcessorEvent::Started) = post_processor.events_rx.recv() {
284+
ctx.try_log(|logger| debug!(logger, "Block post processing started"));
285+
let _ = post_processor
286+
.commands_tx
287+
.send(PostProcessorCommand::Terminate);
288+
}
274289
loop {
275-
if let Ok(PostProcessorEvent::EmptyQueue) = post_processor.events_rx.recv() {
276-
break;
290+
if let Ok(signal) = post_processor.events_rx.recv() {
291+
match signal {
292+
PostProcessorEvent::Terminated | PostProcessorEvent::Expired => break,
293+
PostProcessorEvent::Started => unreachable!(),
294+
}
277295
}
278296
}
279297
}
280298

299+
let _ = block_compressed_tx.send(None);
300+
281301
let _ = storage_thread.join();
282302
let _ = set.shutdown();
283303

components/hord-cli/src/core/pipeline/processors/block_ingestion.rs renamed to components/hord-cli/src/core/pipeline/processors/block_archiving.rs

+13-6
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::{
1414
db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db, LazyBlock},
1515
};
1616

17-
pub fn start_block_ingestion_processor(
17+
pub fn start_block_archiving_processor(
1818
config: &Config,
1919
ctx: &Context,
2020
_post_processor: Option<Sender<BitcoinBlockData>>,
@@ -31,22 +31,29 @@ pub fn start_block_ingestion_processor(
3131
let mut empty_cycles = 0;
3232

3333
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
34-
info!(ctx.expect_logger(), "Start block indexing runloop");
34+
let _ = events_tx.send(PostProcessorEvent::Started);
35+
debug!(ctx.expect_logger(), "Start block indexing runloop");
3536
}
3637

3738
loop {
39+
debug!(ctx.expect_logger(), "Tick");
3840
let (compacted_blocks, _) = match commands_rx.try_recv() {
3941
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
4042
(compacted_blocks, blocks)
4143
}
42-
Ok(PostProcessorCommand::Terminate) => break,
43-
Ok(PostProcessorCommand::Start) => continue,
44+
Ok(PostProcessorCommand::Terminate) => {
45+
debug!(ctx.expect_logger(), "Terminating block processor");
46+
let _ = events_tx.send(PostProcessorEvent::Terminated);
47+
break;
48+
}
49+
Ok(PostProcessorCommand::Start) => unreachable!(),
4450
Err(e) => match e {
4551
TryRecvError::Empty => {
4652
empty_cycles += 1;
47-
4853
if empty_cycles == 30 {
49-
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
54+
warn!(ctx.expect_logger(), "Block processor reached expiration");
55+
let _ = events_tx.send(PostProcessorEvent::Expired);
56+
break;
5057
}
5158
sleep(Duration::from_secs(1));
5259
if empty_cycles > 120 {

0 commit comments

Comments
 (0)