Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add TWAP aggregation #34

Merged
merged 3 commits into from
Jan 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This file should undo anything in `up.sql`
DROP MATERIALIZED VIEW IF EXISTS twap_1_min_agg;
DROP MATERIALIZED VIEW IF EXISTS twap_15_min_agg;
DROP MATERIALIZED VIEW IF EXISTS twap_1_hour_agg;
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
-- 1min TWAP
CREATE MATERIALIZED VIEW twap_1_min_agg
WITH (timescaledb.continuous, timescaledb.materialized_only = true)
AS SELECT
pair_id,
time_bucket('1 min'::interval, timestamp) as bucket,
average(time_weight('Linear', timestamp, price))::numeric as price_twap,
COUNT(DISTINCT source) as num_sources
FROM entries
GROUP BY bucket, pair_id
WITH NO DATA;

SELECT add_continuous_aggregate_policy('twap_1_min_agg',
start_offset => NULL,
end_offset => INTERVAL '1 min',
schedule_interval => INTERVAL '1 min');

-- 15min TWAP
CREATE MATERIALIZED VIEW twap_15_min_agg
WITH (timescaledb.continuous, timescaledb.materialized_only = true)
AS SELECT
pair_id,
time_bucket('15 min'::interval, timestamp) as bucket,
average(time_weight('Linear', timestamp, price))::numeric as price_twap,
COUNT(DISTINCT source) as num_sources
FROM entries
GROUP BY bucket, pair_id
WITH NO DATA;

SELECT add_continuous_aggregate_policy('twap_15_min_agg',
start_offset => NULL,
end_offset => INTERVAL '15 min',
schedule_interval => INTERVAL '15 min');

-- 1hour TWAP
CREATE MATERIALIZED VIEW twap_1_hour_agg
WITH (timescaledb.continuous, timescaledb.materialized_only = true)
AS SELECT
pair_id,
time_bucket('1 hour'::interval, timestamp) as bucket,
average(time_weight('Linear', timestamp, price))::numeric as price_twap,
COUNT(DISTINCT source) as num_sources
FROM entries
GROUP BY bucket, pair_id
WITH NO DATA;

