Skip to content

Commit 45b9abd

Browse files
author
Ludo Galabru
committed
feat: use thread pools for scans
1 parent daf5547 commit 45b9abd

File tree

3 files changed

+77
-43
lines changed

3 files changed

+77
-43
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

components/chainhook-cli/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ ansi_term = "0.12.1"
3737
atty = "0.2.14"
3838
crossbeam-channel = "0.5.6"
3939
uuid = { version = "1.3.0", features = ["v4", "fast-rng"] }
40+
threadpool = "1.8.1"
4041

4142
[dev-dependencies]
4243
criterion = "0.3"

components/chainhook-cli/src/service/mod.rs

+75-43
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@ use chainhook_event_observer::observer::{start_event_observer, ApiKey, ObserverE
99
use chainhook_event_observer::utils::Context;
1010
use chainhook_types::{BitcoinBlockSignaling, StacksBlockData, StacksChainEvent};
1111
use redis::{Commands, Connection};
12+
use threadpool::ThreadPool;
1213

1314
use std::sync::mpsc::channel;
1415

1516
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
1617
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
18+
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 12;
19+
pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 12;
1720

1821
pub struct Service {
1922
config: Config,
@@ -110,14 +113,6 @@ impl Service {
110113
);
111114
}
112115

113-
// let ordinal_index = match initialize_ordinal_index(&event_observer_config, None, &self.ctx)
114-
// {
115-
// Ok(index) => index,
116-
// Err(e) => {
117-
// panic!()
118-
// }
119-
// };
120-
121116
let context_cloned = self.ctx.clone();
122117
let event_observer_config_moved = event_observer_config.clone();
123118
let _ = std::thread::spawn(move || {
@@ -131,6 +126,76 @@ impl Service {
131126
let _ = hiro_system_kit::nestable_block_on(future);
132127
});
133128

129+
// Stacks scan operation threadpool
130+
let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded();
131+
let stacks_scan_pool = ThreadPool::new(STACKS_SCAN_THREAD_POOL_SIZE);
132+
let ctx = self.ctx.clone();
133+
let config = self.config.clone();
134+
let _ = hiro_system_kit::thread_named("Stacks scan runloop")
135+
.spawn(move || {
136+
while let Ok(predicate_spec) = stacks_scan_op_rx.recv() {
137+
let moved_ctx = ctx.clone();
138+
let mut moved_config = config.clone();
139+
stacks_scan_pool.execute(move || {
140+
let op = scan_stacks_chainstate_via_csv_using_predicate(
141+
predicate_spec,
142+
&mut moved_config,
143+
&moved_ctx,
144+
);
145+
let end_block = match hiro_system_kit::nestable_block_on(op) {
146+
Ok(end_block) => end_block,
147+
Err(e) => {
148+
error!(
149+
moved_ctx.expect_logger(),
150+
"Unable to evaluate predicate on Stacks chainstate: {e}",
151+
);
152+
return;
153+
}
154+
};
155+
info!(
156+
moved_ctx.expect_logger(),
157+
"Stacks chainstate scan completed up to block: {}", end_block.index
158+
);
159+
});
160+
}
161+
let res = stacks_scan_pool.join();
162+
res
163+
})
164+
.expect("unable to spawn thread");
165+
166+
// Bitcoin scan operation threadpool
167+
let (bitcoin_scan_op_tx, bitcoin_scan_op_rx) = crossbeam_channel::unbounded();
168+
let bitcoin_scan_pool = ThreadPool::new(BITCOIN_SCAN_THREAD_POOL_SIZE);
169+
let ctx = self.ctx.clone();
170+
let config = self.config.clone();
171+
let _ = hiro_system_kit::thread_named("Bitcoin scan runloop")
172+
.spawn(move || {
173+
while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
174+
let moved_ctx = ctx.clone();
175+
let moved_config = config.clone();
176+
bitcoin_scan_pool.execute(move || {
177+
let op = scan_bitcoin_chainstate_via_http_using_predicate(
178+
predicate_spec,
179+
&moved_config,
180+
&moved_ctx,
181+
);
182+
183+
match hiro_system_kit::nestable_block_on(op) {
184+
Ok(_) => {}
185+
Err(e) => {
186+
error!(
187+
moved_ctx.expect_logger(),
188+
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
189+
);
190+
}
191+
};
192+
});
193+
}
194+
let res = bitcoin_scan_pool.join();
195+
res
196+
})
197+
.expect("unable to spawn thread");
198+
134199
loop {
135200
let event = match observer_event_rx.recv() {
136201
Ok(cmd) => cmd,
@@ -178,43 +243,10 @@ impl Service {
178243
}
179244
match chainhook {
180245
ChainhookSpecification::Stacks(predicate_spec) => {
181-
let end_block = match scan_stacks_chainstate_via_csv_using_predicate(
182-
predicate_spec,
183-
&mut self.config,
184-
&self.ctx,
185-
)
186-
.await
187-
{
188-
Ok(end_block) => end_block,
189-
Err(e) => {
190-
error!(
191-
self.ctx.expect_logger(),
192-
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
193-
);
194-
continue;
195-
}
196-
};
197-
info!(
198-
self.ctx.expect_logger(),
199-
"Stacks chainstate scan completed up to block: {}", end_block.index
200-
);
246+
let _ = stacks_scan_op_tx.send(predicate_spec);
201247
}
202248
ChainhookSpecification::Bitcoin(predicate_spec) => {
203-
match scan_bitcoin_chainstate_via_http_using_predicate(
204-
predicate_spec,
205-
&self.config,
206-
&self.ctx,
207-
)
208-
.await
209-
{
210-
Ok(_) => {}
211-
Err(e) => {
212-
error!(
213-
self.ctx.expect_logger(),
214-
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
215-
);
216-
}
217-
};
249+
let _ = bitcoin_scan_op_tx.send(predicate_spec);
218250
}
219251
}
220252
}

0 commit comments

Comments
 (0)