Skip to content

Commit

Permalink
Merge pull request #9 from AIBlockOfficial/bugfix_json_value
Browse files Browse the repository at this point in the history
Changing DB entry structure
  • Loading branch information
BHouwens authored Jun 26, 2024
2 parents ff92708 + c4025b0 commit 2723c4f
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 84 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ tracing = "0.1.37"
tracing-subscriber = "0.3.17"
tracing-futures = "0.2.3"
warp = "0.3.5"
valence_core = "0.1.5"
valence_core = "0.1.6"
valence_market = "0.1.0"
56 changes: 43 additions & 13 deletions src/api/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::api::utils::retrieve_from_db;
use crate::interfaces::SetRequestData;
use crate::api::utils::{retrieve_from_db, serialize_all_entries};
use crate::interfaces::{SetRequestData, SetSaveData};
use futures::lock::Mutex;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{error, info, warn};
use valence_core::api::errors::ApiErrorType;
Expand All @@ -25,6 +26,7 @@ pub async fn get_data_handler<
C: KvStoreConnection + Clone + Send + 'static,
>(
headers: warp::hyper::HeaderMap,
value_id: Option<String>,
db: Arc<Mutex<D>>,
cache: Arc<Mutex<C>>,
c_filter: CFilterConnection,
Expand All @@ -45,8 +47,9 @@ pub async fn get_data_handler<

// Check cache first
let mut cache_lock_result = cache.lock().await;
let cache_result: Result<Option<Vec<String>>, _> =
cache_lock_result.get_data::<String>(address).await;
let cache_result: Result<Option<HashMap<String, String>>, _> = cache_lock_result
.get_data::<String>(address, value_id.as_deref())
.await;

info!("Cache result: {:?}", cache_result);

Expand All @@ -55,16 +58,32 @@ pub async fn get_data_handler<
match value {
Some(value) => {
info!("Data retrieved from cache");
let data = value
.iter()
.map(|v| serde_json::from_str(v).unwrap())
.collect::<Vec<Value>>();
return r.into_ok("Data retrieved successfully", json_serialize_embed(data));
if let Some(id) = value_id {
if !value.contains_key(&id) {
return r.into_err_internal(ApiErrorType::Generic(
"Value ID not found".to_string(),
));
}

let data = value.get(&id).unwrap().clone();
let final_result: Value = serde_json::from_str(&data).unwrap();
return r.into_ok(
"Data retrieved successfully",
json_serialize_embed(final_result),
);
}

let final_value = serialize_all_entries(value);

return r.into_ok(
"Data retrieved successfully",
json_serialize_embed(final_value),
);
}
None => {
// Default to checking from DB if cache is empty
warn!("Cache lookup failed for address: {}", address);
retrieve_from_db(db, address).await
retrieve_from_db(db, address, value_id.as_deref()).await
}
}
}
Expand All @@ -73,7 +92,7 @@ pub async fn get_data_handler<
warn!("Attempting to retrieve data from DB");

// Get data from DB
retrieve_from_db(db, address).await
retrieve_from_db(db, address, value_id.as_deref()).await
}
}
}
Expand All @@ -99,11 +118,22 @@ pub async fn set_data_handler<
let r = CallResponse::new("set_data");
info!("SET_DATA requested with payload: {:?}", payload);

let data_to_save: SetSaveData = {
SetSaveData {
address: payload.address.clone(),
data: payload.data.clone(),
}
};

// Add to cache
let cache_result = cache
.lock()
.await
.set_data(&payload.address.clone(), serialize_data(&payload.data))
.set_data(
&payload.address.clone(),
&payload.data_id,
serialize_data(&data_to_save),
)
.await;

// Add to DB
Expand All @@ -118,7 +148,7 @@ pub async fn set_data_handler<

