Skip to content

Commit e345010

Browse files
committed
feat: candlestick views
1 parent dd37640 commit e345010

File tree

8 files changed

+322
-6
lines changed

8 files changed

+322
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
-- This file should undo anything in `up.sql`
2+
DROP MATERIALIZED VIEW IF EXISTS one_day_candle;
3+
DROP MATERIALIZED VIEW IF EXISTS one_hour_candle;
4+
DROP MATERIALIZED VIEW IF EXISTS fifteen_minute_candle;
5+
DROP MATERIALIZED VIEW IF EXISTS five_minute_candle;
6+
DROP MATERIALIZED VIEW IF EXISTS one_minute_candle;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
-- 1 day candle
2+
CREATE MATERIALIZED VIEW one_day_candle
3+
WITH (timescaledb.continuous) AS
4+
SELECT
5+
time_bucket('1 day', timestamp) AS bucket,
6+
pair_id,
7+
FIRST(price, timestamp) AS "open",
8+
MAX(price) AS high,
9+
MIN(price) AS low,
10+
LAST(price, timestamp) AS "close"
11+
FROM entries
12+
GROUP BY bucket, pair_id
13+
WITH NO DATA;
14+
15+
16+
SELECT add_continuous_aggregate_policy('one_day_candle',
17+
start_offset => INTERVAL '3 days',
18+
end_offset => INTERVAL '1 day',
19+
schedule_interval => INTERVAL '1 day');
20+
21+
-- 1 hour candle
22+
CREATE MATERIALIZED VIEW one_hour_candle
23+
WITH (timescaledb.continuous) AS
24+
SELECT
25+
time_bucket('1 hour', timestamp) AS bucket,
26+
pair_id,
27+
FIRST(price, timestamp) AS "open",
28+
MAX(price) AS high,
29+
MIN(price) AS low,
30+
LAST(price, timestamp) AS "close"
31+
FROM entries
32+
GROUP BY bucket, pair_id
33+
WITH NO DATA;
34+
35+
SELECT add_continuous_aggregate_policy('one_hour_candle',
36+
start_offset => INTERVAL '3 hours',
37+
end_offset => INTERVAL '1 hour',
38+
schedule_interval => INTERVAL '1 hour');
39+
40+
-- 15 minute candle
41+
CREATE MATERIALIZED VIEW fifteen_minute_candle
42+
WITH (timescaledb.continuous) AS
43+
SELECT
44+
time_bucket('15 minutes', timestamp) AS bucket,
45+
pair_id,
46+
FIRST(price, timestamp)::numeric AS "open",
47+
MAX(price)::numeric AS high,
48+
MIN(price)::numeric AS low,
49+
LAST(price, timestamp)::numeric AS "close"
50+
FROM entries
51+
GROUP BY bucket, pair_id
52+
WITH NO DATA;
53+
54+
SELECT add_continuous_aggregate_policy('fifteen_minute_candle',
55+
start_offset => INTERVAL '45 minutes',
56+
end_offset => INTERVAL '15 minutes',
57+
schedule_interval => INTERVAL '15 minutes');
58+
59+
-- 5 minute candle
60+
CREATE MATERIALIZED VIEW five_minute_candle
61+
WITH (timescaledb.continuous) AS
62+
SELECT
63+
time_bucket('5 minutes', timestamp) AS bucket,
64+
pair_id,
65+
FIRST(price, timestamp) AS "open",
66+
MAX(price) AS high,
67+
MIN(price) AS low,
68+
LAST(price, timestamp) AS "close"
69+
FROM entries
70+
GROUP BY bucket, pair_id
71+
WITH NO DATA;
72+
73+
SELECT add_continuous_aggregate_policy('five_minute_candle',
74+
start_offset => INTERVAL '15 minutes',
75+
end_offset => INTERVAL '5 minutes',
76+
schedule_interval => INTERVAL '5 minutes');
77+
78+
-- 1 minute candle
79+
CREATE MATERIALIZED VIEW one_minute_candle
80+
WITH (timescaledb.continuous) AS
81+
SELECT
82+
time_bucket('1 minute', timestamp) AS bucket,
83+
pair_id,
84+
FIRST(price, timestamp) AS "open",
85+
MAX(price) AS high,
86+
MIN(price) AS low,
87+
LAST(price, timestamp) AS "close"
88+
FROM entries
89+
GROUP BY bucket, pair_id
90+
WITH NO DATA;
91+
92+
SELECT add_continuous_aggregate_policy('one_minute_candle',
93+
start_offset => INTERVAL '3 minutes',
94+
end_offset => INTERVAL '1 minute',
95+
schedule_interval => INTERVAL '1 minute');

