Skip to content

Commit 8c8c5c8

Browse files
author
Ludo Galabru
committed
fix: gap in stacks scanning
1 parent 938c6df commit 8c8c5c8

File tree

3 files changed

+84
-164
lines changed

3 files changed

+84
-164
lines changed

components/chainhook-cli/src/cli/mod.rs

+26-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use crate::block::DigestingCommand;
22
use crate::config::generator::generate_config;
33
use crate::config::Config;
4-
use crate::scan::bitcoin::scan_bitcoin_chain_with_predicate_via_http;
5-
use crate::scan::stacks::scan_stacks_chain_with_predicate;
4+
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_http_using_predicate;
5+
use crate::scan::stacks::scan_stacks_chainstate_via_csv_using_predicate;
66
use crate::service::Service;
77

88
use chainhook_event_observer::bitcoincore_rpc::{Auth, Client, RpcApi};
@@ -535,10 +535,32 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
535535
}
536536
};
537537

538-
scan_bitcoin_chain_with_predicate_via_http(predicate_spec, &config, &ctx).await?;
538+
scan_bitcoin_chainstate_via_http_using_predicate(
539+
predicate_spec,
540+
&config,
541+
&ctx,
542+
)
543+
.await?;
539544
}
540545
ChainhookFullSpecification::Stacks(predicate) => {
541-
scan_stacks_chain_with_predicate(predicate, &mut config, &ctx).await?;
546+
let predicate_spec = match predicate
547+
.into_selected_network_specification(&config.network.stacks_network)
548+
{
549+
Ok(predicate) => predicate,
550+
Err(e) => {
551+
return Err(format!(
552+
"Specification missing for network {:?}: {e}",
553+
config.network.bitcoin_network
554+
));
555+
}
556+
};
557+
558+
scan_stacks_chainstate_via_csv_using_predicate(
559+
predicate_spec,
560+
&mut config,
561+
&ctx,
562+
)
563+
.await?;
542564
}
543565
}
544566
}

components/chainhook-cli/src/scan/stacks.rs

+32-26
Original file line numberDiff line numberDiff line change
@@ -9,33 +9,31 @@ use crate::{
99
config::Config,
1010
};
1111
use chainhook_event_observer::{
12-
chainhooks::stacks::{
13-
handle_stacks_hook_action, StacksChainhookOccurrence, StacksTriggerChainhook,
12+
chainhooks::{
13+
stacks::evaluate_stacks_chainhook_on_blocks,
1414
},
15-
utils::{file_append, send_request, AbstractStacksBlock},
15+
indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer},
16+
utils::Context,
1617
};
1718
use chainhook_event_observer::{
1819
chainhooks::{
19-
stacks::evaluate_stacks_chainhook_on_blocks, types::StacksChainhookFullSpecification,
20+
stacks::{handle_stacks_hook_action, StacksChainhookOccurrence, StacksTriggerChainhook},
21+
types::StacksChainhookSpecification,
2022
},
21-
indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer},
22-
utils::Context,
23+
utils::{file_append, send_request, AbstractStacksBlock},
2324
};
2425
use chainhook_types::BlockIdentifier;
2526

