Skip to content

Commit 396c8db

Browse files
authored
Improve stability (#30)
* remove unwrap, add config file TU, update migration with time zone * add hypertables * cargo fmt
1 parent 9961a24 commit 396c8db

File tree

17 files changed

+125
-73
lines changed

17 files changed

+125
-73
lines changed

pragma-entities/migrations/2023-10-11-223513_create_entries/up.sql

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ CREATE TABLE entries (
55
id uuid DEFAULT uuid_generate_v4(),
66
pair_id VARCHAR NOT NULL,
77
publisher TEXT NOT NULL,
8-
timestamp TIMESTAMP NOT NULL,
8+
timestamp TIMESTAMPTZ NOT NULL,
99
price NUMERIC NOT NULL,
10-
PRIMARY KEY (id)
10+
PRIMARY KEY (id, timestamp)
1111
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- This file should undo anything in `up.sql`
2+
SELECT detach_table('entries');
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- Your SQL goes here
2+
SELECT create_hypertable('entries', 'timestamp');

pragma-entities/src/dto/entry.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl From<crate::Entry> for Entry {
2828
publisher: entry.publisher,
2929
source: entry.source,
3030
timestamp: entry.timestamp.timestamp_millis() as u64,
31-
price: entry.price.to_u128().unwrap(),
31+
price: entry.price.to_u128().unwrap_or(0), // change default value ?
3232
}
3333
}
3434
}

pragma-entities/src/error.rs

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use utoipa::ToSchema;
88
pub enum InfraError {
99
InternalServerError,
1010
NotFound,
11+
InvalidTimeStamp,
1112
}
1213

1314
#[derive(Debug, Error)]
@@ -30,6 +31,7 @@ impl fmt::Display for InfraError {
3031
match self {
3132
InfraError::NotFound => write!(f, "Not found"),
3233
InfraError::InternalServerError => write!(f, "Internal server error"),
34+
InfraError::InvalidTimeStamp => write!(f, "Invalid timestamp"),
3335
}
3436
}
3537
}

