Skip to content

Commit 13421db

Browse files
author
Ludo Galabru
committed
feat: streamline processors
1 parent c0991c5 commit 13421db

File tree

6 files changed

+193
-30
lines changed

6 files changed

+193
-30
lines changed

components/hord-cli/src/cli/mod.rs

+8-6
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use crate::archive::download_ordinals_dataset_if_required;
22
use crate::config::generator::generate_config;
33
use crate::config::Config;
44
use crate::core::pipeline::download_and_pipeline_blocks;
5-
use crate::core::pipeline::processors::start_ordinals_number_processor;
5+
use crate::core::pipeline::processors::block_ingestion::start_block_ingestion_processor;
6+
use crate::core::pipeline::processors::start_inscription_indexing_processor;
67
use crate::core::{self};
78
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
89
use crate::service::Service;
@@ -622,12 +623,14 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
622623
let mut hord_config = config.get_hord_config();
623624
hord_config.network_thread_max = cmd.network_threads;
624625

626+
let blocks_post_processor = start_block_ingestion_processor(&config, ctx, None);
627+
625628
download_and_pipeline_blocks(
626629
&config,
627630
cmd.start_block,
628631
cmd.end_block,
629632
hord_config.first_inscription_height,
630-
None,
633+
Some(&blocks_post_processor),
631634
&ctx,
632635
)
633636
.await?
@@ -637,19 +640,18 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
637640
let mut hord_config = config.get_hord_config();
638641
hord_config.network_thread_max = cmd.network_threads;
639642

640-
let (tx, handle) = start_ordinals_number_processor(&config, ctx, None);
643+
let blocks_post_processor =
644+
start_inscription_indexing_processor(&config, ctx, None);
641645