pragma-node/src/handlers/entries/get_entry.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ use super::GetEntryParams;
1919
),
2020
params(
2121
("quote" = String, Path, description = "Quote Asset"),
22-
("base" = String, Path, description = "Base Asset")
22+
("base" = String, Path, description = "Base Asset"),
23+
GetEntryParams,
2324
),
2425
)]
2526
pub async fn get_entry(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use axum::extract::{Query, State};
2+
use axum::Json;
3+
4+
use crate::handlers::entries::{GetOHLCResponse, Interval};
5+
use crate::infra::repositories::entry_repository::{self, OHLCEntry};
6+
use crate::utils::PathExtractor;
7+
use crate::AppState;
8+
use pragma_entities::{error::InfraError, EntryError};
9+
10+
use super::utils::currency_pair_to_pair_id;
11+
use super::GetEntryParams;
12+
13+
#[utoipa::path(
14+
get,
15+
path = "/node/v1/aggregation/candlestick/{base}/{quote}",
16+
responses(
17+
(status = 200, description = "Get OHLC data successfuly", body = [GetOHLCResponse])
18+
),
19+
params(
20+
("quote" = String, Path, description = "Quote Asset"),
21+
("base" = String, Path, description = "Base Asset"),
22+
GetEntryParams,
23+
),
24+
)]
25+
pub async fn get_ohlc(
26+
State(state): State<AppState>,
27+
PathExtractor(pair): PathExtractor<(String, String)>,
28+
Query(params): Query<GetEntryParams>,
29+
) -> Result<Json<GetOHLCResponse>, EntryError> {
30+
tracing::info!("Received get entry request for pair {:?}", pair);
31+
// Construct pair id
32+
let pair_id = currency_pair_to_pair_id(&pair.1, &pair.0);
33+
34+
let now = chrono::Utc::now().naive_utc().timestamp_millis() as u64;
35+
36+
let timestamp = if let Some(timestamp) = params.timestamp {
37+
timestamp
38+
} else {
39+
now
40+
};
41+
42+
let interval = if let Some(interval) = params.interval {
43+
interval
44+
} else {
45+
Interval::OneMinute
46+
};
47+
48+
// Validate given timestamp
49+
if timestamp > now {
50+
return Err(EntryError::InvalidTimestamp);
51+
}
52+
53+
let entries = entry_repository::get_ohlc(&state.pool, pair_id.clone(), interval, timestamp)
54+
.await
55+
.map_err(|db_error| to_entry_error(db_error, &pair_id))?;
56+
57+
Ok(Json(adapt_entry_to_entry_response(pair_id, &entries)))
58+
}
59+
60+
fn adapt_entry_to_entry_response(pair_id: String, entries: &[OHLCEntry]) -> GetOHLCResponse {
61+
GetOHLCResponse {
62+
pair_id,
63+
data: entries.to_vec(),
64+
}
65+
}
66+
67+
fn to_entry_error(error: InfraError, pair_id: &String) -> EntryError {
68+
match error {
69+
InfraError::InternalServerError => EntryError::InternalServerError,
70+
InfraError::NotFound => EntryError::NotFound(pair_id.to_string()),
71+
InfraError::InvalidTimeStamp => EntryError::InvalidTimestamp,
72+
}
73+
}

pragma-node/src/handlers/entries/mod.rs

+13-3
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
use serde::{Deserialize, Serialize};
22

33
use starknet::core::types::FieldElement;
4-
use utoipa::ToSchema;
4+
use utoipa::{IntoParams, ToSchema};
55

66
pub use create_entry::create_entries;
77
pub use get_entry::get_entry;
8+
pub use get_ohlc::get_ohlc;
89
pub use get_volatility::get_volatility;
910

11+
use crate::infra::repositories::entry_repository::OHLCEntry;
12+
1013
pub mod create_entry;
1114
pub mod get_entry;
15+
pub mod get_ohlc;
1216
pub mod get_volatility;
1317

1418
pub mod utils;
@@ -48,6 +52,12 @@ pub struct GetEntryResponse {
4852
decimals: u32,
4953
}
5054

