Skip to content

Commit bcd2a45

Browse files
author
Ludo Galabru
committed
fix: enforce db reconnection in http endpoints
1 parent 3e7b0d0 commit bcd2a45

File tree

2 files changed

+84
-78
lines changed

2 files changed

+84
-78
lines changed

components/chainhook-cli/src/service/http_api.rs

+71-72
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{
22
collections::HashMap,
33
net::{IpAddr, Ipv4Addr},
4-
sync::{mpsc::Sender, Arc, Mutex, RwLock},
4+
sync::{mpsc::Sender, Arc, Mutex},
55
};
66

77
use chainhook_event_observer::{
@@ -20,10 +20,10 @@ use std::error::Error;
2020

2121
use crate::config::PredicatesApiConfig;
2222

23-
use super::{open_readwrite_predicates_db_conn_or_panic, PredicateStatus};
23+
use super::{open_readwrite_predicates_db_conn, PredicateStatus};
2424

2525
pub async fn start_predicate_api_server(
26-
api_config: &PredicatesApiConfig,
26+
api_config: PredicatesApiConfig,
2727
observer_commands_tx: Sender<ObserverCommand>,
2828
ctx: Context,
2929
) -> Result<(), Box<dyn Error>> {
@@ -56,16 +56,13 @@ pub async fn start_predicate_api_server(
5656
];
5757

5858
let background_job_tx_mutex = Arc::new(Mutex::new(observer_commands_tx.clone()));
59-
let redis_con_rw_lock = Arc::new(RwLock::new(open_readwrite_predicates_db_conn_or_panic(
60-
api_config, &ctx,
61-
)));
6259

6360
let ctx_cloned = ctx.clone();
6461

6562
let ignite = rocket::custom(control_config)
6663
.manage(background_job_tx_mutex)
64+
.manage(api_config)
6765
.manage(ctx_cloned)
68-
.manage(redis_con_rw_lock)
6966
.mount("/", routes)
7067
.ignite()
7168
.await?;
@@ -89,44 +86,45 @@ fn handle_ping(ctx: &State<Context>) -> Json<JsonValue> {
8986
#[openapi(tag = "Chainhooks")]
9087
#[get("/v1/chainhooks", format = "application/json")]
9188
fn handle_get_predicates(
92-
predicate_db: &State<Arc<RwLock<Connection>>>,
89+
api_config: &State<PredicatesApiConfig>,
9390
ctx: &State<Context>,
9491
) -> Json<JsonValue> {
9592
ctx.try_log(|logger| slog::info!(logger, "Handling HTTP GET /v1/chainhooks"));
96-
if let Ok(mut predicates_db_conn) = predicate_db.inner().write() {
97-
let predicates = match get_entries_from_predicates_db(&mut predicates_db_conn, &ctx) {
98-
Ok(predicates) => predicates,
99-
Err(e) => {
100-
ctx.try_log(|logger| slog::warn!(logger, "unable to retrieve predicates: {e}"));
101-
return Json(json!({
102-
"status": 500,
103-
"message": "unable to retrieve predicates",
104-
}));
105-
}
106-
};
107-
108-
let serialized_predicates = predicates
109-
.iter()
110-
.map(|(p, _)| p.into_serialized_json())
111-
.collect::<Vec<_>>();
112-
113-
Json(json!({
114-
"status": 200,
115-
"result": serialized_predicates
116-
}))
117-
} else {
118-
Json(json!({
93+
match open_readwrite_predicates_db_conn(api_config) {
94+
Ok(mut predicates_db_conn) => {
95+
let predicates = match get_entries_from_predicates_db(&mut predicates_db_conn, &ctx) {
96+
Ok(predicates) => predicates,
97+
Err(e) => {
98+
ctx.try_log(|logger| slog::warn!(logger, "unable to retrieve predicates: {e}"));
99+
return Json(json!({
100+
"status": 500,
101+
"message": "unable to retrieve predicates",
102+
}));
103+
}
104+
};
105+
106+
let serialized_predicates = predicates
107+
.iter()
108+
.map(|(p, _)| p.into_serialized_json())
109+
.collect::<Vec<_>>();
110+
111+
Json(json!({
112+
"status": 200,
113+
"result": serialized_predicates
114+
}))
115+
}
116+
Err(e) => Json(json!({
119117
"status": 500,
120-
"message": "too many requests",
121-
}))
118+
"message": e,
119+
})),
122120
}
123121
}
124122

125123
#[openapi(tag = "Chainhooks")]
126124
#[post("/v1/chainhooks", format = "application/json", data = "<predicate>")]
127125
fn handle_create_predicate(
128126
predicate: Json<ChainhookFullSpecification>,
129-
predicate_db: &State<Arc<RwLock<Connection>>>,
127+
api_config: &State<PredicatesApiConfig>,
130128
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
131129
ctx: &State<Context>,
132130
) -> Json<JsonValue> {
@@ -141,7 +139,7 @@ fn handle_create_predicate(
141139

142140
let predicate_uuid = predicate.get_uuid().to_string();
143141

144-
if let Ok(mut predicates_db_conn) = predicate_db.inner().write() {
142+
if let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn(api_config) {
145143
match get_entry_from_predicates_db(
146144
&ChainhookSpecification::either_stx_or_btc_key(&predicate_uuid),
147145
&mut predicates_db_conn,
@@ -175,7 +173,7 @@ fn handle_create_predicate(
175173
#[get("/v1/chainhooks/<predicate_uuid>", format = "application/json")]
176174
fn handle_get_predicate(
177175
predicate_uuid: String,
178-
predicate_db: &State<Arc<RwLock<Connection>>>,
176+
api_config: &State<PredicatesApiConfig>,
179177
ctx: &State<Context>,
180178
) -> Json<JsonValue> {
181179
ctx.try_log(|logger| {
@@ -186,43 +184,44 @@ fn handle_get_predicate(
186184
)
187185
});
188186

189-
if let Ok(mut predicates_db_conn) = predicate_db.inner().write() {
190-
let entry = match get_entry_from_predicates_db(
191-
&ChainhookSpecification::either_stx_or_btc_key(&predicate_uuid),
192-
&mut predicates_db_conn,
193-
&ctx,
194-
) {
195-
Ok(Some((ChainhookSpecification::Stacks(spec), status))) => json!({
196-
"chain": "stacks",
197-
"uuid": spec.uuid,
198-
"network": spec.network,
199-
"predicate": spec.predicate,
200-
"status": status,
201-
"enabled": spec.enabled,
202-
}),
203-
Ok(Some((ChainhookSpecification::Bitcoin(spec), status))) => json!({
204-
"chain": "bitcoin",
205-
"uuid": spec.uuid,
206-
"network": spec.network,
207-
"predicate": spec.predicate,
208-
"status": status,
209-
"enabled": spec.enabled,
210-
}),
211-
_ => {
212-
return Json(json!({
213-
"status": 404,
214-
}))
215-
}
216-
};
217-
Json(json!({
218-
"status": 200,
219-
"result": entry
220-
}))
221-
} else {
222-
Json(json!({
187+
match open_readwrite_predicates_db_conn(api_config) {
188+
Ok(mut predicates_db_conn) => {
189+
let entry = match get_entry_from_predicates_db(
190+
&ChainhookSpecification::either_stx_or_btc_key(&predicate_uuid),
191+
&mut predicates_db_conn,
192+
&ctx,
193+
) {
194+
Ok(Some((ChainhookSpecification::Stacks(spec), status))) => json!({
195+
"chain": "stacks",
196+
"uuid": spec.uuid,
197+
"network": spec.network,
198+
"predicate": spec.predicate,
199+
"status": status,
200+
"enabled": spec.enabled,
201+
}),
202+
Ok(Some((ChainhookSpecification::Bitcoin(spec), status))) => json!({
203+
"chain": "bitcoin",
204+
"uuid": spec.uuid,
205+
"network": spec.network,
206+
"predicate": spec.predicate,
207+
"status": status,
208+
"enabled": spec.enabled,
209+
}),
210+
_ => {
211+
return Json(json!({
212+
"status": 404,
213+
}))
214+
}
215+
};
216+
Json(json!({
217+
"status": 200,
218+
"result": entry
219+
}))
220+
}
221+
Err(e) => Json(json!({
223222
"status": 500,
224-
"message": "too many requests",
225-
}))
223+
"message": e,
224+
})),
226225
}
227226
}
228227

components/chainhook-cli/src/service/mod.rs

+13-6
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ impl Service {
196196

197197
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
198198
let future =
199-
start_predicate_api_server(&api_config, moved_observer_command_tx, ctx);
199+
start_predicate_api_server(api_config, moved_observer_command_tx, ctx);
200200
let _ = hiro_system_kit::nestable_block_on(future);
201201
});
202202

@@ -262,7 +262,7 @@ impl Service {
262262
predicates_db_conn.del(chainhook_key);
263263
}
264264
}
265-
ObserverEvent::BitcoinChainEvent((chain_update, report)) => {
265+
ObserverEvent::BitcoinChainEvent((_chain_update, _report)) => {
266266
debug!(self.ctx.expect_logger(), "Bitcoin update not stored");
267267
}
268268
ObserverEvent::StacksChainEvent((chain_event, report)) => {
@@ -403,14 +403,21 @@ pub fn retrieve_predicate_status(
403403
}
404404
}
405405

406+
pub fn open_readwrite_predicates_db_conn(
407+
config: &PredicatesApiConfig,
408+
) -> Result<Connection, String> {
409+
let redis_uri = &config.database_uri;
410+
let client = redis::Client::open(redis_uri.clone()).unwrap();
411+
client
412+
.get_connection()
413+
.map_err(|e| format!("unable to connect to db: {}", e.to_string()))
414+
}
415+
406416
pub fn open_readwrite_predicates_db_conn_or_panic(
407417
config: &PredicatesApiConfig,
408418
ctx: &Context,
409419
) -> Connection {
410-
// Test and initialize a database connection
411-
let redis_uri = &config.database_uri;
412-
let client = redis::Client::open(redis_uri.clone()).unwrap();
413-
let redis_con = match client.get_connection() {
420+
let redis_con = match open_readwrite_predicates_db_conn(config) {
414421
Ok(con) => con,
415422
Err(message) => {
416423
error!(ctx.expect_logger(), "Redis: {}", message.to_string());

0 commit comments

Comments
 (0)