Skip to content

Commit 581e8a8

Browse files
authored
✨ Add TWAP aggregation (#34)
* feat: twap views migration * feat: aggregation query param * fix: twap value name
1 parent e62efb9 commit 581e8a8

File tree

5 files changed

+178
-8
lines changed

5 files changed

+178
-8
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- This file should undo anything in `up.sql`
2+
DROP MATERIALIZED VIEW IF EXISTS twap_1_min_agg;
3+
DROP MATERIALIZED VIEW IF EXISTS twap_15_min_agg;
4+
DROP MATERIALIZED VIEW IF EXISTS twap_1_hour_agg;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
-- 1min TWAP
2+
CREATE MATERIALIZED VIEW twap_1_min_agg
3+
WITH (timescaledb.continuous, timescaledb.materialized_only = true)
4+
AS SELECT
5+
pair_id,
6+
time_bucket('1 min'::interval, timestamp) as bucket,
7+
average(time_weight('Linear', timestamp, price))::numeric as price_twap,
8+
COUNT(DISTINCT source) as num_sources
9+
FROM entries
10+
GROUP BY bucket, pair_id
11+
WITH NO DATA;
12+
13+
SELECT add_continuous_aggregate_policy('twap_1_min_agg',
14+
start_offset => NULL,
15+
end_offset => INTERVAL '1 min',
16+
schedule_interval => INTERVAL '1 min');
17+
18+
-- 15min TWAP
19+
CREATE MATERIALIZED VIEW twap_15_min_agg
20+
WITH (timescaledb.continuous, timescaledb.materialized_only = true)
21+
AS SELECT
22+
pair_id,
23+
time_bucket('15 min'::interval, timestamp) as bucket,
24+
average(time_weight('Linear', timestamp, price))::numeric as price_twap,
25+
COUNT(DISTINCT source) as num_sources
26+
FROM entries
27+
GROUP BY bucket, pair_id
28+
WITH NO DATA;
29+
30+
SELECT add_continuous_aggregate_policy('twap_15_min_agg',
31+
start_offset => NULL,
32+
end_offset => INTERVAL '15 min',
33+
schedule_interval => INTERVAL '15 min');
34+
35+
-- 1hour TWAP
36+
CREATE MATERIALIZED VIEW twap_1_hour_agg
37+
WITH (timescaledb.continuous, timescaledb.materialized_only = true)
38+
AS SELECT
39+
pair_id,
40+
time_bucket('1 hour'::interval, timestamp) as bucket,
41+
average(time_weight('Linear', timestamp, price))::numeric as price_twap,
42+
COUNT(DISTINCT source) as num_sources
43+
FROM entries
44+
GROUP BY bucket, pair_id
45+
WITH NO DATA;
46+
47+
SELECT add_continuous_aggregate_policy('twap_1_hour_agg',
48+
start_offset => NULL,
49+
end_offset => INTERVAL '1 hour',
50+
schedule_interval => INTERVAL '1 hour');

pragma-node/src/handlers/entries/get_entry.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use axum::extract::{Query, State};
22
use axum::Json;
33
use bigdecimal::num_bigint::ToBigInt;
44

5-
use crate::handlers::entries::{GetEntryResponse, Interval};
5+
use crate::handlers::entries::{AggregationMode, GetEntryResponse, Interval};
66
use crate::infra::repositories::entry_repository::{self, MedianEntry};
77
use crate::utils::PathExtractor;
88
use crate::AppState;
@@ -46,6 +46,12 @@ pub async fn get_entry(
4646
Interval::OneMinute
4747
};
4848

49+
let aggregation_mode = if let Some(aggregation_mode) = params.aggregation {
50+
aggregation_mode
51+
} else {
52+
AggregationMode::Median
53+
};
54+
4955
let is_routing = params.routing.unwrap_or(false);
5056

5157
// Validate given timestamp
@@ -59,6 +65,7 @@ pub async fn get_entry(
5965
interval,
6066
timestamp,
6167
is_routing,
68+
aggregation_mode,
6269
)
6370
.await
6471
.map_err(|e| to_entry_error(e, &pair_id))?;

pragma-node/src/handlers/entries/mod.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ pub struct GetVolatilityResponse {
6767

6868
/// Query parameters structs
6969
70-
// Define an enum for the allowed intervals
70+
// Supported Aggregation Intervals
7171
#[derive(Default, Debug, Deserialize, ToSchema, Clone, Copy)]
7272
pub enum Interval {
7373
#[serde(rename = "1min")]
@@ -79,11 +79,22 @@ pub enum Interval {
7979
OneHour,
8080
}
8181

82+
// Supported Aggregation Modes
83+
#[derive(Default, Debug, Deserialize, ToSchema, Clone, Copy)]
84+
pub enum AggregationMode {
85+
#[serde(rename = "median")]
86+
#[default]
87+
Median,
88+
#[serde(rename = "twap")]
89+
Twap,
90+
}
91+
8292
#[derive(Debug, Deserialize, IntoParams, ToSchema)]
8393
pub struct GetEntryParams {
8494
pub timestamp: Option<u64>,
8595
pub interval: Option<Interval>,
8696
pub routing: Option<bool>,
97+
pub aggregation: Option<AggregationMode>,
8798
}
8899

89100
impl Default for GetEntryParams {
@@ -92,6 +103,7 @@ impl Default for GetEntryParams {
92103
timestamp: Some(chrono::Utc::now().timestamp_millis() as u64),
93104
interval: Some(Interval::default()),
94105
routing: Some(false),
106+
aggregation: Some(AggregationMode::default()),
95107
}
96108
}
97109
}

pragma-node/src/infra/repositories/entry_repository.rs

+103-6
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use pragma_entities::{
1111
Currency, Entry, NewEntry,
1212
};
1313

14-
use crate::handlers::entries::Interval;
14+
use crate::handlers::entries::{AggregationMode, Interval};
1515
use crate::utils::{convert_via_quote, normalize_to_decimals};
1616

1717
#[derive(Deserialize)]
@@ -88,9 +88,10 @@ pub async fn routing(
8888
interval: Interval,
8989
timestamp: u64,
9090
is_routing: bool,
91+
agg_mode: AggregationMode,
9192
) -> Result<(MedianEntry, u32), InfraError> {
9293
if pair_id_exist(pool, pair_id.clone()).await? || !is_routing {
93-
return get_price_decimals(pool, pair_id, interval, timestamp).await;
94+
return get_price_decimals(pool, pair_id, interval, timestamp, agg_mode).await;
9495
}
9596

9697
let [base, quote]: [&str; 2] = pair_id
@@ -99,7 +100,7 @@ pub async fn routing(
99100
.try_into()
100101
.map_err(|_| InfraError::InternalServerError)?;
101102

102-
match find_alternative_pair_price(pool, base, quote, interval, timestamp).await {
103+
match find_alternative_pair_price(pool, base, quote, interval, timestamp, agg_mode).await {
103104
Ok(result) => Ok(result),
104105
Err(_) => Err(InfraError::NotFound),
105106
}
@@ -159,6 +160,7 @@ async fn find_alternative_pair_price(
159160
quote: &str,
160161
interval: Interval,
161162
timestamp: u64,
163+
agg_mode: AggregationMode,
162164
) -> Result<(MedianEntry, u32), InfraError> {
163165
let conn = pool.get().await.map_err(adapt_infra_error)?;
164166

@@ -176,9 +178,9 @@ async fn find_alternative_pair_price(
176178
&& pair_id_exist(pool, alt_quote_pair.clone()).await?
177179
{
178180
let base_alt_result =
179-
get_price_decimals(pool, base_alt_pair, interval, timestamp).await?;
181+
get_price_decimals(pool, base_alt_pair, interval, timestamp, agg_mode).await?;
180182
let alt_quote_result =
181-
get_price_decimals(pool, alt_quote_pair, interval, timestamp).await?;
183+
get_price_decimals(pool, alt_quote_pair, interval, timestamp, agg_mode).await?;
182184

183185
return calculate_rebased_price(base_alt_result, alt_quote_result);
184186
}
@@ -207,14 +209,109 @@ async fn get_price_decimals(
207209
pair_id: String,
208210
interval: Interval,
209211
timestamp: u64,
212+
agg_mode: AggregationMode,
210213
) -> Result<(MedianEntry, u32), InfraError> {
211-
let entry = get_median_price(pool, pair_id.clone(), interval, timestamp).await?;
214+
let entry = match agg_mode {
215+
AggregationMode::Median => {
216+
get_median_price(pool, pair_id.clone(), interval, timestamp).await?
217+
}
218+
AggregationMode::Twap => get_twap_price(pool, pair_id.clone(), interval, timestamp).await?,
219+
};
212220

213221
let decimals = get_decimals(pool, &pair_id).await?;
214222

215223
Ok((entry, decimals))
216224
}
217225

226+
pub async fn get_twap_price(
227+
pool: &deadpool_diesel::postgres::Pool,
228+
pair_id: String,
229+
interval: Interval,
230+
time: u64,
231+
) -> Result<MedianEntry, InfraError> {
232+
let conn = pool.get().await.map_err(adapt_infra_error)?;
233+
234+
let raw_sql = match interval {
235+
Interval::OneMinute => {
236+
r#"
237+
-- query the materialized realtime view
238+
SELECT
239+
bucket AS time,
240+
price_twap AS median_price,
241+
num_sources
242+
FROM
243+
twap_1_min_agg
244+
WHERE
245+
pair_id = $1
246+
AND
247+
bucket <= $2
248+
ORDER BY
249+
time DESC
250+
LIMIT 1;
251+
"#
252+
}
253+
Interval::FifteenMinutes => {
254+
r#"
255+
-- query the materialized realtime view
256+
SELECT
257+
bucket AS time,
258+
price_twap AS median_price,
259+
num_sources
260+
FROM
261+
twap_15_min_agg
262+
WHERE
263+
pair_id = $1
264+
AND
265+
bucket <= $2
266+
ORDER BY
267+
time DESC
268+
LIMIT 1;
269+
"#
270+
}
271+
Interval::OneHour => {
272+
r#"
273+
-- query the materialized realtime view
274+
SELECT
275+
bucket AS time,
276+
price_twap AS median_price,
277+
num_sources
278+
FROM
279+
twap_1_h_agg
280+
WHERE
281+
pair_id = $1
282+
AND
283+
bucket <= $2
284+
ORDER BY
285+
time DESC
286+
LIMIT 1;
287+
"#
288+
}
289+
};
290+
291+
let date_time =
292+
NaiveDateTime::from_timestamp_millis(time as i64).ok_or(InfraError::InvalidTimeStamp)?;
293+
294+
let raw_entry = conn
295+
.interact(move |conn| {
296+
diesel::sql_query(raw_sql)
297+
.bind::<diesel::sql_types::Text, _>(pair_id)
298+
.bind::<diesel::sql_types::Timestamptz, _>(date_time)
299+
.load::<MedianEntryRaw>(conn)
300+
})
301+
.await
302+
.map_err(adapt_infra_error)?
303+
.map_err(adapt_infra_error)?;
304+
305+
let raw_entry = raw_entry.first().ok_or(InfraError::NotFound)?;
306+
307+
let entry: MedianEntry = MedianEntry {
308+
time: raw_entry.time,
309+
median_price: raw_entry.median_price.clone(),
310+
num_sources: raw_entry.num_sources,
311+
};
312+
313+
Ok(entry)
314+
}
218315
pub async fn get_median_price(
219316
pool: &deadpool_diesel::postgres::Pool,
220317
pair_id: String,

0 commit comments

Comments
 (0)