db.lock()
.await
.set_data(&payload.address, payload.data)
.set_data(&payload.address, &payload.data_id, data_to_save)
.await
}
Err(_) => {
Expand Down
35 changes: 29 additions & 6 deletions src/api/routes.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::api::handlers::{get_data_handler, set_data_handler};
use futures::lock::Mutex;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::sync::Arc;
use tracing::debug;
use valence_core::api::interfaces::CFilterConnection;
Expand All @@ -13,19 +11,44 @@ use warp::{Filter, Rejection, Reply};

// ========== BASE ROUTES ========== //

/// GET /get_data
/// GET /get_data_with_id
///
/// Retrieves data associated with a given address
/// Retrieves data associated with a given address and a given id
///
/// ### Arguments
///
/// * `db` - The database connection to use
/// * `cache` - The cache connection to use
/// * `cuckoo_filter` - The cuckoo filter connection to use
pub fn get_data_with_id<
D: KvStoreConnection + Clone + Send + 'static,
C: KvStoreConnection + Clone + Send + 'static,
>(
db: Arc<Mutex<D>>,
cache: Arc<Mutex<C>>,
cuckoo_filter: CFilterConnection,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
debug!("Setting up get_data_with_id route");

warp::path("get_data")
.and(warp::get())
.and(sig_verify_middleware())
.and(warp::header::headers_cloned())
.and(warp::path::param::<String>())
.and(with_node_component(cache))
.and(with_node_component(db))
.and(with_node_component(cuckoo_filter))
.and_then(move |_, headers, value_id: String, cache, db, cf| {
// Add type annotation for headers parameter
debug!("GET_DATA requested");
map_api_res(get_data_handler(headers, Some(value_id), db, cache, cf))
})
.with(get_cors())
}

pub fn get_data<
D: KvStoreConnection + Clone + Send + 'static,
C: KvStoreConnection + Clone + Send + 'static,
T: Serialize + DeserializeOwned,
>(
db: Arc<Mutex<D>>,
cache: Arc<Mutex<C>>,
Expand All @@ -43,7 +66,7 @@ pub fn get_data<
.and_then(move |_, headers, cache, db, cf| {
// Add type annotation for headers parameter
debug!("GET_DATA requested");
map_api_res(get_data_handler(headers, db, cache, cf))
map_api_res(get_data_handler(headers, None, db, cache, cf))
})
.with(get_cors())
}
Expand Down
26 changes: 24 additions & 2 deletions src/api/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use futures::lock::Mutex;
use serde_json::Value;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::info;
use valence_core::api::errors::ApiErrorType;
Expand All @@ -15,17 +16,21 @@ use valence_core::db::handler::KvStoreConnection;
pub async fn retrieve_from_db<D: KvStoreConnection + Clone + Send + 'static>(
db: Arc<Mutex<D>>,
address: &str,
value_id: Option<&str>,
) -> Result<JsonReply, JsonReply> {
let r = CallResponse::new("get_data");
info!("RETRIEVE_FROM_DB requested with address: {:?}", address);

let db_result: Result<Option<Vec<String>>, _> = db.lock().await.get_data(&address).await;
let db_result: Result<Option<HashMap<String, String>>, _> =
db.lock().await.get_data(&address, value_id).await;
let value_id = value_id.unwrap().to_string();

match db_result {
Ok(data) => match data {
Some(value) => {
info!("Data retrieved from DB");
let data = value
.get(&value_id)
.iter()
.map(|v| serde_json::from_str(v).unwrap())
.collect::<Vec<Value>>();
Expand All @@ -41,3 +46,20 @@ pub async fn retrieve_from_db<D: KvStoreConnection + Clone + Send + 'static>(
)),
}
}