pragma-entities/src/models/entry_error.rs

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ pub enum EntryError {
2525
InvalidSignature(EcdsaVerifyError),
2626
#[error("unauthorized request")]
2727
Unauthorized,
28+
#[error("invalid timestamp")]
29+
InvalidTimestamp,
2830
#[error("publisher error: {0}")]
2931
PublisherError(#[from] PublisherError),
3032
#[error("pair id invalid: {0}")]

pragma-entities/src/schema.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ diesel::table! {
1616
id -> Uuid,
1717
pair_id -> Varchar,
1818
publisher -> Text,
19-
timestamp -> Timestamp,
19+
timestamp -> Timestamptz,
2020
price -> Numeric,
2121
source -> Varchar,
2222
}

pragma-ingestor/src/config.rs

+47
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,50 @@ impl Ingestor {
2424
pub fn load_configuration() -> Ingestor {
2525
Ingestor::from_env().expect("cannot load configuration env")
2626
}
27+
28+
#[cfg(test)]
29+
mod tests {
30+
use super::*;
31+
use std::env;
32+
33+
#[test]
34+
fn test_ingestor_init() {
35+
let brokers = vec!["localhost:9092".to_string()];
36+
let ingestor = Ingestor {
37+
brokers: brokers.clone(),
38+
topic: "test_topic".to_string(),
39+
group_id: "test_group".to_string(),
40+
};
41+
42+
assert_eq!(ingestor.brokers, brokers);
43+
assert_eq!(ingestor.topic, "test_topic");
44+
assert_eq!(ingestor.group_id, "test_group");
45+
}
46+
47+
#[test]
48+
fn test_load_from_env() {
49+
env::set_var("BROKERS", "localhost:9092");
50+
env::set_var("TOPIC", "test_topic");
51+
env::set_var("GROUP_ID", "test_group");
52+
53+
let ingestor = Ingestor::from_env().unwrap();
54+
55+
assert_eq!(ingestor.brokers, vec!["localhost:9092".to_string()]);
56+
assert_eq!(ingestor.topic, "test_topic");
57+
assert_eq!(ingestor.group_id, "test_group");
58+
59+
env::remove_var("BROKERS");
60+
env::remove_var("TOPIC");
61+
env::remove_var("GROUP_ID");
62+
}
63+
64+
#[test]
65+
fn test_env_error_handling() {
66+
env::remove_var("BROKERS");
67+
env::remove_var("TOPIC");
68+
env::remove_var("GROUP_ID");
69+
70+
let result = Ingestor::from_env();
71+
assert!(result.is_err());
72+
}
73+
}

pragma-node/src/config.rs

+26
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,29 @@ async fn init_config() -> Config {
6868
pub async fn config() -> &'static Config {
6969
CONFIG.get_or_init(init_config).await
7070
}
71+
72+
#[cfg(test)]
73+
mod tests {
74+
use super::*;
75+
76+
#[tokio::test]
77+
async fn test_default_server_config() {
78+
let server_config = ServerConfig::default();
79+
assert_eq!(server_config.host, "0.0.0.0");
80+
assert_eq!(server_config.port, 3000);
81+
}
82+
83+
#[tokio::test]
84+
async fn test_default_kafka_config() {
85+
let kafka_config = KafkaConfig::default();
86+
assert_eq!(kafka_config.topic, "pragma-data");
87+
}
88+
89+
#[tokio::test]
90+
async fn test_config_values() {
91+
let config = init_config().await;
92+
assert_eq!(config.server_host(), "0.0.0.0");
93+
assert_eq!(config.server_port(), 3000);
94+
assert_eq!(config.kafka_topic(), "pragma-data");
95+
}
96+
}

pragma-node/src/handlers/entries/create_entry.rs

+15-7
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,22 @@ pub async fn create_entries(
143143
let new_entries_db = new_entries
144144
.entries
145145
.iter()
146-
.map(|entry| NewEntry {
147-
pair_id: entry.pair_id.clone(),
148-
publisher: entry.base.publisher.clone(),
149-
source: entry.base.source.clone(),
150-
timestamp: NaiveDateTime::from_timestamp_opt(entry.base.timestamp as i64, 0).unwrap(), // TODO: remove unwrap
151-
price: entry.price.into(),
146+
.map(|entry| {
147+
let timestamp = match NaiveDateTime::from_timestamp_opt(entry.base.timestamp as i64, 0)
148+
{
149+
Some(timestamp) => timestamp,
150+
None => return Err(EntryError::InvalidTimestamp),
151+
};
152+
153+
Ok(NewEntry {
154+
pair_id: entry.pair_id.clone(),
155+
publisher: entry.base.publisher.clone(),
156+
source: entry.base.source.clone(),
157+
timestamp,
158+
price: entry.price.into(),
159+
})
152160
})
153-
.collect::<Vec<NewEntry>>();
161+
.collect::<Result<Vec<NewEntry>, EntryError>>()?;
154162

155163
let data =
156164
serde_json::to_vec(&new_entries_db).map_err(|e| EntryError::PublishData(e.to_string()))?;

pragma-node/src/handlers/entries/get_entry.rs

+11-4
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
use axum::extract::State;
22
use axum::Json;
3-
use bigdecimal::num_bigint::ToBigInt;
3+
use bigdecimal::num_bigint::{BigInt, ToBigInt};
44

55
use crate::handlers::entries::GetEntryResponse;
6-
use crate::infra::errors::InfraError;
76
use crate::infra::repositories::entry_repository::{self, MedianEntry};
87
use crate::utils::PathExtractor;
98
use crate::AppState;
10-
use pragma_entities::EntryError;
9+
use pragma_entities::{error::InfraError, EntryError};
1110

1211
use super::utils::{compute_median_price_and_time, currency_pair_to_pair_id};
1312

@@ -47,6 +46,7 @@ pub async fn get_entry(
4746
.map_err(|db_error| match db_error {
4847
InfraError::InternalServerError => EntryError::InternalServerError,
4948
InfraError::NotFound => EntryError::NotFound(pair_id.clone()),
49+
InfraError::InvalidTimeStamp => EntryError::InvalidTimestamp,
5050
})?;
5151

5252
// Error if no entries found
@@ -59,6 +59,7 @@ pub async fn get_entry(
5959
.map_err(|db_error| match db_error {
6060
InfraError::InternalServerError => EntryError::InternalServerError,
6161
InfraError::NotFound => EntryError::NotFound(pair_id.clone()),
62+
InfraError::InvalidTimeStamp => EntryError::InvalidTimestamp,
6263
})?;
6364

6465
Ok(Json(adapt_entry_to_entry_response(
@@ -79,7 +80,13 @@ fn adapt_entry_to_entry_response(
7980
pair_id,
8081
timestamp: timestamp.timestamp_millis() as u64,
8182
num_sources_aggregated: entries.len(),
82-
price: format!("0x{}", price.to_bigint().unwrap().to_str_radix(16)),
83+
price: format!(
84+
"0x{}",
85+
price
86+
.to_bigint()
87+
.unwrap_or(BigInt::default())
88+
.to_str_radix(16)
89+
),
8390
decimals,
8491
}
8592
}

pragma-node/src/handlers/entries/get_volatility.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ use serde::Deserialize;
44
use utoipa::IntoParams;
55

66
use crate::handlers::entries::GetVolatilityResponse;
7-
use crate::infra::errors::InfraError;
87
use crate::infra::repositories::entry_repository::{self, MedianEntry};
98
use crate::utils::PathExtractor;
109
use crate::AppState;
11-
use pragma_entities::{EntryError, VolatilityError};
10+
use pragma_entities::{error::InfraError, EntryError, VolatilityError};
1211

1312
use super::utils::{compute_volatility, currency_pair_to_pair_id};
1413

@@ -59,6 +58,7 @@ pub async fn get_volatility(
5958
.map_err(|db_error| match db_error {
6059
InfraError::InternalServerError => EntryError::InternalServerError,
6160
InfraError::NotFound => EntryError::NotFound(pair_id.clone()),
61+
InfraError::InvalidTimeStamp => EntryError::InvalidTimestamp,
6262
})?;
6363

6464
if entries.is_empty() {
@@ -70,6 +70,7 @@ pub async fn get_volatility(
7070
.map_err(|db_error| match db_error {
7171
InfraError::InternalServerError => EntryError::InternalServerError,
7272
InfraError::NotFound => EntryError::NotFound(pair_id.clone()),
73+
InfraError::InvalidTimeStamp => EntryError::InvalidTimestamp,
7374
})?;
7475

7576
Ok(Json(adapt_entry_to_entry_response(

pragma-node/src/handlers/entries/utils.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub(crate) fn compute_median_price_and_time(
4444
pub(crate) fn compute_volatility(entries: &Vec<MedianEntry>) -> f64 {
4545
let mut values = Vec::new();
4646
for i in 1..entries.len() {
47-
if entries[i].median_price.to_f64().unwrap() > 0.0
47+
if entries[i].median_price.to_f64().unwrap_or(0.0) > 0.0
4848
&& entries[i - 1].median_price.to_f64().unwrap() > 0.0
4949
&& (entries[i].time - entries[i - 1].time).num_seconds() > 0
5050
{

pragma-node/src/infra/errors.rs

-49
This file was deleted.

pragma-node/src/infra/mod.rs

-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
1-
pub mod errors;
21
pub mod kafka;
32
pub mod repositories;

pragma-node/src/infra/repositories/entry_repository.rs

+9-4
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@ use diesel::prelude::QueryableByName;
44
use diesel::{ExpressionMethods, QueryDsl, Queryable, RunQueryDsl};
55
use serde::{Deserialize, Serialize};
66

7-
use crate::infra::errors::{adapt_infra_error, InfraError};
87
use pragma_entities::dto;
9-
use pragma_entities::{schema::currencies, Entry, NewEntry};
8+
use pragma_entities::{
9+
error::{adapt_infra_error, InfraError},
10+
schema::currencies,
11+
Entry, NewEntry,
12+
};
1013

1114
#[derive(Deserialize)]
1215
#[allow(unused)]
@@ -127,8 +130,10 @@ pub async fn get_entries_between(
127130
end_timestamp: u64,
128131
) -> Result<Vec<MedianEntry>, InfraError> {
129132
let conn = pool.get().await.map_err(adapt_infra_error)?;
130-
let start_datetime = NaiveDateTime::from_timestamp_opt(start_timestamp as i64, 0).unwrap();
131-
let end_datetime = NaiveDateTime::from_timestamp_opt(end_timestamp as i64, 0).unwrap();
133+
let start_datetime = NaiveDateTime::from_timestamp_opt(start_timestamp as i64, 0)
134+
.ok_or(InfraError::InvalidTimeStamp)?;
135+
let end_datetime = NaiveDateTime::from_timestamp_opt(end_timestamp as i64, 0)
136+
.ok_or(InfraError::InvalidTimeStamp)?;
132137

133138
let raw_sql = r#"
134139
SELECT

pragma-node/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ async fn main() {
3737
schemas(pragma_entities::dto::Publisher, pragma_entities::PublisherError),
3838
schemas(handlers::entries::CreateEntryRequest, handlers::entries::CreateEntryResponse, handlers::entries::GetEntryResponse, handlers::entries::GetVolatilityResponse),
3939
schemas(handlers::entries::Entry, handlers::entries::BaseEntry),
40-
schemas(infra::errors::InfraError),
40+
schemas(pragma_entities::error::InfraError),
4141
),
4242
modifiers(&SecurityAddon),
4343
tags(

0 commit comments

Comments
 (0)