642646
download_and_pipeline_blocks(
643647
&config,
644648
cmd.start_block,
645649
cmd.end_block,
646650
hord_config.first_inscription_height,
647-
Some(tx),
651+
Some(&blocks_post_processor),
648652
&ctx,
649653
)
650654
.await?;
651-
652-
let _ = handle.join();
653655
}
654656
RepairCommand::Transfers(cmd) => {
655657
let config = Config::default(false, false, false, &cmd.config_path)?;

components/hord-cli/src/core/pipeline/mod.rs

+31-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use chainhook_sdk::types::BitcoinBlockData;
55
use chainhook_sdk::utils::Context;
66
use crossbeam_channel::bounded;
77
use std::collections::{HashMap, VecDeque};
8-
use std::thread::sleep;
8+
use std::thread::{sleep, JoinHandle};
99
use std::time::Duration;
1010
use tokio::task::JoinSet;
1111

@@ -18,12 +18,27 @@ use chainhook_sdk::indexer::bitcoin::{
1818

1919
use super::parse_ordinals_and_standardize_block;
2020

21+
pub enum PostProcessorCommand {
22+
ProcessBlocks(Vec<(BitcoinBlockData, LazyBlock)>),
23+
Terminate,
24+
}
25+
26+
pub enum PostProcessorEvent {
27+
EmptyQueue,
28+
}
29+
30+
pub struct PostProcessorController {
31+
pub commands_tx: crossbeam_channel::Sender<PostProcessorCommand>,
32+
pub events_rx: crossbeam_channel::Receiver<PostProcessorEvent>,
33+
pub thread_handle: JoinHandle<()>,
34+
}
35+
2136
pub async fn download_and_pipeline_blocks(
2237
config: &Config,
2338
start_block: u64,
2439
end_block: u64,
2540
start_sequencing_blocks_at_height: u64,
26-
blocks_post_processor: Option<crossbeam_channel::Sender<Vec<(BitcoinBlockData, LazyBlock)>>>,
41+
blocks_post_processor: Option<&PostProcessorController>,
2742
ctx: &Context,
2843
) -> Result<(), String> {
2944
// let guard = pprof::ProfilerGuardBuilder::default()
@@ -111,6 +126,10 @@ pub async fn download_and_pipeline_blocks(
111126

112127
let cloned_ctx = ctx.clone();
113128

129+
let post_processor_commands_tx = blocks_post_processor
130+
.as_ref()
131+
.and_then(|p| Some(p.commands_tx.clone()));
132+
114133
let storage_thread = hiro_system_kit::thread_named("Ordered blocks dispatcher")
115134
.spawn(move || {
116135
let mut inbox = HashMap::new();
@@ -151,8 +170,8 @@ pub async fn download_and_pipeline_blocks(
151170
inbox_cursor += 1;
152171
}
153172
if !chunk.is_empty() {
154-
if let Some(ref tx) = blocks_post_processor {
155-
let _ = tx.send(chunk);
173+
if let Some(ref blocks_tx) = post_processor_commands_tx {
174+
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(chunk));
156175
}
157176
} else {
158177
if blocks_processed == number_of_blocks_to_process {
@@ -200,6 +219,14 @@ pub async fn download_and_pipeline_blocks(
200219
let _ = handle.join();
201220
}
202221

222+
if let Some(post_processor) = blocks_post_processor {
223+
loop {
224+
if let Ok(PostProcessorEvent::EmptyQueue) = post_processor.events_rx.recv() {
225+
break;
226+
}
227+
}
228+
}
229+
203230
let _ = storage_thread.join();
204231
let _ = set.shutdown();
205232

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
use std::{
2+
sync::mpsc::Sender,
3+
thread::{sleep, JoinHandle},
4+
time::Duration,
5+
};
6+
7+
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
8+
use crossbeam_channel::TryRecvError;
9+
10+
use crate::{
11+
config::Config,
12+
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
13+
db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db},
14+
};
15+
16+
pub fn start_block_ingestion_processor(
17+
config: &Config,
18+
ctx: &Context,
19+
_post_processor: Option<Sender<BitcoinBlockData>>,
20+
) -> PostProcessorController {
21+
let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
22+
let (events_tx, events_rx) = crossbeam_channel::unbounded::<PostProcessorEvent>();
23+
24+
let config = config.clone();
25+
let ctx = ctx.clone();
26+
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop")
27+
.spawn(move || {
28+
let mut num_writes = 0;
29+
let blocks_db_rw =
30+
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx).unwrap();
31+
32+
let mut empty_cycles = 0;
33+
34+
loop {
35+
let blocks = match commands_rx.try_recv() {
36+
Ok(PostProcessorCommand::ProcessBlocks(blocks)) => blocks,
37+
Ok(PostProcessorCommand::Terminate) => break,
38+
Err(e) => match e {
39+
TryRecvError::Empty => {
40+
empty_cycles += 1;
41+
42+
if empty_cycles == 30 {
43+
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
44+
}
45+
sleep(Duration::from_secs(1));
46+
if empty_cycles > 120 {
47+
break;
48+
}
49+
continue;
50+
}
51+
_ => {
52+
break;
53+
}
54+
},
55+
};
56+
57+
info!(ctx.expect_logger(), "Storing {} blocks", blocks.len());
58+
59+
for (block, compacted_block) in blocks.into_iter() {
60+
insert_entry_in_blocks(
61+
block.block_identifier.index as u32,
62+
&compacted_block,
63+
&blocks_db_rw,
64+
&ctx,
65+
);
66+
num_writes += 1;
67+
}
68+
69+
// Early return
70+
if num_writes % 128 == 0 {
71+
ctx.try_log(|logger| {
72+
info!(logger, "Flushing DB to disk ({num_writes} inserts)");
73+
});
74+
if let Err(e) = blocks_db_rw.flush() {
75+
ctx.try_log(|logger| {
76+
error!(logger, "{}", e.to_string());
77+
});
78+
}
79+
num_writes = 0;
80+
continue;
81+
}
82+
83+
// Write blocks to disk, before traversals
84+
if let Err(e) = blocks_db_rw.flush() {
85+
ctx.try_log(|logger| {
86+
error!(logger, "{}", e.to_string());
87+
});
88+
}
89+
}
90+
91+
if let Err(e) = blocks_db_rw.flush() {
92+
ctx.try_log(|logger| {
93+
error!(logger, "{}", e.to_string());
94+
});
95+
}
96+
})
97+
.expect("unable to spawn thread");
98+
99+
PostProcessorController {
100+
commands_tx,
101+
events_rx,
102+
thread_handle: handle,
103+
}
104+
}

components/hord-cli/src/core/pipeline/processors/inscription_indexing.rs

+44-13
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,32 @@
11
use std::{
22
sync::{mpsc::Sender, Arc},
3-
thread::JoinHandle,
3+
thread::{sleep, JoinHandle},
4+
time::Duration,
45
};
56

67
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
8+
use crossbeam_channel::TryRecvError;
79

810
use crate::{
911
config::Config,
10-
core::{new_traversals_lazy_cache, protocol::sequencing::process_blocks},
12+
core::{
13+
new_traversals_lazy_cache,
14+
pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
15+
protocol::sequencing::process_blocks,
16+
},
1117
db::{
1218
insert_entry_in_blocks, open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db,
13-
InscriptionHeigthHint, LazyBlock,
19+
InscriptionHeigthHint,
1420
},
1521
};
1622

17-
pub fn start_ordinals_number_processor(
23+
pub fn start_inscription_indexing_processor(
1824
config: &Config,
1925
ctx: &Context,
2026
post_processor: Option<Sender<BitcoinBlockData>>,
21-
) -> (
22-
crossbeam_channel::Sender<Vec<(BitcoinBlockData, LazyBlock)>>,
23-
JoinHandle<()>,
24-
) {
25-
let (tx, rx) = crossbeam_channel::bounded::<Vec<(BitcoinBlockData, LazyBlock)>>(1);
27+
) -> PostProcessorController {
28+
let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
29+
let (events_tx, events_rx) = crossbeam_channel::unbounded::<PostProcessorEvent>();
2630

2731
let config = config.clone();
2832
let ctx = ctx.clone();
@@ -41,16 +45,39 @@ pub fn start_ordinals_number_processor(
4145
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx).unwrap();
4246

4347
let mut inscription_height_hint = InscriptionHeigthHint::new();
48+
let mut empty_cycles = 0;
49+
50+
loop {
51+
let blocks_to_process = match commands_rx.try_recv() {
52+
Ok(PostProcessorCommand::ProcessBlocks(blocks)) => blocks,
53+
Ok(PostProcessorCommand::Terminate) => break,
54+
Err(e) => match e {
55+
TryRecvError::Empty => {
56+
empty_cycles += 1;
57+
58+
if empty_cycles == 30 {
59+
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
60+
}
61+
sleep(Duration::from_secs(1));
62+
if empty_cycles > 120 {
63+
break;
64+
}
65+
continue;
66+
}
67+
_ => {
68+
break;
69+
}
70+
},
71+
};
4472

45-
while let Ok(raw_blocks) = rx.recv() {
4673
info!(
4774
ctx.expect_logger(),
4875
"Processing {} blocks",
49-
raw_blocks.len()
76+
blocks_to_process.len()
5077
);
5178

5279
let mut blocks = vec![];
53-
for (block, compacted_block) in raw_blocks.into_iter() {
80+
for (block, compacted_block) in blocks_to_process.into_iter() {
5481
insert_entry_in_blocks(
5582
block.block_identifier.index as u32,
5683
&compacted_block,
@@ -118,5 +145,9 @@ pub fn start_ordinals_number_processor(
118145
})
119146
.expect("unable to spawn thread");
120147

121-
(tx, handle)
148+
PostProcessorController {
149+
commands_tx,
150+
events_rx,
151+
thread_handle: handle,
152+
}
122153
}
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod block_ingestion;
12
pub mod inscription_indexing;
23

3-
pub use inscription_indexing::start_ordinals_number_processor;
4+
pub use inscription_indexing::start_inscription_indexing_processor;

components/hord-cli/src/service/mod.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ mod runloops;
44
use crate::cli::fetch_and_standardize_block;
55
use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
66
use crate::core::pipeline::download_and_pipeline_blocks;
7-
use crate::core::pipeline::processors::start_ordinals_number_processor;
7+
use crate::core::pipeline::processors::start_inscription_indexing_processor;
88
use crate::core::protocol::sequencing::{
99
update_hord_db_and_augment_bitcoin_block_v3,
1010
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx,
@@ -93,8 +93,8 @@ impl Service {
9393
// Start predicate processor
9494
let (tx_replayer, rx_replayer) = channel();
9595

96-
let (tx, handle) =
97-
start_ordinals_number_processor(&self.config, &self.ctx, Some(tx_replayer));
96+
let blocks_post_processor =
97+
start_inscription_indexing_processor(&self.config, &self.ctx, Some(tx_replayer));
9898

9999
let mut moved_event_observer_config = event_observer_config.clone();
100100
let moved_ctx = self.ctx.clone();
@@ -141,13 +141,11 @@ impl Service {
141141
start_block,
142142
end_block,
143143
hord_config.first_inscription_height,
144-
Some(tx.clone()),
144+
Some(&blocks_post_processor),
145145
&self.ctx,
146146
)
147147
.await?;
148148
}
149-
150-
let _ = handle.join();
151149
}
152150

153151
// Bitcoin scan operation threadpool

0 commit comments

Comments
 (0)