diff --git a/Cargo.toml b/Cargo.toml
index 80fc867..90cb219 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,5 +23,5 @@ tracing = "0.1.37"
 tracing-subscriber = "0.3.17"
 tracing-futures = "0.2.3"
 warp = "0.3.5"
-valence_core = "0.1.5"
+valence_core = "0.1.6"
 valence_market = "0.1.0"
diff --git a/src/api/handlers.rs b/src/api/handlers.rs
index 23f2443..35c982e 100644
--- a/src/api/handlers.rs
+++ b/src/api/handlers.rs
@@ -1,7 +1,8 @@
-use crate::api::utils::retrieve_from_db;
-use crate::interfaces::SetRequestData;
+use crate::api::utils::{retrieve_from_db, serialize_all_entries};
+use crate::interfaces::{SetRequestData, SetSaveData};
 use futures::lock::Mutex;
 use serde_json::Value;
+use std::collections::HashMap;
 use std::sync::Arc;
 use tracing::{error, info, warn};
 use valence_core::api::errors::ApiErrorType;
@@ -25,6 +26,7 @@ pub async fn get_data_handler<
     C: KvStoreConnection + Clone + Send + 'static,
 >(
     headers: warp::hyper::HeaderMap,
+    value_id: Option<String>,
     db: Arc<Mutex<D>>,
     cache: Arc<Mutex<C>>,
     c_filter: CFilterConnection,
@@ -45,8 +47,9 @@
     // Check cache first
     let mut cache_lock_result = cache.lock().await;

-    let cache_result: Result<Option<Vec<String>>, _> =
-        cache_lock_result.get_data::<String>(address).await;
+    let cache_result: Result<Option<HashMap<String, String>>, _> = cache_lock_result
+        .get_data::<String>(address, value_id.as_deref())
+        .await;

     info!("Cache result: {:?}", cache_result);

@@ -55,16 +58,32 @@
             match value {
                 Some(value) => {
                     info!("Data retrieved from cache");
-                    let data = value
-                        .iter()
-                        .map(|v| serde_json::from_str(v).unwrap())
-                        .collect::<Vec<Value>>();
-                    return r.into_ok("Data retrieved successfully", json_serialize_embed(data));
+                    if let Some(id) = value_id {
+                        if !value.contains_key(&id) {
+                            return r.into_err_internal(ApiErrorType::Generic(
+                                "Value ID not found".to_string(),
+                            ));
+                        }
+
+                        let data = value.get(&id).unwrap().clone();
+                        let final_result: Value = serde_json::from_str(&data).unwrap();
+                        return r.into_ok(
+                            "Data retrieved successfully",
+                            json_serialize_embed(final_result),
+                        );
+                    }
+
+                    let final_value = serialize_all_entries(value);
+
+                    return r.into_ok(
+                        "Data retrieved successfully",
+                        json_serialize_embed(final_value),
+                    );
                 }
                 None => {
                     // Default to checking from DB if cache is empty
                     warn!("Cache lookup failed for address: {}", address);
-                    retrieve_from_db(db, address).await
+                    retrieve_from_db(db, address, value_id.as_deref()).await
                 }
             }
         }
@@ -73,7 +92,7 @@
             warn!("Attempting to retrieve data from DB");

             // Get data from DB
-            retrieve_from_db(db, address).await
+            retrieve_from_db(db, address, value_id.as_deref()).await
         }
     }
 }
@@ -99,11 +118,22 @@
     let r = CallResponse::new("set_data");
     info!("SET_DATA requested with payload: {:?}", payload);

+    let data_to_save: SetSaveData = {
+        SetSaveData {
+            address: payload.address.clone(),
+            data: payload.data.clone(),
+        }
+    };
+
     // Add to cache
     let cache_result = cache
        .lock()
        .await
-        .set_data(&payload.address.clone(), serialize_data(&payload.data))
+        .set_data(
+            &payload.address.clone(),
+            &payload.data_id,
+            serialize_data(&data_to_save),
+        )
        .await;

     // Add to DB
@@ -118,7 +148,7 @@
             db.lock()
                 .await
-                .set_data(&payload.address, payload.data)
+                .set_data(&payload.address, &payload.data_id, data_to_save)
                 .await
         }
         Err(_) => {
diff --git a/src/api/routes.rs b/src/api/routes.rs
index 9f4a8a2..d2df43d 100644
--- a/src/api/routes.rs
+++ b/src/api/routes.rs
@@ -1,7 +1,5 @@
 use crate::api::handlers::{get_data_handler, set_data_handler};
 use futures::lock::Mutex;
-use serde::de::DeserializeOwned;
-use serde::Serialize;
 use std::sync::Arc;
 use tracing::debug;
 use valence_core::api::interfaces::CFilterConnection;
@@ -13,19 +11,44 @@ use warp::{Filter, Rejection, Reply};

 // ========== BASE ROUTES ========== //

-/// GET /get_data
+/// GET /get_data_with_id
 ///
-/// Retrieves data associated with a given address
+/// Retrieves data associated with a given address and a given id
 ///
 /// ### Arguments
 ///
 /// * `db` - The database connection to use
 /// * `cache` - The cache connection to use
 /// * `cuckoo_filter` - The cuckoo filter connection to use
+pub fn get_data_with_id<
+    D: KvStoreConnection + Clone + Send + 'static,
+    C: KvStoreConnection + Clone + Send + 'static,
+>(
+    db: Arc<Mutex<D>>,
+    cache: Arc<Mutex<C>>,
+    cuckoo_filter: CFilterConnection,
+) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+    debug!("Setting up get_data_with_id route");
+
+    warp::path("get_data")
+        .and(warp::get())
+        .and(sig_verify_middleware())
+        .and(warp::header::headers_cloned())
+        .and(warp::path::param::<String>())
+        .and(with_node_component(cache))
+        .and(with_node_component(db))
+        .and(with_node_component(cuckoo_filter))
+        .and_then(move |_, headers, value_id: String, cache, db, cf| {
+            // Add type annotation for headers parameter
+            debug!("GET_DATA requested");
+            map_api_res(get_data_handler(headers, Some(value_id), db, cache, cf))
+        })
+        .with(get_cors())
+}
+
 pub fn get_data<
     D: KvStoreConnection + Clone + Send + 'static,
     C: KvStoreConnection + Clone + Send + 'static,
-    T: Serialize + DeserializeOwned,
 >(
     db: Arc<Mutex<D>>,
     cache: Arc<Mutex<C>>,
@@ -43,7 +66,7 @@
         .and_then(move |_, headers, cache, db, cf| {
             // Add type annotation for headers parameter
             debug!("GET_DATA requested");
-            map_api_res(get_data_handler(headers, db, cache, cf))
+            map_api_res(get_data_handler(headers, None, db, cache, cf))
         })
         .with(get_cors())
 }
diff --git a/src/api/utils.rs b/src/api/utils.rs
index f86b3c6..85636ec 100644
--- a/src/api/utils.rs
+++ b/src/api/utils.rs
@@ -1,5 +1,6 @@
 use futures::lock::Mutex;
-use serde_json::Value;
+use serde_json::{json, Value};
+use std::collections::HashMap;
 use std::sync::Arc;
 use tracing::info;
 use valence_core::api::errors::ApiErrorType;
@@ -15,17 +16,21 @@
 pub async fn retrieve_from_db(
     db: Arc<Mutex<D>>,
     address: &str,
+    value_id: Option<&str>,
 ) -> Result {
     let r = CallResponse::new("get_data");
     info!("RETRIEVE_FROM_DB requested with address: {:?}", address);

-    let db_result: Result<Option<Vec<String>>, _> = db.lock().await.get_data(&address).await;
+    let db_result: Result<Option<HashMap<String, String>>, _> =
+        db.lock().await.get_data(&address, value_id).await;
+    let value_id = value_id.unwrap().to_string();

     match db_result {
         Ok(data) => match data {
             Some(value) => {
                 info!("Data retrieved from DB");
                 let data = value
+                    .get(&value_id)
                     .iter()
                     .map(|v| serde_json::from_str(v).unwrap())
                     .collect::<Vec<Value>>();
@@ -41,3 +46,20 @@
         )),
     }
 }
+
+pub fn serialize_all_entries(data: HashMap<String, String>) -> HashMap<String, Value> {
+    let mut output = HashMap::new();
+
+    for (key, value) in data {
+        match serde_json::from_str(&value) {
+            Ok(json_value) => {
+                output.insert(key, json_value);
+            }
+            Err(e) => {
+                output.insert(key, json!(value));
+            }
+        }
+    }
+
+    output
+}
diff --git a/src/constants.rs b/src/constants.rs
index 9a1bb97..270fe7e 100644
--- a/src/constants.rs
+++ b/src/constants.rs
@@ -26,3 +26,4 @@
 pub const DRUID_LENGTH: usize = 16;
 pub const DB_KEY: &str = "default";
 pub const CUCKOO_FILTER_KEY: &str = "cuckoo_filter";
+pub const CUCKOO_FILTER_VALUE_ID: &str = "cuckoo_filter_id";
diff --git a/src/interfaces.rs b/src/interfaces.rs
index 85c19bd..020db78 100644
--- a/src/interfaces.rs
+++ b/src/interfaces.rs
@@ -11,6 +11,13 @@ pub struct GetRequestData {
 pub struct SetRequestData {
     pub address: String,
     pub data: Value,
+    pub data_id: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SetSaveData {
+    pub address: String,
+    pub data: Value,
 }

 pub struct EnvConfig {
diff --git a/src/main.rs b/src/main.rs
index c034175..56609c7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -13,12 +13,9 @@ use crate::utils::{
 };

 use futures::lock::Mutex;
-use serde_json::Value;
 use std::sync::Arc;
 use tracing::info;
 use valence_core::api::utils::handle_rejection;
-use valence_core::db::mongo_db::MongoDbConn;
-use valence_core::db::redis_cache::RedisCacheConn;

 use warp::Filter;

@@ -48,37 +45,20 @@ async fn main() {
     info!("Cuckoo filter initialized successfully");

-    let routes = get_data::<MongoDbConn, RedisCacheConn, Value>(
-        db_conn.clone(),
-        cache_conn.clone(),
-        cuckoo_filter.clone(),
-    )
-    .or(set_data(
-        db_conn.clone(),
-        cache_conn.clone(),
-        cuckoo_filter.clone(),
-        config.body_limit,
-        config.cache_ttl,
-    ))
-    // .or(listings(market_db_conn.clone(), cache_conn.clone()))
-    // .or(orders_by_id(
-    //     market_db_conn.clone(),
-    //     cache_conn.clone(),
-    //     cuckoo_filter.clone(),
-    // ))
-    // .or(orders_send(
-    //     market_db_conn.clone(),
-    //     cache_conn.clone(),
-    //     cuckoo_filter.clone(),
-    //     config.body_limit,
-    // ))
-    // .or(listing_send(
-    //     market_db_conn.clone(),
-    //     cache_conn.clone(),
-    //     cuckoo_filter.clone(),
-    //     config.body_limit,
-    // ))
-    .recover(handle_rejection);
+    let routes = get_data(db_conn.clone(), cache_conn.clone(), cuckoo_filter.clone())
+        .or(get_data_with_id(
+            db_conn.clone(),
+            cache_conn.clone(),
+            cuckoo_filter.clone(),
+        ))
+        .or(set_data(
+            db_conn.clone(),
+            cache_conn.clone(),
+            cuckoo_filter.clone(),
+            config.body_limit,
+            config.cache_ttl,
+        ))
+        .recover(handle_rejection);

     print_welcome(&db_addr, &cache_addr);
diff --git a/src/tests/interfaces.rs b/src/tests/interfaces.rs
index 60b74ee..84dc8de 100644
--- a/src/tests/interfaces.rs
+++ b/src/tests/interfaces.rs
@@ -1,3 +1,5 @@
+use std::collections::HashMap;
+
 use async_trait::async_trait;
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
 use valence_core::db::handler::{CacheHandler, KvStoreConnection};
@@ -39,7 +41,8 @@ impl KvStoreConnection for DbStub {
     async fn get_data(
         &mut self,
         _key: &str,
-    ) -> Result>, Box> {
+        value_id: Option<&str>,
+    ) -> Result>, Box> {
         if self.data.is_none() {
             return Ok(None);
         }
@@ -49,15 +52,16 @@ impl KvStoreConnection for DbStub {
             None => return Ok(None),
         };

-        match get_de_data::>(data) {
+        match get_de_data::>(data) {
             Ok(d) => Ok(Some(d)),
             Err(_) => Ok(None),
         }
     }

-    async fn delete_data(
+    async fn del_data(
         &mut self,
         _key: &str,
+        value_id: Option<&str>,
     ) -> Result<(), Box> {
         self.data = None;
@@ -67,6 +71,7 @@
     async fn set_data_with_expiry(
         &mut self,
         _key: &str,
+        value_id: &str,
         value: T,
         _seconds: usize,
     ) -> Result<(), Box> {
@@ -78,6 +83,7 @@
     async fn set_data(
         &mut self,
         _key: &str,
+        value_id: &str,
         value: T,
     ) -> Result<(), Box> {
         self.data = Some(serialize_data(&value));
diff --git a/src/tests/mod.rs b/src/tests/mod.rs
index eb19702..45aa678 100644
--- a/src/tests/mod.rs
+++ b/src/tests/mod.rs
@@ -32,8 +32,7 @@ async fn test_get_data_empty() {
     //
     // Act
     //
-    let filter = routes::get_data::(db_stub, cache_stub, cfilter)
-        .recover(handle_rejection);
+    let filter = routes::get_data(db_stub, cache_stub, cfilter).recover(handle_rejection);
     let res = request.reply(&filter).await;

     //
@@ -67,13 +66,13 @@ async fn test_get_data() {
     db_stub
         .lock()
         .await
-        .set_data(TEST_VALID_ADDRESS, test_value.clone())
+        .set_data(TEST_VALID_ADDRESS, "blah", test_value.clone())
         .await
         .unwrap();
     cache_stub
         .lock()
         .await
-        .set_data(TEST_VALID_ADDRESS, test_value)
+        .set_data(TEST_VALID_ADDRESS, "blah", test_value)
         .await
         .unwrap();
     cfilter.lock().await.add(TEST_VALID_ADDRESS).unwrap();
@@ -81,8 +80,7 @@
     //
     // Act
     //
-    let filter = routes::get_data::(db_stub, cache_stub, cfilter)
-        .recover(handle_rejection);
+    let filter = routes::get_data(db_stub, cache_stub, cfilter).recover(handle_rejection);
     let res = request.reply(&filter).await;

     //
diff --git a/src/utils.rs b/src/utils.rs
index 18d6c40..3b616ba 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -1,8 +1,8 @@
 use crate::constants::{
-    CONFIG_FILE, CUCKOO_FILTER_KEY, DRUID_CHARSET, DRUID_LENGTH, SETTINGS_BODY_LIMIT,
-    SETTINGS_CACHE_PASSWORD, SETTINGS_CACHE_PORT, SETTINGS_CACHE_TTL, SETTINGS_CACHE_URL,
-    SETTINGS_DB_PASSWORD, SETTINGS_DB_PORT, SETTINGS_DB_PROTOCOL, SETTINGS_DB_URL, SETTINGS_DEBUG,
-    SETTINGS_EXTERN_PORT,
+    CONFIG_FILE, CUCKOO_FILTER_KEY, CUCKOO_FILTER_VALUE_ID, DRUID_CHARSET, DRUID_LENGTH,
+    SETTINGS_BODY_LIMIT, SETTINGS_CACHE_PASSWORD, SETTINGS_CACHE_PORT, SETTINGS_CACHE_TTL,
+    SETTINGS_CACHE_URL, SETTINGS_DB_PASSWORD, SETTINGS_DB_PORT, SETTINGS_DB_PROTOCOL,
+    SETTINGS_DB_URL, SETTINGS_DEBUG, SETTINGS_EXTERN_PORT,
 };
 use crate::interfaces::EnvConfig;
 use chrono::prelude::*;
@@ -20,7 +20,7 @@ use valence_core::db::redis_cache::RedisCacheConn;
 // ========== STORAGE SERIALIZATION FOR CUCKOO FILTER ========== //

 /// Serializable struct for cuckoo filter
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
 struct StorageReadyCuckooFilter {
     values: Vec<u8>,
     length: usize,
@@ -91,10 +91,17 @@ pub async fn save_cuckoo_filter_to_disk(
     let mut db_lock = db.lock().await;

     match db_lock
-        .set_data(CUCKOO_FILTER_KEY, serializable_cuckoo)
+        .set_data(
+            CUCKOO_FILTER_KEY,
+            CUCKOO_FILTER_VALUE_ID,
+            serializable_cuckoo,
+        )
         .await
     {
-        Ok(_) => Ok(()),
+        Ok(_) => {
+            info!("Cuckoo filter saved to disk successfully");
+            Ok(())
+        }
         Err(e) => Err(format!(
             "Failed to save cuckoo filter to disk with error: {}",
             e
@@ -112,21 +119,18 @@ pub async fn load_cuckoo_filter_from_disk(
 ) -> Result, String> {
     let mut db_lock = db.lock().await;

-    match db_lock.get_data(CUCKOO_FILTER_KEY).await {
+    match db_lock
+        .get_data::<StorageReadyCuckooFilter>(CUCKOO_FILTER_KEY, Some(CUCKOO_FILTER_VALUE_ID))
+        .await
+    {
         Ok(data) => match data {
             Some(data) => {
-                let cuckoo_filter: StorageReadyCuckooFilter = match serde_json::from_slice(&data) {
-                    Ok(cf) => cf,
-                    Err(e) => {
-                        return Err(format!(
-                            "Failed to deserialize cuckoo filter from disk with error: {}",
-                            e
-                        ))
-                    }
-                };
+                let cf: StorageReadyCuckooFilter =
+                    data.get(CUCKOO_FILTER_VALUE_ID).unwrap().clone();
+                info!("Found existing cuckoo filter. Loaded from disk successfully");

-                let cfe: ExportedCuckooFilter = cuckoo_filter.into();
-                let cf = CuckooFilter::from(cfe);
+                let cfe: ExportedCuckooFilter = cf.into();
+                let cf: CuckooFilter = CuckooFilter::from(cfe);

                 Ok(cf)
             }
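Note on the new `serialize_all_entries` helper (not part of the patch): a minimal standalone sketch of what it produces, with the helper body adapted from the patched src/api/utils.rs above and the sample ids/payloads invented for illustration. Entries whose stored string parses as JSON come back as structured values; anything else is kept as a plain JSON string, so a mixed set of entries for one address still serializes cleanly when no `value_id` is supplied.

```rust
use serde_json::{json, Value};
use std::collections::HashMap;

// Adapted from the patched src/api/utils.rs so this example compiles on its own.
fn serialize_all_entries(data: HashMap<String, String>) -> HashMap<String, Value> {
    let mut output = HashMap::new();

    for (key, value) in data {
        match serde_json::from_str(&value) {
            Ok(json_value) => {
                output.insert(key, json_value);
            }
            Err(_) => {
                // Not valid JSON: keep the raw string as a JSON string value.
                output.insert(key, json!(value));
            }
        }
    }

    output
}

fn main() {
    // Hypothetical cache contents for one address: value ids mapped to the
    // serialized payloads written by set_data.
    let mut stored = HashMap::new();
    stored.insert("order_1".to_string(), r#"{"price": 10}"#.to_string());
    stored.insert("note".to_string(), "plain text".to_string());

    let out = serialize_all_entries(stored);
    // "order_1" deserializes to a JSON object, "note" stays a JSON string.
    println!("{}", serde_json::to_string_pretty(&out).unwrap());
}
```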