Skip to content

Commit 0674de9

Browse files
committed
feat: adapt route to use new views
1 parent 5b92db9 commit 0674de9

File tree

7 files changed

+73
-66
lines changed

7 files changed

+73
-66
lines changed

pragma-entities/migrations/2023-10-12-232125_add_publishers_table/up.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ CREATE TABLE publishers (
66
active_key VARCHAR NOT NULL,
77
active BOOLEAN NOT NULL,
88
PRIMARY KEY (id)
9-
);
9+
);
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
11
-- Your SQL goes here
22
ALTER TABLE publishers
3-
ADD COLUMN account_address VARCHAR NOT NULL DEFAULT '';
3+
ADD COLUMN account_address VARCHAR NOT NULL DEFAULT '';
4+
5+
INSERT INTO publishers (name, master_key, active_key, active, account_address)
6+
VALUES (
7+
'PRAGMA',
8+
'0x05e6361b53afbb451d1326ed4e37aecff9ef68af8318eb3c8dc58bcadfc16705',
9+
'0x05e6361b53afbb451d1326ed4e37aecff9ef68af8318eb3c8dc58bcadfc16705',
10+
true,
11+
'0x624EBFB99865079BD58CFCFB925B6F5CE940D6F6E41E118B8A72B7163FB435C'
12+
);
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
-- Your SQL goes here
22
CREATE UNIQUE INDEX idx_entries_unique
3-
ON entries(pair_id, source, timestamp);
3+
ON entries(pair_id, source, timestamp DESC);
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,49 @@
11
-- Your SQL goes here
22
CREATE MATERIALIZED VIEW price_1_min_agg
33
WITH (timescaledb.continuous, timescaledb.materialized_only = true)
4-
AS SELECT source,
4+
AS SELECT
55
pair_id,
66
time_bucket('1 min'::interval, timestamp) as bucket,
7-
percentile_agg(price)
7+
approx_percentile(0.5, percentile_agg(price)) AS median_price,
8+
COUNT(DISTINCT source) as num_sources
89
FROM entries
9-
GROUP BY source, pair_id, bucket
10+
GROUP BY bucket, pair_id
1011
WITH NO DATA;
1112

1213
SELECT add_continuous_aggregate_policy('price_1_min_agg',
13-
start_offset => INTERVAL '3 min',
14+
start_offset => NULL,
1415
end_offset => INTERVAL '1 min',
1516
schedule_interval => INTERVAL '1 min');
1617

1718
CREATE MATERIALIZED VIEW price_15_min_agg
1819
WITH (timescaledb.continuous, timescaledb.materialized_only = true)
19-
AS SELECT source,
20+
AS SELECT
2021
pair_id,
2122
time_bucket('15 min'::interval, timestamp) as bucket,
22-
percentile_agg(price)
23+
approx_percentile(0.5, percentile_agg(price)) AS median_price,
24+
COUNT(DISTINCT source) as num_sources
2325
FROM entries
24-
GROUP BY source, pair_id, bucket
26+
GROUP BY bucket, pair_id
2527
WITH NO DATA;
2628

2729
SELECT add_continuous_aggregate_policy('price_15_min_agg',
28-
start_offset => INTERVAL '45 min',
30+
start_offset => NULL,
2931
end_offset => INTERVAL '15 min',
3032
schedule_interval => INTERVAL '15 min');
3133

3234
CREATE MATERIALIZED VIEW price_1_h_agg
3335
WITH (timescaledb.continuous, timescaledb.materialized_only = true)
34-
AS SELECT source,
36+
AS SELECT
3537
pair_id,
3638
time_bucket('1 hour'::interval, timestamp) as bucket,
37-
percentile_agg(price)
39+
approx_percentile(0.5, percentile_agg(price)) AS median_price,
40+
COUNT(DISTINCT source) as num_sources
3841
FROM entries
39-
GROUP BY source, pair_id, bucket
42+
GROUP BY bucket, pair_id
4043
WITH NO DATA;
4144

4245
SELECT add_continuous_aggregate_policy('price_1_h_agg',
43-
start_offset => INTERVAL '3 hours',
46+
start_offset => NULL,
4447
end_offset => INTERVAL '1 hour',
4548
schedule_interval => INTERVAL '1 hour');
4649

pragma-node/src/handlers/entries/get_entry.rs

+10-18
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use axum::extract::State;
22
use axum::Json;
3-
use bigdecimal::num_bigint::{BigInt, ToBigInt};
3+
use bigdecimal::num_bigint::ToBigInt;
44

