Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changing DB entry structure #9

Merged
merged 1 commit into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ tracing = "0.1.37"
tracing-subscriber = "0.3.17"
tracing-futures = "0.2.3"
warp = "0.3.5"
valence_core = "0.1.5"
valence_core = "0.1.6"
valence_market = "0.1.0"
56 changes: 43 additions & 13 deletions src/api/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::api::utils::retrieve_from_db;
use crate::interfaces::SetRequestData;
use crate::api::utils::{retrieve_from_db, serialize_all_entries};
use crate::interfaces::{SetRequestData, SetSaveData};
use futures::lock::Mutex;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{error, info, warn};
use valence_core::api::errors::ApiErrorType;
Expand All @@ -25,6 +26,7 @@ pub async fn get_data_handler<
C: KvStoreConnection + Clone + Send + 'static,
>(
headers: warp::hyper::HeaderMap,
value_id: Option<String>,
db: Arc<Mutex<D>>,
cache: Arc<Mutex<C>>,
c_filter: CFilterConnection,
Expand All @@ -45,8 +47,9 @@ pub async fn get_data_handler<

// Check cache first
let mut cache_lock_result = cache.lock().await;
let cache_result: Result<Option<Vec<String>>, _> =
cache_lock_result.get_data::<String>(address).await;
let cache_result: Result<Option<HashMap<String, String>>, _> = cache_lock_result
.get_data::<String>(address, value_id.as_deref())
.await;

info!("Cache result: {:?}", cache_result);

Expand All @@ -55,16 +58,32 @@ pub async fn get_data_handler<
match value {
Some(value) => {
info!("Data retrieved from cache");
let data = value
.iter()
.map(|v| serde_json::from_str(v).unwrap())
.collect::<Vec<Value>>();
return r.into_ok("Data retrieved successfully", json_serialize_embed(data));
if let Some(id) = value_id {
if !value.contains_key(&id) {
return r.into_err_internal(ApiErrorType::Generic(
"Value ID not found".to_string(),
));
}

let data = value.get(&id).unwrap().clone();
let final_result: Value = serde_json::from_str(&data).unwrap();
return r.into_ok(
"Data retrieved successfully",
json_serialize_embed(final_result),
);
}

let final_value = serialize_all_entries(value);

return r.into_ok(
"Data retrieved successfully",
json_serialize_embed(final_value),
);
}
None => {
// Default to checking from DB if cache is empty
warn!("Cache lookup failed for address: {}", address);
retrieve_from_db(db, address).await
retrieve_from_db(db, address, value_id.as_deref()).await
}
}
}
Expand All @@ -73,7 +92,7 @@ pub async fn get_data_handler<
warn!("Attempting to retrieve data from DB");

// Get data from DB
retrieve_from_db(db, address).await
retrieve_from_db(db, address, value_id.as_deref()).await
}
}
}
Expand All @@ -99,11 +118,22 @@ pub async fn set_data_handler<
let r = CallResponse::new("set_data");
info!("SET_DATA requested with payload: {:?}", payload);

let data_to_save: SetSaveData = {
SetSaveData {
address: payload.address.clone(),
data: payload.data.clone(),
}
};

// Add to cache
let cache_result = cache
.lock()
.await
.set_data(&payload.address.clone(), serialize_data(&payload.data))
.set_data(
&payload.address.clone(),
&payload.data_id,
serialize_data(&data_to_save),
)
.await;

// Add to DB
Expand All @@ -118,7 +148,7 @@ pub async fn set_data_handler<