pub fn serialize_all_entries(data: HashMap<String, String>) -> HashMap<String, Value> {
let mut output = HashMap::new();

for (key, value) in data {
match serde_json::from_str(&value) {
Ok(json_value) => {
output.insert(key, json_value);
}
Err(e) => {
output.insert(key, json!(value));
}
}
}

output
}
1 change: 1 addition & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ pub const DRUID_LENGTH: usize = 16;
pub const DB_KEY: &str = "default";
pub const CUCKOO_FILTER_KEY: &str = "cuckoo_filter";
pub const CUCKOO_FILTER_VALUE_ID: &str = "cuckoo_filter_id";
7 changes: 7 additions & 0 deletions src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ pub struct GetRequestData {
pub struct SetRequestData {
pub address: String,
pub data: Value,
pub data_id: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SetSaveData {
pub address: String,
pub data: Value,
}

pub struct EnvConfig {
Expand Down
48 changes: 14 additions & 34 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@ use crate::utils::{
};

use futures::lock::Mutex;
use serde_json::Value;
use std::sync::Arc;
use tracing::info;
use valence_core::api::utils::handle_rejection;
use valence_core::db::mongo_db::MongoDbConn;
use valence_core::db::redis_cache::RedisCacheConn;

use warp::Filter;

Expand Down Expand Up @@ -48,37 +45,20 @@ async fn main() {

info!("Cuckoo filter initialized successfully");

let routes = get_data::<MongoDbConn, RedisCacheConn, Value>(
db_conn.clone(),
cache_conn.clone(),
cuckoo_filter.clone(),
)
.or(set_data(
db_conn.clone(),
cache_conn.clone(),
cuckoo_filter.clone(),
config.body_limit,
config.cache_ttl,
))
// .or(listings(market_db_conn.clone(), cache_conn.clone()))
// .or(orders_by_id(
// market_db_conn.clone(),
// cache_conn.clone(),
// cuckoo_filter.clone(),
// ))
// .or(orders_send(
// market_db_conn.clone(),
// cache_conn.clone(),
// cuckoo_filter.clone(),
// config.body_limit,
// ))
// .or(listing_send(
// market_db_conn.clone(),
// cache_conn.clone(),
// cuckoo_filter.clone(),
// config.body_limit,
// ))
.recover(handle_rejection);
let routes = get_data(db_conn.clone(), cache_conn.clone(), cuckoo_filter.clone())
.or(get_data_with_id(
db_conn.clone(),
cache_conn.clone(),
cuckoo_filter.clone(),
))
.or(set_data(
db_conn.clone(),
cache_conn.clone(),
cuckoo_filter.clone(),
config.body_limit,
config.cache_ttl,
))
.recover(handle_rejection);

print_welcome(&db_addr, &cache_addr);

Expand Down
12 changes: 9 additions & 3 deletions src/tests/interfaces.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use async_trait::async_trait;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use valence_core::db::handler::{CacheHandler, KvStoreConnection};
Expand Down Expand Up @@ -39,7 +41,8 @@ impl KvStoreConnection for DbStub {
async fn get_data<T: DeserializeOwned>(
&mut self,
_key: &str,
) -> Result<Option<Vec<T>>, Box<dyn std::error::Error + Send + Sync>> {
value_id: Option<&str>,
) -> Result<Option<HashMap<String, T>>, Box<dyn std::error::Error + Send + Sync>> {
if self.data.is_none() {
return Ok(None);
}
Expand All @@ -49,15 +52,16 @@ impl KvStoreConnection for DbStub {
None => return Ok(None),
};

match get_de_data::<Vec<T>>(data) {
match get_de_data::<HashMap<String, T>>(data) {
Ok(d) => Ok(Some(d)),
Err(_) => Ok(None),
}
}

async fn delete_data(
async fn del_data(
&mut self,
_key: &str,
value_id: Option<&str>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.data = None;

Expand All @@ -67,6 +71,7 @@ impl KvStoreConnection for DbStub {
async fn set_data_with_expiry<T: Serialize + Send>(
&mut self,
_key: &str,
value_id: &str,
value: T,
_seconds: usize,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Expand All @@ -78,6 +83,7 @@ impl KvStoreConnection for DbStub {
async fn set_data<T: Serialize + Send>(
&mut self,
_key: &str,
value_id: &str,
value: T,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.data = Some(serialize_data(&value));
Expand Down
Loading

0 comments on commit 2723c4f

Please sign in to comment.