26-
pub async fn scan_stacks_chain_with_predicate(
27-
predicate: StacksChainhookFullSpecification,
27+
pub async fn scan_stacks_chainstate_via_csv_using_predicate(
28+
predicate_spec: StacksChainhookSpecification,
2829
config: &mut Config,
2930
ctx: &Context,
30-
) -> Result<(), String> {
31-
let selected_predicate =
32-
predicate.into_selected_network_specification(&config.network.stacks_network)?;
33-
34-
let start_block = match selected_predicate.start_block {
31+
) -> Result<BlockIdentifier, String> {
32+
let start_block = match predicate_spec.start_block {
3533
Some(start_block) => start_block,
3634
None => {
3735
return Err(
38-
"Chainhook specification must include fields 'start_block' and 'end_block' when using the scan command"
36+
"Chainhook specification must include fields 'start_block' when using the scan command"
3937
.into(),
4038
);
4139
}
@@ -105,7 +103,7 @@ pub async fn scan_stacks_chain_with_predicate(
105103
continue;
106104
}
107105

108-
if let Some(end_block) = selected_predicate.end_block {
106+
if let Some(end_block) = predicate_spec.end_block {
109107
if block_identifier.index > end_block {
110108
break;
111109
}
@@ -137,7 +135,10 @@ pub async fn scan_stacks_chain_with_predicate(
137135
ctx.expect_logger(),
138136
"Starting predicate evaluation on Stacks blocks"
139137
);
140-
for (_block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
138+
let mut last_block_scanned = BlockIdentifier::default();
139+
let mut err_count = 0;
140+
for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
141+
last_block_scanned = block_identifier;
141142
blocks_scanned += 1;
142143
let block_data = match indexer::stacks::standardize_stacks_serialized_block(
143144
&indexer.config,
@@ -154,13 +155,13 @@ pub async fn scan_stacks_chain_with_predicate(
154155

155156
let blocks: Vec<&dyn AbstractStacksBlock> = vec![&block_data];
156157

157-
let hits_per_blocks = evaluate_stacks_chainhook_on_blocks(blocks, &selected_predicate, ctx);
158+
let hits_per_blocks = evaluate_stacks_chainhook_on_blocks(blocks, &predicate_spec, ctx);
158159
if hits_per_blocks.is_empty() {
159160
continue;
160161
}
161162

162163
let trigger = StacksTriggerChainhook {
163-
chainhook: &selected_predicate,
164+
chainhook: &predicate_spec,
164165
apply: hits_per_blocks,
165166
rollback: vec![],
166167
};
@@ -170,24 +171,29 @@ pub async fn scan_stacks_chain_with_predicate(
170171
}
171172
Ok(action) => {
172173
actions_triggered += 1;
173-
match action {
174-
StacksChainhookOccurrence::Http(request) => {
175-
send_request(request, &ctx).await;
176-
}
177-
StacksChainhookOccurrence::File(path, bytes) => {
178-
file_append(path, bytes, &ctx);
179-
}
174+
let res = match action {
175+
StacksChainhookOccurrence::Http(request) => send_request(request, &ctx).await,
176+
StacksChainhookOccurrence::File(path, bytes) => file_append(path, bytes, &ctx),
180177
StacksChainhookOccurrence::Data(_payload) => unreachable!(),
178+
};
179+
if res.is_err() {
180+
err_count += 1;
181+
} else {
182+
err_count = 0;
181183
}
182184
}
183185
}
186+
// We abort after 3 consecutive errors
187+
if err_count >= 3 {
188+
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
189+
}
184190
}
185191
info!(
186192
ctx.expect_logger(),
187193
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
188194
);
189195

190-
Ok(())
196+
Ok(last_block_scanned)
191197
}
192198

193199
async fn download_dataset_if_required(config: &mut Config, ctx: &Context) -> bool {

components/chainhook-cli/src/service/mod.rs

+26-134
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,21 @@
11
use crate::config::Config;
2-
use crate::scan::bitcoin::scan_bitcoin_chain_with_predicate_via_http;
3-
4-
5-
use chainhook_event_observer::chainhooks::types::{
6-
ChainhookConfig, ChainhookFullSpecification,
7-
};
2+
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_http_using_predicate;
3+
use crate::scan::stacks::scan_stacks_chainstate_via_csv_using_predicate;
84

5+
use chainhook_event_observer::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification};
96

107
use chainhook_event_observer::observer::{start_event_observer, ApiKey, ObserverEvent};
11-
use chainhook_event_observer::utils::{Context};
8+
use chainhook_event_observer::utils::Context;
129
use chainhook_event_observer::{
13-
chainhooks::stacks::{
14-
evaluate_stacks_predicate_on_transaction, handle_stacks_hook_action,
15-
StacksChainhookOccurrence, StacksTriggerChainhook,
16-
},
1710
chainhooks::types::ChainhookSpecification,
1811
};
1912
use chainhook_types::{
20-
BitcoinBlockSignaling, BlockIdentifier, StacksBlockData, StacksBlockMetadata, StacksChainEvent,
21-
StacksTransactionData,
13+
BitcoinBlockSignaling, StacksBlockData, StacksChainEvent,
2214
};
2315
use redis::{Commands, Connection};
2416

25-
use std::collections::{HashMap};
26-
use std::sync::mpsc::channel;
2717

18+
use std::sync::mpsc::channel;
2819

2920
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
3021
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
@@ -191,129 +182,30 @@ impl Service {
191182
);
192183
}
193184
match chainhook {
194-
ChainhookSpecification::Stacks(stacks_hook) => {
195-
// Retrieve highest block height stored
196-
let tip_height: u64 = redis_con.get(&format!("stx:tip")).unwrap_or(1);
197-
198-
let start_block = stacks_hook.start_block.unwrap_or(1); // TODO(lgalabru): handle STX hooks and genesis block :s
199-
let end_block = stacks_hook.end_block.unwrap_or(tip_height); // TODO(lgalabru): handle STX hooks and genesis block :s
200-
185+
ChainhookSpecification::Stacks(predicate_spec) => {
186+
let end_block = match scan_stacks_chainstate_via_csv_using_predicate(
187+
predicate_spec,
188+
&mut self.config,
189+
&self.ctx,
190+
)
191+
.await
192+
{
193+
Ok(end_block) => end_block,
194+
Err(e) => {
195+
error!(
196+
self.ctx.expect_logger(),
197+
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
198+
);
199+
continue;
200+
}
201+
};
201202
info!(
202203
self.ctx.expect_logger(),
203-
"Processing Stacks chainhook {}, will scan blocks [{}; {}]",
204-
stacks_hook.uuid,
205-
start_block,
206-
end_block
204+
"Stacks chainstate scan completed up to block: {}", end_block.index
207205
);
208-
let mut total_hits = 0;
209-
for cursor in start_block..=end_block {
210-
debug!(
211-
self.ctx.expect_logger(),
212-
"Evaluating predicate #{} on block #{}",
213-
stacks_hook.uuid,
214-
cursor
215-
);
216-
let (
217-
block_identifier,
218-
parent_block_identifier,
219-
timestamp,
220-
transactions,
221-
metadata,
222-
) = {
223-
let payload: Vec<String> = redis_con
224-
.hget(
225-
&format!("stx:{}", cursor),
226-
&[
227-
"block_identifier",
228-
"parent_block_identifier",
229-
"timestamp",
230-
"transactions",
231-
"metadata",
232-
],
233-
)
234-
.expect("unable to retrieve tip height");
235-
if payload.len() != 5 {
236-
warn!(self.ctx.expect_logger(), "Chain still being processed, please retry in a few minutes");
237-
continue;
238-
}
239-
(
240-
serde_json::from_str::<BlockIdentifier>(&payload[0])
241-
.unwrap(),
242-
serde_json::from_str::<BlockIdentifier>(&payload[1])
243-
.unwrap(),
244-
serde_json::from_str::<i64>(&payload[2]).unwrap(),
245-
serde_json::from_str::<Vec<StacksTransactionData>>(
246-
&payload[3],
247-
)
248-
.unwrap(),
249-
serde_json::from_str::<StacksBlockMetadata>(&payload[4])
250-
.unwrap(),
251-
)
252-
};
253-
let mut hits = vec![];
254-
for tx in transactions.iter() {
255-
if evaluate_stacks_predicate_on_transaction(
256-
&tx,
257-
&stacks_hook,
258-
&self.ctx,
259-
) {
260-
debug!(
261-
self.ctx.expect_logger(),
262-
"Action #{} triggered by transaction {} (block #{})",
263-
stacks_hook.uuid,
264-
tx.transaction_identifier.hash,
265-
cursor
266-
);
267-
hits.push(tx);
268-
total_hits += 1;
269-
}
270-
}
271-
272-
if hits.len() > 0 {
273-
let block = StacksBlockData {
274-
block_identifier,
275-
parent_block_identifier,
276-
timestamp,
277-
transactions: vec![],
278-
metadata,
279-
};
280-
let trigger = StacksTriggerChainhook {
281-
chainhook: &stacks_hook,
282-
apply: vec![(hits, &block)],
283-
rollback: vec![],
284-
};
285-
286-
let proofs = HashMap::new();
287-
match handle_stacks_hook_action(trigger, &proofs, &self.ctx) {
288-
Err(e) => {
289-
info!(
290-
self.ctx.expect_logger(),
291-
"unable to handle action {}", e
292-
);
293-
}
294-
Ok(StacksChainhookOccurrence::Http(request)) => {
295-
if let Err(e) =
296-
hiro_system_kit::nestable_block_on(request.send())
297-
{
298-
error!(
299-
self.ctx.expect_logger(),
300-
"unable to perform action {}", e
301-
);
302-
}
303-
}
304-
Ok(_) => {
305-
error!(
306-
self.ctx.expect_logger(),
307-
"action not supported"
308-
);
309-
}
310-
}
311-
}
312-
}
313-
info!(self.ctx.expect_logger(), "Stacks chainhook {} scan completed: action triggered by {} transactions", stacks_hook.uuid, total_hits);
314206
}
315207
ChainhookSpecification::Bitcoin(predicate_spec) => {
316-
match scan_bitcoin_chain_with_predicate_via_http(
208+
match scan_bitcoin_chainstate_via_http_using_predicate(
317209
predicate_spec,
318210
&self.config,
319211
&self.ctx,
@@ -322,7 +214,7 @@ impl Service {
322214
{
323215
Ok(_) => {}
324216
Err(e) => {
325-
info!(
217+
error!(
326218
self.ctx.expect_logger(),
327219
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
328220
);

0 commit comments

Comments
 (0)