55+
#[derive(Debug, Serialize, Deserialize, ToSchema)]
56+
pub struct GetOHLCResponse {
57+
pair_id: String,
58+
data: Vec<OHLCEntry>,
59+
}
60+
5161
#[derive(Debug, Serialize, Deserialize, ToSchema)]
5262
pub struct GetVolatilityResponse {
5363
pair_id: String,
@@ -58,7 +68,7 @@ pub struct GetVolatilityResponse {
5868
/// Query parameters structs
5969
6070
// Define an enum for the allowed intervals
61-
#[derive(Default, Debug, Deserialize)]
71+
#[derive(Default, Debug, Deserialize, ToSchema)]
6272
pub enum Interval {
6373
#[serde(rename = "1min")]
6474
#[default]
@@ -69,7 +79,7 @@ pub enum Interval {
6979
OneHour,
7080
}
7181

72-
#[derive(Debug, Deserialize)]
82+
#[derive(Debug, Deserialize, IntoParams, ToSchema)]
7383
pub struct GetEntryParams {
7484
pub timestamp: Option<u64>,
7585
pub interval: Option<Interval>,

pragma-node/src/infra/repositories/entry_repository.rs

+122
Original file line numberDiff line numberDiff line change
@@ -243,3 +243,125 @@ pub async fn get_decimals(
243243

244244
Ok(decimals.to_u32().unwrap())
245245
}
246+
247+
#[derive(Debug, Clone, Serialize, Deserialize, Queryable)]
248+
pub struct OHLCEntry {
249+
pub time: NaiveDateTime,
250+
pub open: BigDecimal,
251+
pub low: BigDecimal,
252+
pub high: BigDecimal,
253+
pub close: BigDecimal,
254+
}
255+
256+
#[derive(Serialize, QueryableByName, Clone, Debug)]
257+
pub struct OHLCEntryRaw {
258+
#[diesel(sql_type = diesel::sql_types::Timestamptz)]
259+
pub time: NaiveDateTime,
260+
#[diesel(sql_type = diesel::sql_types::Numeric)]
261+
pub open: BigDecimal,
262+
#[diesel(sql_type = diesel::sql_types::Numeric)]
263+
pub high: BigDecimal,
264+
#[diesel(sql_type = diesel::sql_types::Numeric)]
265+
pub low: BigDecimal,
266+
#[diesel(sql_type = diesel::sql_types::Numeric)]
267+
pub close: BigDecimal,
268+
}
269+
270+
pub async fn get_ohlc(
271+
pool: &deadpool_diesel::postgres::Pool,
272+
pair_id: String,
273+
interval: Interval,
274+
time: u64,
275+
) -> Result<Vec<OHLCEntry>, InfraError> {
276+
let conn = pool.get().await.map_err(adapt_infra_error)?;
277+
278+
let raw_sql = match interval {
279+
Interval::OneMinute => {
280+
r#"
281+
-- query the materialized realtime view
282+
SELECT
283+
bucket AS time,
284+
open,
285+
high,
286+
low,
287+
close
288+
FROM
289+
one_minute_candle
290+
WHERE
291+
pair_id = $1
292+
AND
293+
bucket <= $2
294+
ORDER BY
295+
time DESC
296+
LIMIT 10000;
297+
"#
298+
}
299+
Interval::FifteenMinutes => {
300+
r#"
301+
-- query the materialized realtime view
302+
SELECT
303+
bucket AS time,
304+
open,
305+
high,
306+
low,
307+
close
308+
FROM
309+
fifteen_minute_candle
310+
WHERE
311+
pair_id = $1
312+
AND
313+
bucket <= $2
314+
ORDER BY
315+
time DESC
316+
LIMIT 10000;
317+
"#
318+
}
319+
Interval::OneHour => {
320+
r#"
321+
-- query the materialized realtime view
322+
SELECT
323+
bucket AS time,
324+
open,
325+
high,
326+
low,
327+
close
328+
FROM
329+
one_hour_candle
330+
WHERE
331+
pair_id = $1
332+
AND
333+
bucket <= $2
334+
ORDER BY
335+
time DESC
336+
LIMIT 10000;
337+
"#
338+
}
339+
};
340+
341+
let date_time =
342+
NaiveDateTime::from_timestamp_millis(time as i64).ok_or(InfraError::InvalidTimeStamp)?;
343+
344+
let raw_entries = conn
345+
.interact(move |conn| {
346+
diesel::sql_query(raw_sql)
347+
.bind::<diesel::sql_types::Text, _>(pair_id)
348+
.bind::<diesel::sql_types::Timestamptz, _>(date_time)
349+
.load::<OHLCEntryRaw>(conn)
350+
})
351+
.await
352+
.map_err(adapt_infra_error)?
353+
.map_err(adapt_infra_error)?;
354+
355+
let entries: Vec<OHLCEntry> = raw_entries
356+
.into_iter()
357+
.map(|raw_entry| OHLCEntry {
358+
time: raw_entry.time,
359+
open: raw_entry.open,
360+
high: raw_entry.high,
361+
low: raw_entry.low,
362+
close: raw_entry.close,
363+
})
364+
.collect();
365+
366+
Ok(entries)
367+
}

pragma-node/src/main.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ async fn main() {
3030
paths(
3131
handlers::entries::create_entry::create_entries,
3232
handlers::entries::get_entry::get_entry,
33+
handlers::entries::get_ohlc::get_ohlc,
3334
handlers::entries::get_volatility::get_volatility,
3435
),
3536
components(
3637
schemas(pragma_entities::dto::Entry, pragma_entities::EntryError),
3738
schemas(pragma_entities::dto::Publisher, pragma_entities::PublisherError),
38-
schemas(handlers::entries::CreateEntryRequest, handlers::entries::CreateEntryResponse, handlers::entries::GetEntryResponse, handlers::entries::GetVolatilityResponse),
39+
schemas(handlers::entries::CreateEntryRequest, handlers::entries::CreateEntryResponse, handlers::entries::GetEntryResponse, handlers::entries::GetVolatilityResponse, handlers::entries::GetOHLCResponse),
40+
schemas(handlers::entries::GetEntryParams, handlers::entries::Interval),
3941
schemas(handlers::entries::Entry, handlers::entries::BaseEntry),
4042
schemas(pragma_entities::error::InfraError),
4143
),

0 commit comments

Comments
 (0)