Skip to content

Commit f6d050f

Browse files
author
Ludo Galabru
committed
feat: handle stacks unconfirmed state scans
1 parent 158633c commit f6d050f

File tree

4 files changed

+223
-95
lines changed

4 files changed

+223
-95
lines changed

components/chainhook-cli/src/scan/stacks.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,13 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
147147
let mut last_block_scanned = BlockIdentifier::default();
148148
let mut err_count = 0;
149149
for cursor in start_block..=end_block {
150-
let block_data = match get_stacks_block_at_block_height(cursor, 3, stacks_db_conn) {
150+
let block_data = match get_stacks_block_at_block_height(cursor, true, 3, stacks_db_conn) {
151151
Ok(Some(block)) => block,
152-
Ok(None) => unimplemented!(),
152+
Ok(None) => match get_stacks_block_at_block_height(cursor, false, 3, stacks_db_conn) {
153+
Ok(Some(block)) => block,
154+
Ok(None) => unimplemented!(),
155+
Err(_) => unimplemented!(),
156+
},
153157
Err(_) => unimplemented!(),
154158
};
155159
last_block_scanned = block_data.block_identifier.clone();

components/chainhook-cli/src/service/mod.rs

+31-86
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
mod http_api;
2+
mod runloops;
23

34
use crate::config::{Config, PredicatesApi};
4-
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_http_using_predicate;
5+
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
56
use crate::scan::stacks::{
67
consolidate_local_stacks_chainstate_using_csv,
78
scan_stacks_chainstate_via_rocksdb_using_predicate,
89
};
910
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
11+
use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop};
1012
use crate::storage::{
11-
insert_entries_in_stacks_blocks, open_readonly_stacks_db_conn, open_readwrite_stacks_db_conn,
13+
confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks,
14+
insert_unconfirmed_entry_in_stacks_blocks, open_readonly_stacks_db_conn,
15+
open_readwrite_stacks_db_conn,
1216
};
1317

1418
use chainhook_event_observer::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification};
@@ -135,102 +139,33 @@ impl Service {
135139

136140
// Stacks scan operation threadpool
137141
let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded();
138-
let stacks_scan_pool =
139-
ThreadPool::new(self.config.limits.max_number_of_concurrent_stacks_scans);
140142
let ctx = self.ctx.clone();
141143
let config = self.config.clone();
142144
let observer_command_tx_moved = observer_command_tx.clone();
143145
let _ = hiro_system_kit::thread_named("Stacks scan runloop")
144146
.spawn(move || {
145-
while let Ok(mut predicate_spec) = stacks_scan_op_rx.recv() {
146-
let moved_ctx = ctx.clone();
147-
let moved_config = config.clone();
148-
let observer_command_tx = observer_command_tx_moved.clone();
149-
stacks_scan_pool.execute(move || {
150-
let stacks_db_conn = match open_readonly_stacks_db_conn(
151-
&moved_config.expected_cache_path(),
152-
&moved_ctx,
153-
) {
154-
Ok(db_conn) => db_conn,
155-
Err(e) => {
156-
error!(
157-
moved_ctx.expect_logger(),
158-
"unable to store stacks block: {}",
159-
e.to_string()
160-
);
161-
unimplemented!()
162-
}
163-
};
164-
165-
let op = scan_stacks_chainstate_via_rocksdb_using_predicate(
166-
&predicate_spec,
167-
&stacks_db_conn,
168-
&moved_config,
169-
&moved_ctx,
170-
);
171-
let last_block_scanned = match hiro_system_kit::nestable_block_on(op) {
172-
Ok(last_block_scanned) => last_block_scanned,
173-
Err(e) => {
174-
error!(
175-
moved_ctx.expect_logger(),
176-
"Unable to evaluate predicate on Stacks chainstate: {e}",
177-
);
178-
return;
179-
}
180-
};
181-
info!(
182-
moved_ctx.expect_logger(),
183-
"Stacks chainstate scan completed up to block: {}",
184-
last_block_scanned.index
185-
);
186-
predicate_spec.end_block = Some(last_block_scanned.index);
187-
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
188-
ChainhookSpecification::Stacks(predicate_spec),
189-
));
190-
});
191-
}
192-
let res = stacks_scan_pool.join();
193-
res
147+
start_stacks_scan_runloop(
148+
&config,
149+
stacks_scan_op_rx,
150+
observer_command_tx_moved,
151+
&ctx,
152+
);
194153
})
195154
.expect("unable to spawn thread");
196155

197156
// Bitcoin scan operation threadpool
198157
let (bitcoin_scan_op_tx, bitcoin_scan_op_rx) = crossbeam_channel::unbounded();
199-
let bitcoin_scan_pool =
200-
ThreadPool::new(self.config.limits.max_number_of_concurrent_bitcoin_scans);
201158
let ctx = self.ctx.clone();
202159
let config = self.config.clone();
203-
let moved_observer_command_tx = observer_command_tx.clone();
160+
let observer_command_tx_moved = observer_command_tx.clone();
204161
let _ = hiro_system_kit::thread_named("Bitcoin scan runloop")
205162
.spawn(move || {
206-
while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
207-
let moved_ctx = ctx.clone();
208-
let moved_config = config.clone();
209-
let observer_command_tx = moved_observer_command_tx.clone();
210-
bitcoin_scan_pool.execute(move || {
211-
let op = scan_bitcoin_chainstate_via_http_using_predicate(
212-
&predicate_spec,
213-
&moved_config,
214-
&moved_ctx,
215-
);
216-
217-
match hiro_system_kit::nestable_block_on(op) {
218-
Ok(_) => {}
219-
Err(e) => {
220-
error!(
221-
moved_ctx.expect_logger(),
222-
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
223-
);
224-
return;
225-
}
226-
};
227-
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
228-
ChainhookSpecification::Bitcoin(predicate_spec),
229-
));
230-
});
231-
}
232-
let res = bitcoin_scan_pool.join();
233-
res
163+
start_bitcoin_scan_runloop(
164+
&config,
165+
bitcoin_scan_op_rx,
166+
observer_command_tx_moved,
167+
&ctx,
168+
);
234169
})
235170
.expect("unable to spawn thread");
236171

