Skip to content

Commit a6b4a5f

Browse files
author
Ludo Galabru
committed
fix: address redis disconnects
1 parent ae913be commit a6b4a5f

File tree

1 file changed

+89
-54
lines changed
  • components/chainhook-cli/src/service

1 file changed

+89
-54
lines changed

components/chainhook-cli/src/service/mod.rs

+89-54
Original file line numberDiff line numberDiff line change
@@ -181,29 +181,20 @@ impl Service {
181181
}
182182
}
183183
// Enable HTTP Chainhook API, if required
184-
let mut predicates_db_conn = match self.config.http_api {
185-
PredicatesApi::On(ref api_config) => {
186-
info!(
187-
self.ctx.expect_logger(),
188-
"Listening for chainhook predicate registrations on port {}",
189-
api_config.http_port
190-
);
191-
let ctx = self.ctx.clone();
192-
let api_config = api_config.clone();
193-
let moved_observer_command_tx = observer_command_tx.clone();
194-
// Test and initialize a database connection
195-
let redis_con = open_readwrite_predicates_db_conn_or_panic(&api_config, &self.ctx);
196-
197-
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
198-
let future =
199-
start_predicate_api_server(api_config, moved_observer_command_tx, ctx);
200-
let _ = hiro_system_kit::nestable_block_on(future);
201-
});
202-
203-
Some(redis_con)
204-
}
205-
PredicatesApi::Off => None,
206-
};
184+
if let PredicatesApi::On(ref api_config) = self.config.http_api {
185+
info!(
186+
self.ctx.expect_logger(),
187+
"Listening for chainhook predicate registrations on port {}", api_config.http_port
188+
);
189+
let ctx = self.ctx.clone();
190+
let api_config = api_config.clone();
191+
let moved_observer_command_tx = observer_command_tx.clone();
192+
// Test and initialize a database connection
193+
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
194+
let future = start_predicate_api_server(api_config, moved_observer_command_tx, ctx);
195+
let _ = hiro_system_kit::nestable_block_on(future);
196+
});
197+
}
207198

208199
let mut stacks_event = 0;
209200
loop {
@@ -218,54 +209,98 @@ impl Service {
218209
break;
219210
}
220211
};
221-
222212
match event {
223-
ObserverEvent::PredicateRegistered(chainhook) => {
213+
ObserverEvent::PredicateRegistered(spec) => {
224214
// If start block specified, use it.
225215
// I no start block specified, depending on the nature the hook, we'd like to retrieve:
226216
// - contract-id
227-
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
228-
let chainhook_key = chainhook.key();
229-
let res: Result<(), redis::RedisError> = predicates_db_conn.hset_multiple(
230-
&chainhook_key,
231-
&[
232-
("specification", json!(chainhook).to_string()),
233-
("status", json!(PredicateStatus::Disabled).to_string()),
234-
],
217+
if let PredicatesApi::On(ref config) = self.config.http_api {
218+
let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config)
219+
{
220+
Ok(con) => con,
221+
Err(e) => {
222+
error!(
223+
self.ctx.expect_logger(),
224+
"unable to register predicate: {}",
225+
e.to_string()
226+
);
227+
continue;
228+
}
229+
};
230+
update_predicate_spec(
231+
&spec.key(),
232+
&spec,
233+
&mut predicates_db_conn,
234+
&self.ctx,
235235
);
236-
if let Err(e) = res {
237-
error!(
238-
self.ctx.expect_logger(),
239-
"unable to store chainhook {chainhook_key}: {}",
240-
e.to_string()
241-
);
236+
update_predicate_status(
237+
&spec.key(),
238+
PredicateStatus::Disabled,
239+
&mut predicates_db_conn,
240+
&self.ctx,
241+
);
242+
}
243+
match spec {
244+
ChainhookSpecification::Stacks(predicate_spec) => {
245+
let _ = stacks_scan_op_tx.send(predicate_spec);
242246
}
243-
match chainhook {
244-
ChainhookSpecification::Stacks(predicate_spec) => {
245-
let _ = stacks_scan_op_tx.send(predicate_spec);
246-
}
247-
ChainhookSpecification::Bitcoin(predicate_spec) => {
248-
let _ = bitcoin_scan_op_tx.send(predicate_spec);
249-
}
247+
ChainhookSpecification::Bitcoin(predicate_spec) => {
248+
let _ = bitcoin_scan_op_tx.send(predicate_spec);
250249
}
251250
}
252251
}
253252
ObserverEvent::PredicateEnabled(spec) => {
254-
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
255-
update_predicate_spec(&spec.key(), &spec, predicates_db_conn, &self.ctx);
253+
if let PredicatesApi::On(ref config) = self.config.http_api {
254+
let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config)
255+
{
256+
Ok(con) => con,
257+
Err(e) => {
258+
error!(
259+
self.ctx.expect_logger(),
260+
"unable to enable predicate: {}",
261+
e.to_string()
262+
);
263+
continue;
264+
}
265+
};
266+
update_predicate_spec(
267+
&spec.key(),
268+
&spec,
269+
&mut predicates_db_conn,
270+
&self.ctx,
271+
);
256272
update_predicate_status(
257273
&spec.key(),
258274
PredicateStatus::InitialScanCompleted,
259-
predicates_db_conn,
275+
&mut predicates_db_conn,
260276
&self.ctx,
261277
);
262278
}
263279
}
264-
ObserverEvent::PredicateDeregistered(chainhook) => {
265-
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
266-
let chainhook_key = chainhook.key();
267-
let _: Result<(), redis::RedisError> =
268-
predicates_db_conn.del(chainhook_key);
280+
ObserverEvent::PredicateDeregistered(spec) => {
281+
if let PredicatesApi::On(ref config) = self.config.http_api {
282+
let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config)
283+
{
284+
Ok(con) => con,
285+
Err(e) => {
286+
error!(
287+
self.ctx.expect_logger(),
288+
"unable to deregister predicate: {}",
289+
e.to_string()
290+
);
291+
continue;
292+
}
293+
};
294+
let predicate_key = spec.key();
295+
let res: Result<(), redis::RedisError> =
296+
predicates_db_conn.del(predicate_key);
297+
if let Err(e) = res {
298+
error!(
299+
self.ctx.expect_logger(),
300+
"unable to delete predicate: {}",
301+
e.to_string()
302+
);
303+
}
269304
}
270305
}
271306
ObserverEvent::BitcoinChainEvent((_chain_update, _report)) => {

0 commit comments

Comments
 (0)