55
use crate::handlers::entries::GetEntryResponse;
66
use crate::infra::repositories::entry_repository::{self, MedianEntry};
77
use crate::utils::PathExtractor;
88
use crate::AppState;
99
use pragma_entities::{error::InfraError, EntryError};
1010

11-
use super::utils::{compute_median_price_and_time, currency_pair_to_pair_id};
11+
use super::utils::currency_pair_to_pair_id;
1212

1313
#[utoipa::path(
1414
get,
@@ -41,19 +41,14 @@ pub async fn get_entry(
4141
}
4242

4343
// Get entries from database with given pair id (only the latest one grouped by publisher)
44-
let mut entries = entry_repository::get_median_entries(&state.pool, pair_id.clone())
44+
let entry = entry_repository::get_median_price(&state.pool, pair_id.clone())
4545
.await
4646
.map_err(|db_error| match db_error {
4747
InfraError::InternalServerError => EntryError::InternalServerError,
4848
InfraError::NotFound => EntryError::NotFound(pair_id.clone()),
4949
InfraError::InvalidTimeStamp => EntryError::InvalidTimestamp,
5050
})?;
5151

52-
// Error if no entries found
53-
if entries.is_empty() {
54-
return Err(EntryError::UnknownPairId(pair_id));
55-
}
56-
5752
let decimals = entry_repository::get_decimals(&state.pool, &pair_id)
5853
.await
5954
.map_err(|db_error| match db_error {
@@ -63,28 +58,25 @@ pub async fn get_entry(
6358
})?;
6459

6560
Ok(Json(adapt_entry_to_entry_response(
66-
pair_id,
67-
&mut entries,
68-
decimals,
61+
pair_id, &entry, decimals,
6962
)))
7063
}
7164