db.lock()
.await
.set_data(&payload.address, payload.data)
.set_data(&payload.address, &payload.data_id, data_to_save)
.await
}
Err(_) => {
Expand Down
35 changes: 29 additions & 6 deletions src/api/routes.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::api::handlers::{get_data_handler, set_data_handler};
use futures::lock::Mutex;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::sync::Arc;
use tracing::debug;
use valence_core::api::interfaces::CFilterConnection;
Expand All @@ -13,19 +11,44 @@ use warp::{Filter, Rejection, Reply};

// ========== BASE ROUTES ========== //

/// GET /get_data
/// GET /get_data_with_id
///
/// Retrieves data associated with a given address
/// Retrieves data associated with a given address and a given id
///
/// ### Arguments
///
/// * `db` - The database connection to use
/// * `cache` - The cache connection to use
/// * `cuckoo_filter` - The cuckoo filter connection to use
pub fn get_data_with_id<
D: KvStoreConnection + Clone + Send + 'static,
C: KvStoreConnection + Clone + Send + 'static,
>(
db: Arc<Mutex<D>>,
cache: Arc<Mutex<C>>,
cuckoo_filter: CFilterConnection,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
debug!("Setting up get_data_with_id route");

warp::path("get_data")
.and(warp::get())
.and(sig_verify_middleware())
.and(warp::header::headers_cloned())
.and(warp::path::param::<String>())
.and(with_node_component(cache))
.and(with_node_component(db))
.and(with_node_component(cuckoo_filter))
.and_then(move |_, headers, value_id: String, cache, db, cf| {
// Add type annotation for headers parameter
debug!("GET_DATA requested");
map_api_res(get_data_handler(headers, Some(value_id), db, cache, cf))
})
.with(get_cors())
}

pub fn get_data<
D: KvStoreConnection + Clone + Send + 'static,
C: KvStoreConnection + Clone + Send + 'static,
T: Serialize + DeserializeOwned,
>(
db: Arc<Mutex<D>>,
cache: Arc<Mutex<C>>,
Expand All @@ -43,7 +66,7 @@ pub fn get_data<
.and_then(move |_, headers, cache, db, cf| {
// Add type annotation for headers parameter
debug!("GET_DATA requested");
map_api_res(get_data_handler(headers, db, cache, cf))
map_api_res(get_data_handler(headers, None, db, cache, cf))
})
.with(get_cors())
}
Expand Down
26 changes: 24 additions & 2 deletions src/api/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use futures::lock::Mutex;
use serde_json::Value;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::info;
use valence_core::api::errors::ApiErrorType;
Expand All @@ -15,17 +16,21 @@ use valence_core::db::handler::KvStoreConnection;
pub async fn retrieve_from_db<D: KvStoreConnection + Clone + Send + 'static>(
db: Arc<Mutex<D>>,
address: &str,
value_id: Option<&str>,
) -> Result<JsonReply, JsonReply> {
let r = CallResponse::new("get_data");
info!("RETRIEVE_FROM_DB requested with address: {:?}", address);

let db_result: Result<Option<Vec<String>>, _> = db.lock().await.get_data(&address).await;
let db_result: Result<Option<HashMap<String, String>>, _> =
db.lock().await.get_data(&address, value_id).await;
let value_id = value_id.unwrap().to_string();

match db_result {
Ok(data) => match data {
Some(value) => {
info!("Data retrieved from DB");
let data = value
.get(&value_id)
.iter()
.map(|v| serde_json::from_str(v).unwrap())
.collect::<Vec<Value>>();
Expand All @@ -41,3 +46,20 @@ pub async fn retrieve_from_db<D: KvStoreConnection + Clone + Send + 'static>(
)),
}
}

pub fn serialize_all_entries(data: HashMap<String, String>) -> HashMap<String, Value> {
let mut output = HashMap::new();

for (key, value) in data {
match serde_json::from_str(&value) {
Ok(json_value) => {
output.insert(key, json_value);
}
Err(e) => {
output.insert(key, json!(value));
}
}
}

output
}
1 change: 1 addition & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ pub const DRUID_LENGTH: usize = 16;

