From af489f75fc62a8b46b7ec6ca75c2365fbd5cb830 Mon Sep 17 00:00:00 2001 From: RobinP122 Date: Thu, 27 Jun 2024 11:49:08 +0200 Subject: [PATCH 1/5] Moved db crate from valence_core to Valence app. This simplifies the developement and debugging process. --- src/api/handlers.rs | 2 +- src/api/routes.rs | 2 +- src/api/utils.rs | 2 +- src/db/handler.rs | 85 ++++++++++++ src/db/mod.rs | 3 + src/db/mongo_db.rs | 301 ++++++++++++++++++++++++++++++++++++++++ src/db/redis_cache.rs | 153 ++++++++++++++++++++ src/main.rs | 1 + src/tests/interfaces.rs | 2 +- src/tests/mod.rs | 3 +- src/utils.rs | 6 +- 11 files changed, 551 insertions(+), 9 deletions(-) create mode 100644 src/db/handler.rs create mode 100644 src/db/mod.rs create mode 100644 src/db/mongo_db.rs create mode 100644 src/db/redis_cache.rs diff --git a/src/api/handlers.rs b/src/api/handlers.rs index 35c982e..aafa4b2 100644 --- a/src/api/handlers.rs +++ b/src/api/handlers.rs @@ -1,5 +1,6 @@ use crate::api::utils::{retrieve_from_db, serialize_all_entries}; use crate::interfaces::{SetRequestData, SetSaveData}; +use crate::db::handler::{CacheHandler, KvStoreConnection}; use futures::lock::Mutex; use serde_json::Value; use std::collections::HashMap; @@ -8,7 +9,6 @@ use tracing::{error, info, warn}; use valence_core::api::errors::ApiErrorType; use valence_core::api::interfaces::CFilterConnection; use valence_core::api::responses::{json_serialize_embed, CallResponse, JsonReply}; -use valence_core::db::handler::{CacheHandler, KvStoreConnection}; use valence_core::utils::serialize_data; // ========= BASE HANDLERS ========= // diff --git a/src/api/routes.rs b/src/api/routes.rs index d2df43d..73fb8fd 100644 --- a/src/api/routes.rs +++ b/src/api/routes.rs @@ -1,4 +1,5 @@ use crate::api::handlers::{get_data_handler, set_data_handler}; +use crate::db::handler::{CacheHandler, KvStoreConnection}; use futures::lock::Mutex; use std::sync::Arc; use tracing::debug; @@ -6,7 +7,6 @@ use valence_core::api::interfaces::CFilterConnection; use valence_core::api::utils::{ get_cors, map_api_res, post_cors, sig_verify_middleware, with_node_component, }; -use valence_core::db::handler::{CacheHandler, KvStoreConnection}; use warp::{Filter, Rejection, Reply}; // ========== BASE ROUTES ========== // diff --git a/src/api/utils.rs b/src/api/utils.rs index 85636ec..8c1123c 100644 --- a/src/api/utils.rs +++ b/src/api/utils.rs @@ -1,3 +1,4 @@ +use crate::db::handler::KvStoreConnection; use futures::lock::Mutex; use serde_json::{json, Value}; use std::collections::HashMap; @@ -5,7 +6,6 @@ use std::sync::Arc; use tracing::info; use valence_core::api::errors::ApiErrorType; use valence_core::api::responses::{json_serialize_embed, CallResponse, JsonReply}; -use valence_core::db::handler::KvStoreConnection; /// Retrieve data from the database /// diff --git a/src/db/handler.rs b/src/db/handler.rs new file mode 100644 index 0000000..41703dc --- /dev/null +++ b/src/db/handler.rs @@ -0,0 +1,85 @@ +use async_trait::async_trait; +use serde::{de::DeserializeOwned, Serialize}; +use std::collections::HashMap; + +/// Trait for a key-value data store connection +#[async_trait] +pub trait KvStoreConnection { + /// Initialize a connection to the cache + /// + /// ### Arguments + /// + /// * `url` - A string slice that holds the URL to connect to + async fn init(url: &str) -> Result> + where + Self: Sized; + + /// Sets a data entry in the cache + /// + /// ### Arguments + /// + /// * `key` - Key of the data entry to set + /// * `value_id` - ID of the value to set + /// * `value` - Value of the data entry to set + async fn set_data( + &mut self, + key: &str, + value_id: &str, + value: T, + ) -> Result<(), Box>; + + /// Sets a data entry in the cache with an expiration time + /// + /// ### Arguments + /// + /// * `key` - Key of the data entry to set + /// * `value_id` - ID of the value to set + /// * `value` - Value of the data entry to set + /// * `seconds` - Number of seconds to expire the data entry in + async fn set_data_with_expiry( + &mut self, + key: &str, + value_id: &str, + value: T, + seconds: usize, + ) -> Result<(), Box>; + + /// Deletes a data entry from the cache + /// + /// ### Arguments + /// + /// * `key` - Key of the data entry to delete + /// * `value_id` - ID of the value to delete. If not provided, all values for the key are deleted + async fn del_data( + &mut self, + key: &str, + value_id: Option<&str>, + ) -> Result<(), Box>; + + /// Gets a data entry from the cache + /// + /// ### Arguments + /// + /// * `key` - Key of the data entry to get + /// * `value_id` - ID of the value to get. If not provided, all values for the key are retrieved + async fn get_data( + &mut self, + key: &str, + value_id: Option<&str>, + ) -> Result>, Box>; +} + +#[async_trait] +pub trait CacheHandler { + /// Sets an expiration time for a data entry + /// + /// ### Arguments + /// + /// * `key` - Key of the data entry to expire + /// * `seconds` - Number of seconds to expire the data entry in + async fn expire_entry( + &mut self, + key: &str, + seconds: usize, + ) -> Result<(), Box>; +} diff --git a/src/db/mod.rs b/src/db/mod.rs new file mode 100644 index 0000000..689c642 --- /dev/null +++ b/src/db/mod.rs @@ -0,0 +1,3 @@ +pub mod handler; +pub mod mongo_db; +pub mod redis_cache; diff --git a/src/db/mongo_db.rs b/src/db/mongo_db.rs new file mode 100644 index 0000000..6de9ca6 --- /dev/null +++ b/src/db/mongo_db.rs @@ -0,0 +1,301 @@ +use async_trait::async_trait; +use mongodb::bson::{doc, DateTime, Document}; +use mongodb::{options::ClientOptions, Client}; +use serde::{de::DeserializeOwned, Serialize}; +use std::collections::HashMap; +use tracing::{event, span, trace, warn, Level}; + +use super::handler::KvStoreConnection; + +#[derive(Debug, Clone)] +pub struct MongoDbIndex { + pub db_name: String, + pub coll_name: String, +} + +#[derive(Debug, Clone)] +pub struct MongoDbConn { + pub client: Client, + pub index: MongoDbIndex, +} + +impl MongoDbConn { + /// Creates a TTL index on the expiry field. + /// + /// NOTE: This function will need to be called in the main function when initialising a MongoDB connection. + pub async fn create_ttl_index(&self) -> Result<(), Box> { + let collection = self + .client + .database(&self.index.db_name) + .collection::(&self.index.coll_name); + + // Create TTL index on the 'expiry' field + let index_model = mongodb::IndexModel::builder() + .keys(doc! { "expiry": 1 }) + .options(Some( + mongodb::options::IndexOptions::builder() + .expire_after(Some(std::time::Duration::from_secs(0))) + .build(), + )) + .build(); + + collection.create_index(index_model, None).await?; + Ok(()) + } +} + +#[async_trait] +impl KvStoreConnection for MongoDbConn { + async fn init(url: &str) -> Result> { + // Tracing + let span = span!(Level::TRACE, "MongoDbConn::init"); + let _enter = span.enter(); + + let client_options = match ClientOptions::parse(url).await { + Ok(client_options) => client_options, + Err(e) => panic!("Failed to connect to MongoDB instance with error: {e}"), + }; + + trace!("Connected to MongoDB instance at {url}"); + + let client = match Client::with_options(client_options) { + Ok(client) => client, + Err(e) => panic!("Failed to connect to MongoDB instance with error: {e}"), + }; + + trace!("MongoDB client created successfully"); + + let index = MongoDbIndex { + db_name: String::from("default"), + coll_name: String::from("default"), + }; + + Ok(MongoDbConn { client, index }) + } + + async fn set_data( + &mut self, + key: &str, + value_id: &str, + value: T, + ) -> Result<(), Box> { + // Tracing + let span = span!(Level::TRACE, "MongoDbConn::set_data"); + let _enter = span.enter(); + + let collection = self + .client + .database(&self.index.db_name) + .collection::(&self.index.coll_name); + + // Check if the document with the given key exists + let filter = doc! { "_id": key }; + let existing_doc = collection.find_one(filter.clone(), None).await?; + + let mut mapping: HashMap = if let Some(doc) = existing_doc { + if doc.contains_key("data") { + // Deserialize the existing data + mongodb::bson::from_bson(doc.get("data").unwrap().clone())? + } else { + HashMap::new() + } + } else { + HashMap::new() + }; + + // Append the new data to the vec + mapping.insert(value_id.to_string(), value); + + // Serialize the vec back to a BSON array + let serialized_vec = mongodb::bson::to_bson(&mapping)?; + + // Create or update the document + let update = doc! { + "$set": { "data": serialized_vec } + }; + match collection + .update_one( + filter, + update, + mongodb::options::UpdateOptions::builder() + .upsert(true) + .build(), + ) + .await + { + Ok(_) => (), + Err(e) => { + event!(Level::ERROR, "Failed to set data with error: {e}"); + } + }; + + trace!("Data set successfully with expiry"); + + Ok(()) + } + + async fn set_data_with_expiry( + &mut self, + key: &str, + value_id: &str, + value: T, + seconds: usize, + ) -> Result<(), Box> { + // Tracing + let span = span!(Level::TRACE, "MongoDbConn::set_data_with_expiry"); + let _enter = span.enter(); + + let collection = self + .client + .database(&self.index.db_name) + .collection::(&self.index.coll_name); + + // Check if the document with the given key exists + let filter = doc! { "_id": key }; + let existing_doc = collection.find_one(filter.clone(), None).await?; + + let mut mapping: HashMap = if let Some(doc) = existing_doc { + // Deserialize the existing data + mongodb::bson::from_bson(doc.get("data").unwrap().clone())? + } else { + HashMap::new() + }; + + // Append the new data to the vec + mapping.insert(value_id.to_string(), value); + + // Serialize the vec back to a BSON array + let serialized_vec = mongodb::bson::to_bson(&mapping)?; + + // Calculate the expiry time + let expiry_time = (seconds * 1000) as i64; + let expiry_bson_datetime = DateTime::from_millis(expiry_time); + + // Create or update the document with the new expiry time + let update = doc! { + "$set": { + "data": serialized_vec, + "expiry": expiry_bson_datetime, + } + }; + collection + .update_one( + filter, + update, + mongodb::options::UpdateOptions::builder() + .upsert(true) + .build(), + ) + .await?; + + trace!("Data set successfully with expiry"); + + Ok(()) + } + + async fn del_data( + &mut self, + key: &str, + value_id: Option<&str>, + ) -> Result<(), Box> { + // Tracing + let span = span!(Level::TRACE, "MongoDbConn::del_data"); + let _enter = span.enter(); + + let collection = self + .client + .database(&self.index.db_name) + .collection::(&self.index.coll_name); + + // Build the filter based on the key + let filter = doc! { "_id": key }; + + // If value_id is provided, we need to fetch the document and update it + if let Some(value_id) = value_id { + let update = doc! { + "$unset": { + &format!("data.{}", value_id): "" + } + }; + + match collection.find_one_and_update(filter, update, None).await { + Ok(result) => { + if let Some(_) = result { + // Document was found and updated, log success or handle as needed + trace!("Data updated successfully"); + } else { + // Document not found + event!(Level::ERROR, "Document not found for key: {}", key); + } + } + Err(e) => { + // Handle error from MongoDB + event!(Level::ERROR, "Failed to update data with error: {:?}", e); + return Err(Box::new(e)); + } + } + } else { + // value_id is None, so delete the entire document + match collection.delete_one(filter.clone(), None).await { + Ok(_) => { + trace!("Data deleted successfully"); + } + Err(e) => { + event!(Level::ERROR, "Failed to delete data with error: {:?}", e); + return Err(Box::new(e)); + } + }; + } + + Ok(()) + } + + async fn get_data( + &mut self, + key: &str, + value_id: Option<&str>, + ) -> Result>, Box> { + // Tracing + let span = span!(Level::TRACE, "MongoDbConn::get_data"); + let _enter = span.enter(); + + let collection = self + .client + .database(&self.index.db_name) + .collection::(&self.index.coll_name); + + // Check if the document with the given key exists + let filter = doc! { "_id": key }; + let doc_find = match collection.find_one(filter.clone(), None).await { + Ok(doc) => doc, + Err(e) => { + event!(Level::ERROR, "Failed to get data with error: {e}"); + return Ok(None); + } + }; + + if let Some(doc) = doc_find { + // Deserialize the existing data + let mapping: HashMap = + mongodb::bson::from_bson(doc.get("data").unwrap().clone())?; + + if let Some(id) = value_id { + // If value_id is provided, return only the value with the given ID + if let Some(value) = mapping.get(id) { + let mut result: HashMap = HashMap::new(); + result.insert(id.to_string(), value.clone()); + return Ok(Some(result)); + } else { + // Value with the given ID not found + event!(Level::ERROR, "Value with ID {id} not found for key {key}"); + return Ok(None); + } + } + return Ok(Some(mapping)); + } + + warn!("Data unsuccessfully deserialized"); + + Ok(None) + } +} diff --git a/src/db/redis_cache.rs b/src/db/redis_cache.rs new file mode 100644 index 0000000..817a48e --- /dev/null +++ b/src/db/redis_cache.rs @@ -0,0 +1,153 @@ +use std::collections::HashMap; + +use crate::db::handler::{CacheHandler, KvStoreConnection}; +use async_trait::async_trait; +use redis::{aio::ConnectionManager, AsyncCommands}; +use serde::{de::DeserializeOwned, Serialize}; +use tracing::{event, span, Level}; + +#[derive(Clone)] +pub struct RedisCacheConn { + pub connection: ConnectionManager, +} + +#[async_trait] +impl CacheHandler for RedisCacheConn { + async fn expire_entry( + &mut self, + key: &str, + seconds: usize, + ) -> Result<(), Box> { + self.connection.expire(key, seconds).await?; + Ok(()) + } +} + +#[async_trait] +impl KvStoreConnection for RedisCacheConn { + async fn init(url: &str) -> Result> { + let redis_client = redis::Client::open(url)?; + let redis_connection_manager = ConnectionManager::new(redis_client).await?; + + Ok(RedisCacheConn { + connection: redis_connection_manager, + }) + } + + async fn set_data( + &mut self, + key: &str, + value_id: &str, + value: T, + ) -> Result<(), Box> { + let exists: bool = self.connection.exists(key).await?; + + let mut mapping: HashMap = if exists { + // Get the existing data + let data: String = self.connection.get(key).await?; + serde_json::from_str(&data)? + } else { + HashMap::new() + }; + + // Append the new data to the vec + mapping.insert(value_id.to_string(), value); + + let serialized = serde_json::to_string(&mapping)?; + self.connection.set(key, serialized).await?; + + Ok(()) + } + + async fn set_data_with_expiry( + &mut self, + key: &str, + value_id: &str, + value: T, + seconds: usize, + ) -> Result<(), Box> { + // Check if the key exists + let exists: bool = self.connection.exists(key).await?; + + let mut mapping: HashMap = if exists { + // Get the existing data + let data: String = self.connection.get(key).await?; + serde_json::from_str(&data)? + } else { + HashMap::new() + }; + + // Append the new data to the hashmap + mapping.insert(value_id.to_string(), value); + + // Serialize the vec back to a string + let serialized = serde_json::to_string(&mapping)?; + + // Set the data back to Redis + self.connection.set(key, serialized).await?; + + // Set the expiry time for the key + self.connection.expire(key, seconds).await?; + + Ok(()) + } + + async fn del_data( + &mut self, + key: &str, + value_id: Option<&str>, + ) -> Result<(), Box> { + if let Some(value_id) = value_id { + let exists: bool = self.connection.exists(key).await?; + + if exists { + let mut mapping: HashMap = self.get_data(key, None).await?.unwrap(); + mapping.remove(value_id); + let serialized = serde_json::to_string(&mapping)?; + self.connection.set(key, serialized).await?; + } + return Ok(()); + } + + let _: () = self.connection.del(key).await?; + Ok(()) + } + + async fn get_data( + &mut self, + key: &str, + value_id: Option<&str>, + ) -> Result>, Box> { + let span = span!(Level::TRACE, "MongoDbConn::get_data"); + let _enter = span.enter(); + + // Check if the key exists + let exists: bool = self.connection.exists(key).await?; + + if exists { + // Get the existing data + let data: String = self.connection.get(key).await?; + let mapping: HashMap = serde_json::from_str(&data)?; + + if let Some(value_id) = value_id { + let value = mapping.get(value_id); + if let Some(value) = value { + let mut new_mapping: HashMap = HashMap::new(); + new_mapping.insert(value_id.to_string(), value.clone()); + return Ok(Some(new_mapping)); + } else { + // Value with the given ID not found + event!( + Level::ERROR, + "Value with ID {value_id} not found for key {key}" + ); + return Ok(None); + } + } + + return Ok(Some(mapping)); + } + + Ok(None) + } +} diff --git a/src/main.rs b/src/main.rs index 56609c7..abbb90b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ // main.rs pub mod api; +pub mod db; pub mod constants; pub mod interfaces; pub mod utils; diff --git a/src/tests/interfaces.rs b/src/tests/interfaces.rs index 84dc8de..118244a 100644 --- a/src/tests/interfaces.rs +++ b/src/tests/interfaces.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use async_trait::async_trait; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use valence_core::db::handler::{CacheHandler, KvStoreConnection}; +use crate::db::handler::{CacheHandler, KvStoreConnection}; use valence_core::utils::serialize_data; //========== STUB INTERFACES ==========// diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 45aa678..012fb78 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -4,11 +4,10 @@ pub mod interfaces; use crate::api::routes; use crate::tests::constants::{TEST_VALID_ADDRESS, TEST_VALID_PUB_KEY, TEST_VALID_SIG}; use crate::tests::interfaces::DbStub; +use crate::db::handler::KvStoreConnection; use futures::lock::Mutex; -use serde_json::Value; use std::sync::Arc; use valence_core::api::utils::handle_rejection; -use valence_core::db::handler::KvStoreConnection; use warp::Filter; //========== TESTS ==========// diff --git a/src/utils.rs b/src/utils.rs index 3b616ba..482f44f 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -5,6 +5,9 @@ use crate::constants::{ SETTINGS_DB_URL, SETTINGS_DEBUG, SETTINGS_EXTERN_PORT, }; use crate::interfaces::EnvConfig; +use crate::db::handler::KvStoreConnection; +use crate::db::mongo_db::MongoDbConn; +use crate::db::redis_cache::RedisCacheConn; use chrono::prelude::*; use cuckoofilter::{CuckooFilter, ExportedCuckooFilter}; use futures::lock::Mutex; @@ -13,9 +16,6 @@ use serde::{Deserialize, Serialize}; use std::collections::hash_map::DefaultHasher; use std::sync::Arc; use tracing::info; -use valence_core::db::handler::KvStoreConnection; -use valence_core::db::mongo_db::MongoDbConn; -use valence_core::db::redis_cache::RedisCacheConn; // ========== STORAGE SERIALIZATION FOR CUCKOO FILTER ========== // From e5c1d68cfa1323053655aa0d8fbecb8499ba2f2c Mon Sep 17 00:00:00 2001 From: RobinP122 Date: Thu, 27 Jun 2024 11:49:37 +0200 Subject: [PATCH 2/5] Fix warning --- src/api/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/utils.rs b/src/api/utils.rs index 8c1123c..7009843 100644 --- a/src/api/utils.rs +++ b/src/api/utils.rs @@ -55,7 +55,7 @@ pub fn serialize_all_entries(data: HashMap) -> HashMap { output.insert(key, json_value); } - Err(e) => { + Err(_e) => { output.insert(key, json!(value)); } } From 6aec40420d57e30b3090a0f04f182e93f2865dde Mon Sep 17 00:00:00 2001 From: RobinP122 Date: Thu, 27 Jun 2024 11:53:41 +0200 Subject: [PATCH 3/5] Bump up version to 0.1.2 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 90cb219..3268cdb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "valence" -version = "0.1.1" +version = "0.1.2" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html From b9c822ed121da521204f36217256c261438dba0c Mon Sep 17 00:00:00 2001 From: RobinP122 Date: Thu, 27 Jun 2024 11:59:42 +0200 Subject: [PATCH 4/5] Set valence_core crate to latest version (0.1.7) --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 3268cdb..68258ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,5 +23,5 @@ tracing = "0.1.37" tracing-subscriber = "0.3.17" tracing-futures = "0.2.3" warp = "0.3.5" -valence_core = "0.1.6" +valence_core = "0.1.7" valence_market = "0.1.0" From a7f14d012a5f6e8c5a6f072d327afe5c7ee56002 Mon Sep 17 00:00:00 2001 From: RobinP122 Date: Thu, 27 Jun 2024 13:25:24 +0200 Subject: [PATCH 5/5] Removed Valence Market plugin. Will be reintroduced at a later stage. --- Cargo.toml | 3 +-- config.toml | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 68258ce..2817f82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,5 +23,4 @@ tracing = "0.1.37" tracing-subscriber = "0.3.17" tracing-futures = "0.2.3" warp = "0.3.5" -valence_core = "0.1.7" -valence_market = "0.1.0" +valence_core = "0.1.7" \ No newline at end of file diff --git a/config.toml b/config.toml index 145fa11..f303388 100644 --- a/config.toml +++ b/config.toml @@ -13,4 +13,4 @@ body_limit = 4096 cache_ttl = 600 # cache lifetime in seconds # Plug-in options -market = true \ No newline at end of file +market = false \ No newline at end of file