@@ -358,18 +293,28 @@ impl Service {
358293
match &chain_event {
359294
StacksChainEvent::ChainUpdatedWithBlocks(data) => {
360295
stacks_event += 1;
361-
insert_entries_in_stacks_blocks(
296+
confirm_entries_in_stacks_blocks(
362297
&data.confirmed_blocks,
363298
&stacks_db_conn_rw,
364299
&self.ctx,
365300
);
301+
draft_entries_in_stacks_blocks(
302+
&data.new_blocks,
303+
&stacks_db_conn_rw,
304+
&self.ctx,
305+
)
366306
}
367307
StacksChainEvent::ChainUpdatedWithReorg(data) => {
368-
insert_entries_in_stacks_blocks(
308+
confirm_entries_in_stacks_blocks(
369309
&data.confirmed_blocks,
370310
&stacks_db_conn_rw,
371311
&self.ctx,
372312
);
313+
draft_entries_in_stacks_blocks(
314+
&data.blocks_to_apply,
315+
&stacks_db_conn_rw,
316+
&self.ctx,
317+
)
373318
}
374319
StacksChainEvent::ChainUpdatedWithMicroblocks(_)
375320
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
use std::sync::mpsc::Sender;
2+
3+
use chainhook_event_observer::{
4+
chainhooks::types::{
5+
BitcoinChainhookSpecification, ChainhookSpecification, StacksChainhookSpecification,
6+
},
7+
observer::ObserverCommand,
8+
utils::Context,
9+
};
10+
use threadpool::ThreadPool;
11+
12+
use crate::{
13+
config::{Config, PredicatesApiConfig},
14+
scan::{
15+
bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate,
16+
stacks::scan_stacks_chainstate_via_rocksdb_using_predicate,
17+
},
18+
storage::open_readonly_stacks_db_conn,
19+
};
20+
21+
pub fn start_stacks_scan_runloop(
22+
config: &Config,
23+
stacks_scan_op_rx: crossbeam_channel::Receiver<StacksChainhookSpecification>,
24+
observer_command_tx: Sender<ObserverCommand>,
25+
ctx: &Context,
26+
) {
27+
let stacks_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_stacks_scans);
28+
while let Ok(mut predicate_spec) = stacks_scan_op_rx.recv() {
29+
let moved_ctx = ctx.clone();
30+
let moved_config = config.clone();
31+
let observer_command_tx = observer_command_tx.clone();
32+
stacks_scan_pool.execute(move || {
33+
let stacks_db_conn =
34+
match open_readonly_stacks_db_conn(&moved_config.expected_cache_path(), &moved_ctx)
35+
{
36+
Ok(db_conn) => db_conn,
37+
Err(e) => {
38+
error!(
39+
moved_ctx.expect_logger(),
40+
"unable to store stacks block: {}",
41+
e.to_string()
42+
);
43+
unimplemented!()
44+
}
45+
};
46+
47+
let op = scan_stacks_chainstate_via_rocksdb_using_predicate(
48+
&predicate_spec,
49+
&stacks_db_conn,
50+
&moved_config,
51+
&moved_ctx,
52+
);
53+
let last_block_scanned = match hiro_system_kit::nestable_block_on(op) {
54+
Ok(last_block_scanned) => last_block_scanned,
55+
Err(e) => {
56+
error!(
57+
moved_ctx.expect_logger(),
58+
"Unable to evaluate predicate on Stacks chainstate: {e}",
59+
);
60+
return;
61+
}
62+
};
63+
info!(
64+
moved_ctx.expect_logger(),
65+
"Stacks chainstate scan completed up to block: {}", last_block_scanned.index
66+
);
67+
predicate_spec.end_block = Some(last_block_scanned.index);
68+
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
69+
ChainhookSpecification::Stacks(predicate_spec),
70+
));
71+
});
72+
}
73+
let res = stacks_scan_pool.join();
74+
res
75+
}
76+
77+
pub fn start_bitcoin_scan_runloop(
78+
config: &Config,
79+
bitcoin_scan_op_rx: crossbeam_channel::Receiver<BitcoinChainhookSpecification>,
80+
observer_command_tx: Sender<ObserverCommand>,
81+
ctx: &Context,
82+
) {
83+
let bitcoin_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_bitcoin_scans);
84+
85+
while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
86+
let moved_ctx = ctx.clone();
87+
let moved_config = config.clone();
88+
let observer_command_tx = observer_command_tx.clone();
89+
bitcoin_scan_pool.execute(move || {
90+
let op = scan_bitcoin_chainstate_via_rpc_using_predicate(
91+
&predicate_spec,
92+
&moved_config,
93+
&moved_ctx,
94+
);
95+
96+
match hiro_system_kit::nestable_block_on(op) {
97+
Ok(_) => {}
98+
Err(e) => {
99+
error!(
100+
moved_ctx.expect_logger(),
101+
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
102+
);
103+
return;
104+
}
105+
};
106+
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
107+
ChainhookSpecification::Bitcoin(predicate_spec),
108+
));
109+
});
110+
}
111+
let res = bitcoin_scan_pool.join();
112+
}

0 commit comments

Comments
 (0)