7265
fn adapt_entry_to_entry_response(
7366
pair_id: String,
74-
entries: &mut Vec<MedianEntry>,
67+
entry: &MedianEntry,
7568
decimals: u32,
7669
) -> GetEntryResponse {
77-
let (price, timestamp) = compute_median_price_and_time(entries).unwrap_or_default();
78-
7970
GetEntryResponse {
8071
pair_id,
81-
timestamp: timestamp.timestamp_millis() as u64,
82-
num_sources_aggregated: entries.len(),
72+
timestamp: entry.time.timestamp_millis() as u64,
73+
num_sources_aggregated: entry.num_sources as usize,
8374
price: format!(
8475
"0x{}",
85-
price
76+
entry
77+
.median_price
8678
.to_bigint()
87-
.unwrap_or(BigInt::default())
79+
.unwrap_or_default()
8880
.to_str_radix(16)
8981
),
9082
decimals,

pragma-node/src/handlers/entries/utils.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub(crate) fn currency_pair_to_pair_id(quote: &str, base: &str) -> String {
1616
/// The median is computed as the middle value of a sorted list of values.
1717
/// If the list has an even number of values, the median is computed as the average of the two middle values.
1818
/// If the list is empty, None is returned.
19+
#[allow(dead_code)]
1920
pub(crate) fn compute_median_price_and_time(
2021
entries: &mut Vec<MedianEntry>,
2122
) -> Option<(BigDecimal, NaiveDateTime)> {
@@ -74,27 +75,27 @@ fn test_volatility() {
7475
MedianEntry {
7576
time: chrono::NaiveDateTime::from_timestamp_opt(1640995200, 0).unwrap(),
7677
median_price: bigdecimal::BigDecimal::from(47686),
77-
source: "source".to_string(),
78+
num_sources: 5,
7879
},
7980
MedianEntry {
8081
time: chrono::NaiveDateTime::from_timestamp_opt(1641081600, 0).unwrap(),
8182
median_price: bigdecimal::BigDecimal::from(47345),
82-
source: "source".to_string(),
83+
num_sources: 5,
8384
},
8485
MedianEntry {
8586
time: chrono::NaiveDateTime::from_timestamp_opt(1641168000, 0).unwrap(),
8687
median_price: bigdecimal::BigDecimal::from(46458),
87-
source: "source".to_string(),
88+
num_sources: 5,
8889
},
8990
MedianEntry {
9091
time: chrono::NaiveDateTime::from_timestamp_opt(1641254400, 0).unwrap(),
9192
median_price: bigdecimal::BigDecimal::from(45897),
92-
source: "source".to_string(),
93+
num_sources: 5,
9394
},
9495
MedianEntry {
9596
time: chrono::NaiveDateTime::from_timestamp_opt(1641340800, 0).unwrap(),
9697
median_price: bigdecimal::BigDecimal::from(43569),
97-
source: "source".to_string(),
98+
num_sources: 5,
9899
},
99100
];
100101

pragma-node/src/infra/repositories/entry_repository.rs

+30-28
Original file line numberDiff line numberDiff line change
@@ -64,42 +64,43 @@ pub async fn _get_all(
6464

6565
#[derive(Debug, Serialize, Queryable)]
6666
pub struct MedianEntry {
67-
pub source: String,
6867
pub time: NaiveDateTime,
6968
pub median_price: BigDecimal,
69+
pub num_sources: i32,
7070
}
7171

7272
#[derive(Serialize, QueryableByName)]
7373
pub struct MedianEntryRaw {
74-
#[diesel(sql_type = diesel::sql_types::Text)]
75-
pub source: String,
7674
#[diesel(sql_type = diesel::sql_types::Timestamp)]
7775
pub time: NaiveDateTime,
7876
#[diesel(sql_type = diesel::sql_types::Numeric)]
7977
pub median_price: BigDecimal,
78+
#[diesel(sql_type = diesel::sql_types::Integer)]
79+
pub num_sources: i32,
8080
}
8181

82-
pub async fn get_median_entries(
82+
pub async fn get_median_price(
8383
pool: &deadpool_diesel::postgres::Pool,
8484
pair_id: String,
85-
) -> Result<Vec<MedianEntry>, InfraError> {
85+
) -> Result<MedianEntry, InfraError> {
8686
let conn = pool.get().await.map_err(adapt_infra_error)?;
8787

8888
let raw_sql = r#"
8989
-- query the materialized realtime view
9090
SELECT
91-
source,
92-
MAX(bucket) AS time,
93-
approx_percentile(0.5, percentile_agg) AS median_price
91+
bucket AS time,
92+
median_price,
93+
num_sources
9494
FROM
9595
price_1_min_agg
9696
WHERE
9797
pair_id = $1
9898
ORDER BY
99-
source;
99+
time
100+
LIMIT 1;
100101
"#;
101102

102-
let raw_entries: Vec<MedianEntryRaw> = conn
103+
let raw_entry = conn
103104
.interact(move |conn| {
104105
diesel::sql_query(raw_sql)
105106
.bind::<diesel::sql_types::Text, _>(pair_id)
@@ -109,16 +110,15 @@ pub async fn get_median_entries(
109110
.map_err(adapt_infra_error)?
110111
.map_err(adapt_infra_error)?;
111112

112-
let entries: Vec<MedianEntry> = raw_entries
113-
.into_iter()
114-
.map(|raw_entry| MedianEntry {
115-
time: raw_entry.time,
116-
median_price: raw_entry.median_price,
117-
source: raw_entry.source,
118-
})
119-
.collect();
113+
let raw_entry = raw_entry.first().ok_or(InfraError::NotFound)?;
120114

121-
Ok(entries)
115+
let entry: MedianEntry = MedianEntry {
116+
time: raw_entry.time,
117+
median_price: raw_entry.median_price.clone(),
118+
num_sources: raw_entry.num_sources,
119+
};
120+
121+
Ok(entry)
122122
}
123123

124124
pub async fn get_entries_between(
@@ -135,14 +135,16 @@ pub async fn get_entries_between(
135135

136136
let raw_sql = r#"
137137
SELECT
138-
source,
139-
"timestamp" AS "time",
140-
PERCENTILE_DISC(0.5) WITHIN GROUP(ORDER BY price) AS "median_price"
141-
FROM entries
142-
WHERE pair_id = $1
143-
AND "timestamp" BETWEEN $2 AND $3
144-
GROUP BY (timestamp, source)
145-
ORDER BY timestamp ASC;
138+
bucket AS time,
139+
median_price,
140+
num_sources
141+
FROM price_1_min_agg
142+
WHERE
143+
pair_id = $1
144+
AND
145+
time BETWEEN $2 AND $3
146+
ORDER BY
147+
time DESC;
146148
"#;
147149

148150
let raw_entries = conn
@@ -162,7 +164,7 @@ pub async fn get_entries_between(
162164
.map(|raw_entry| MedianEntry {
163165
time: raw_entry.time,
164166
median_price: raw_entry.median_price,
165-
source: raw_entry.source,
167+
num_sources: raw_entry.num_sources,
166168
})
167169
.collect();
168170

0 commit comments

Comments
 (0)