pub const DB_KEY: &str = "default";
pub const CUCKOO_FILTER_KEY: &str = "cuckoo_filter";
pub const CUCKOO_FILTER_VALUE_ID: &str = "cuckoo_filter_id";
7 changes: 7 additions & 0 deletions src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ pub struct GetRequestData {
pub struct SetRequestData {
pub address: String,
pub data: Value,
pub data_id: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SetSaveData {
pub address: String,
pub data: Value,
}

pub struct EnvConfig {
Expand Down
48 changes: 14 additions & 34 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@ use crate::utils::{
};

use futures::lock::Mutex;
use serde_json::Value;
use std::sync::Arc;
use tracing::info;
use valence_core::api::utils::handle_rejection;
use valence_core::db::mongo_db::MongoDbConn;
use valence_core::db::redis_cache::RedisCacheConn;

use warp::Filter;

Expand Down Expand Up @@ -48,37 +45,20 @@ async fn main() {

info!("Cuckoo filter initialized successfully");

let routes = get_data::<MongoDbConn, RedisCacheConn, Value>(
db_conn.clone(),
cache_conn.clone(),
cuckoo_filter.clone(),
)
.or(set_data(
db_conn.clone(),
cache_conn.clone(),
cuckoo_filter.clone(),
config.body_limit,
config.cache_ttl,
))
// .or(listings(market_db_conn.clone(), cache_conn.clone()))
// .or(orders_by_id(
// market_db_conn.clone(),
// cache_conn.clone(),
// cuckoo_filter.clone(),
// ))
// .or(orders_send(
// market_db_conn.clone(),
// cache_conn.clone(),
// cuckoo_filter.clone(),
// config.body_limit,
// ))
// .or(listing_send(
// market_db_conn.clone(),
// cache_conn.clone(),
// cuckoo_filter.clone(),
// config.body_limit,
// ))
.recover(handle_rejection);
let routes = get_data(db_conn.clone(), cache_conn.clone(), cuckoo_filter.clone())
.or(get_data_with_id(
db_conn.clone(),
cache_conn.clone(),
cuckoo_filter.clone(),
))
.or(set_data(
db_conn.clone(),
cache_conn.clone(),
cuckoo_filter.clone(),
config.body_limit,
config.cache_ttl,
))
.recover(handle_rejection);

print_welcome(&db_addr, &cache_addr);

Expand Down
12 changes: 9 additions & 3 deletions src/tests/interfaces.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use async_trait::async_trait;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use valence_core::db::handler::{CacheHandler, KvStoreConnection};
Expand Down Expand Up @@ -39,7 +41,8 @@ impl KvStoreConnection for DbStub {
async fn get_data<T: DeserializeOwned>(
&mut self,
_key: &str,
) -> Result<Option<Vec<T>>, Box<dyn std::error::Error + Send + Sync>> {
value_id: Option<&str>,
) -> Result<Option<HashMap<String, T>>, Box<dyn std::error::Error + Send + Sync>> {
if self.data.is_none() {
return Ok(None);
}
Expand All @@ -49,15 +52,16 @@ impl KvStoreConnection for DbStub {
None => return Ok(None),
};

match get_de_data::<Vec<T>>(data) {
match get_de_data::<HashMap<String, T>>(data) {
Ok(d) => Ok(Some(d)),
Err(_) => Ok(None),
}
}

async fn delete_data(
async fn del_data(
&mut self,
_key: &str,
value_id: Option<&str>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.data = None;

Expand All @@ -67,6 +71,7 @@ impl KvStoreConnection for DbStub {
async fn set_data_with_expiry<T: Serialize + Send>(
&mut self,
_key: &str,
value_id: &str,
value: T,
_seconds: usize,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Expand All @@ -78,6 +83,7 @@ impl KvStoreConnection for DbStub {
async fn set_data<T: Serialize + Send>(
&mut self,
_key: &str,
value_id: &str,
value: T,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.data = Some(serialize_data(&value));
Expand Down
Loading
Loading