SELECT add_continuous_aggregate_policy('twap_1_hour_agg',
start_offset => NULL,
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');
9 changes: 8 additions & 1 deletion pragma-node/src/handlers/entries/get_entry.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ use axum::extract::{Query, State};
use axum::Json;
use bigdecimal::num_bigint::ToBigInt;

use crate::handlers::entries::{GetEntryResponse, Interval};
use crate::handlers::entries::{AggregationMode, GetEntryResponse, Interval};
use crate::infra::repositories::entry_repository::{self, MedianEntry};
use crate::utils::PathExtractor;
use crate::AppState;
@@ -46,6 +46,12 @@ pub async fn get_entry(
Interval::OneMinute
};

let aggregation_mode = if let Some(aggregation_mode) = params.aggregation {
aggregation_mode
} else {
AggregationMode::Median
};

let is_routing = params.routing.unwrap_or(false);

// Validate given timestamp
@@ -59,6 +65,7 @@ pub async fn get_entry(
interval,
timestamp,
is_routing,
aggregation_mode,
)
.await
.map_err(|e| to_entry_error(e, &pair_id))?;
14 changes: 13 additions & 1 deletion pragma-node/src/handlers/entries/mod.rs
Original file line number Diff line number Diff line change
@@ -67,7 +67,7 @@ pub struct GetVolatilityResponse {

/// Query parameters structs
// Define an enum for the allowed intervals
// Supported Aggregation Intervals
#[derive(Default, Debug, Deserialize, ToSchema, Clone, Copy)]
pub enum Interval {
#[serde(rename = "1min")]
@@ -79,11 +79,22 @@ pub enum Interval {
OneHour,
}

// Supported Aggregation Modes
#[derive(Default, Debug, Deserialize, ToSchema, Clone, Copy)]
pub enum AggregationMode {
#[serde(rename = "median")]
#[default]
Median,
#[serde(rename = "twap")]
Twap,
}

#[derive(Debug, Deserialize, IntoParams, ToSchema)]
pub struct GetEntryParams {
pub timestamp: Option<u64>,
pub interval: Option<Interval>,
pub routing: Option<bool>,
pub aggregation: Option<AggregationMode>,
}

impl Default for GetEntryParams {
@@ -92,6 +103,7 @@ impl Default for GetEntryParams {
timestamp: Some(chrono::Utc::now().timestamp_millis() as u64),
interval: Some(Interval::default()),
routing: Some(false),
aggregation: Some(AggregationMode::default()),
}
}
}
109 changes: 103 additions & 6 deletions pragma-node/src/infra/repositories/entry_repository.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ use pragma_entities::{
Currency, Entry, NewEntry,
};

use crate::handlers::entries::Interval;
use crate::handlers::entries::{AggregationMode, Interval};
use crate::utils::{convert_via_quote, normalize_to_decimals};

#[derive(Deserialize)]
@@ -88,9 +88,10 @@ pub async fn routing(
interval: Interval,
timestamp: u64,
is_routing: bool,
agg_mode: AggregationMode,
) -> Result<(MedianEntry, u32), InfraError> {
if pair_id_exist(pool, pair_id.clone()).await? || !is_routing {
return get_price_decimals(pool, pair_id, interval, timestamp).await;
return get_price_decimals(pool, pair_id, interval, timestamp, agg_mode).await;
}

let [base, quote]: [&str; 2] = pair_id
@@ -99,7 +100,7 @@ pub async fn routing(
.try_into()
.map_err(|_| InfraError::InternalServerError)?;

match find_alternative_pair_price(pool, base, quote, interval, timestamp).await {
match find_alternative_pair_price(pool, base, quote, interval, timestamp, agg_mode).await {
Ok(result) => Ok(result),
Err(_) => Err(InfraError::NotFound),
}
@@ -159,6 +160,7 @@ async fn find_alternative_pair_price(
quote: &str,
interval: Interval,
timestamp: u64,
agg_mode: AggregationMode,
) -> Result<(MedianEntry, u32), InfraError> {
let conn = pool.get().await.map_err(adapt_infra_error)?;

@@ -176,9 +178,9 @@ async fn find_alternative_pair_price(
&& pair_id_exist(pool, alt_quote_pair.clone()).await?
{
let base_alt_result =
get_price_decimals(pool, base_alt_pair, interval, timestamp).await?;
get_price_decimals(pool, base_alt_pair, interval, timestamp, agg_mode).await?;
let alt_quote_result =
get_price_decimals(pool, alt_quote_pair, interval, timestamp).await?;
get_price_decimals(pool, alt_quote_pair, interval, timestamp, agg_mode).await?;

return calculate_rebased_price(base_alt_result, alt_quote_result);
}
@@ -207,14 +209,109 @@ async fn get_price_decimals(
pair_id: String,
interval: Interval,
timestamp: u64,
agg_mode: AggregationMode,
) -> Result<(MedianEntry, u32), InfraError> {
let entry = get_median_price(pool, pair_id.clone(), interval, timestamp).await?;
let entry = match agg_mode {
AggregationMode::Median => {
get_median_price(pool, pair_id.clone(), interval, timestamp).await?
}
AggregationMode::Twap => get_twap_price(pool, pair_id.clone(), interval, timestamp).await?,
};

let decimals = get_decimals(pool, &pair_id).await?;

Ok((entry, decimals))
}

pub async fn get_twap_price(
pool: &deadpool_diesel::postgres::Pool,
pair_id: String,
interval: Interval,
time: u64,
) -> Result<MedianEntry, InfraError> {
let conn = pool.get().await.map_err(adapt_infra_error)?;

let raw_sql = match interval {
Interval::OneMinute => {
r#"
-- query the materialized realtime view
SELECT
bucket AS time,
price_twap AS median_price,
num_sources
FROM
twap_1_min_agg
WHERE
pair_id = $1
AND
bucket <= $2
ORDER BY
time DESC
LIMIT 1;
"#
}
Interval::FifteenMinutes => {
r#"
-- query the materialized realtime view
SELECT
bucket AS time,
price_twap AS median_price,
num_sources
FROM
twap_15_min_agg
WHERE
pair_id = $1
AND
bucket <= $2
ORDER BY
time DESC
LIMIT 1;
"#
}
Interval::OneHour => {
r#"
-- query the materialized realtime view
SELECT
bucket AS time,
price_twap AS median_price,
num_sources
FROM
twap_1_h_agg
WHERE
pair_id = $1
AND
bucket <= $2
ORDER BY
time DESC
LIMIT 1;
"#
}
};

let date_time =
NaiveDateTime::from_timestamp_millis(time as i64).ok_or(InfraError::InvalidTimeStamp)?;

let raw_entry = conn
.interact(move |conn| {
diesel::sql_query(raw_sql)
.bind::<diesel::sql_types::Text, _>(pair_id)
.bind::<diesel::sql_types::Timestamptz, _>(date_time)
.load::<MedianEntryRaw>(conn)
})
.await
.map_err(adapt_infra_error)?
.map_err(adapt_infra_error)?;

let raw_entry = raw_entry.first().ok_or(InfraError::NotFound)?;

let entry: MedianEntry = MedianEntry {
time: raw_entry.time,
median_price: raw_entry.median_price.clone(),
num_sources: raw_entry.num_sources,
};

Ok(entry)
}
pub async fn get_median_price(
pool: &deadpool_diesel::postgres::Pool,
pair_id: String,