Skip to content

Commit

Permalink
Save and load cuckoo filter from disk
Browse files Browse the repository at this point in the history
  • Loading branch information
BHouwens committed Jun 14, 2024
1 parent 5b7e210 commit 5ebb359
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ pub const DRUID_LENGTH: usize = 16;
/// ==== STORAGE ==== ///
pub const DB_KEY: &str = "default";
pub const CUCKOO_FILTER_KEY: &str = "cuckoo_filter";
13 changes: 10 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub mod utils;
pub mod tests;

use crate::api::routes::*;
use crate::utils::{construct_mongodb_conn, construct_redis_conn, load_config, print_welcome};
use crate::utils::{construct_mongodb_conn, construct_redis_conn, load_config, print_welcome, init_cuckoo_filter};

use futures::lock::Mutex;
use serde_json::Value;
Expand All @@ -30,14 +30,21 @@ async fn main() {
"{}{}:{}@{}:{}",
config.db_protocol, config.db_user, config.db_password, config.db_url, config.db_port
);
let cuckoo_filter = Arc::new(Mutex::new(cuckoofilter::CuckooFilter::new()));


info!("Connecting to Redis at {}", cache_addr);
info!("Connecting to MongoDB at {}", db_addr);



let cache_conn = construct_redis_conn(&cache_addr).await;
let db_conn = construct_mongodb_conn(&db_addr).await;

let cf_import = match init_cuckoo_filter(db_conn.clone()).await {
Ok(cf) => cf,
Err(e) => panic!("Failed to initialize cuckoo filter with error: {}", e),
};
let cuckoo_filter = Arc::new(Mutex::new(cf_import));

let routes = get_data::<MongoDbConn, RedisCacheConn, Value>(
db_conn.clone(),
cache_conn.clone(),
Expand Down
97 changes: 96 additions & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,47 @@
use crate::constants::{
CONFIG_FILE, DRUID_CHARSET, DRUID_LENGTH, SETTINGS_BODY_LIMIT, SETTINGS_CACHE_PASSWORD,
SETTINGS_CACHE_PORT, SETTINGS_CACHE_TTL, SETTINGS_CACHE_URL, SETTINGS_DB_PASSWORD,
SETTINGS_DB_PORT, SETTINGS_DB_PROTOCOL, SETTINGS_DB_URL, SETTINGS_DEBUG, SETTINGS_EXTERN_PORT,
SETTINGS_DB_PORT, SETTINGS_DB_PROTOCOL, SETTINGS_DB_URL, SETTINGS_DEBUG, SETTINGS_EXTERN_PORT, CUCKOO_FILTER_KEY
};
use crate::interfaces::EnvConfig;
use chrono::prelude::*;
use cuckoofilter::{CuckooFilter, ExportedCuckooFilter};
use futures::lock::Mutex;
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::sync::Arc;
use valence_core::db::handler::KvStoreConnection;
use valence_core::db::mongo_db::MongoDbConn;
use valence_core::db::redis_cache::RedisCacheConn;

// ========== STORAGE SERIALIZATION FOR CUCKOO FILTER ========== //

/// Serde-serializable snapshot of a cuckoo filter, used to persist the filter
/// in the KV store under `CUCKOO_FILTER_KEY`.
///
/// Mirrors `cuckoofilter::ExportedCuckooFilter` field-for-field (see the
/// `From`/`Into` impls below); conversion in either direction is a plain move
/// of both fields.
///
/// NOTE(review): field names are part of the persisted JSON schema
/// (`serde_json::from_slice` in `load_cuckoo_filter_from_disk`) — renaming
/// them would break loading of previously saved filters.
#[derive(Serialize, Deserialize)]
struct StorageReadyCuckooFilter {
// Raw exported filter bytes, as produced by `CuckooFilter::export()`.
values: Vec<u8>,
// Mirrors `ExportedCuckooFilter::length` — presumably the element count;
// TODO confirm against the cuckoofilter crate docs.
length: usize,
}

impl From<ExportedCuckooFilter> for StorageReadyCuckooFilter {
fn from(cf: ExportedCuckooFilter) -> Self {
StorageReadyCuckooFilter {
values: cf.values,
length: cf.length,
}
}
}

/// Converts the storage-ready form back into an exported cuckoo filter.
///
/// Implemented as `From` rather than a hand-written `Into` (Clippy
/// `from_over_into`): the standard blanket impl derives
/// `Into<ExportedCuckooFilter> for StorageReadyCuckooFilter` automatically,
/// so existing `.into()` call sites (e.g. in `load_cuckoo_filter_from_disk`)
/// keep working unchanged.
impl From<StorageReadyCuckooFilter> for ExportedCuckooFilter {
    fn from(stored: StorageReadyCuckooFilter) -> Self {
        ExportedCuckooFilter {
            values: stored.values,
            length: stored.length,
        }
    }
}

// ========== DB UTILS ========== //

/// Constructs a MongoDB connection
Expand Down Expand Up @@ -42,6 +72,71 @@ pub async fn construct_redis_conn(url: &str) -> Arc<Mutex<RedisCacheConn>> {
Arc::new(Mutex::new(redis_conn))
}

// ========== CUCKOO FILTER UTILS ========== //

/// Saves the cuckoo filter to disk
///
/// ### Arguments
///
/// * `cf` - The cuckoo filter to save
/// * `db` - The database connection
/// Saves the cuckoo filter to disk
///
/// Exports the filter, wraps it in the serializable `StorageReadyCuckooFilter`
/// form, and writes it to the KV store under `CUCKOO_FILTER_KEY`.
///
/// ### Arguments
///
/// * `cf` - The cuckoo filter to save
/// * `db` - The database connection
pub async fn save_cuckoo_filter_to_disk<T: KvStoreConnection>(
    cf: &CuckooFilter<DefaultHasher>,
    db: Arc<Mutex<T>>,
) -> Result<(), String> {
    let exportable: StorageReadyCuckooFilter = cf.export().into();

    db.lock()
        .await
        .set_data(CUCKOO_FILTER_KEY, exportable)
        .await
        .map(|_| ())
        .map_err(|e| format!("Failed to save cuckoo filter to disk with error: {}", e))
}

/// Loads the cuckoo filter from disk
///
/// ### Arguments
///
/// * `db` - The database connection
/// Loads the cuckoo filter from disk
///
/// Reads the raw bytes stored under `CUCKOO_FILTER_KEY`, deserializes them into
/// a `StorageReadyCuckooFilter`, and rebuilds the runtime `CuckooFilter` from
/// the exported form. Fails with a descriptive `String` if the read, the
/// lookup (no entry present), or the deserialization fails.
///
/// ### Arguments
///
/// * `db` - The database connection
pub async fn load_cuckoo_filter_from_disk<T: KvStoreConnection>(
    db: Arc<Mutex<T>>,
) -> Result<CuckooFilter<DefaultHasher>, String> {
    // Fetch the stored bytes; the lock is only needed for the read itself.
    let raw = db
        .lock()
        .await
        .get_data(CUCKOO_FILTER_KEY)
        .await
        .map_err(|e| format!("Failed to load cuckoo filter from disk with error: {}", e))?
        .ok_or_else(|| "No cuckoo filter found in DB".to_string())?;

    let stored: StorageReadyCuckooFilter = serde_json::from_slice(&raw).map_err(|e| {
        format!(
            "Failed to deserialize cuckoo filter from disk with error: {}",
            e
        )
    })?;

    // Rebuild the runtime filter from the exported representation.
    let exported: ExportedCuckooFilter = stored.into();
    Ok(CuckooFilter::from(exported))
}

/// Initializes the cuckoo filter
///
/// ### Arguments
///
/// * `db` - The database connection
/// Initializes the cuckoo filter
///
/// Tries to load a previously persisted filter from the DB; if none can be
/// loaded (missing entry, read failure, or deserialization failure), a fresh
/// empty filter is created and persisted before being returned.
///
/// ### Arguments
///
/// * `db` - The database connection
///
/// ### Errors
///
/// Returns `Err` if persisting the freshly created filter fails.
pub async fn init_cuckoo_filter<T: KvStoreConnection>(
    db: Arc<Mutex<T>>,
) -> Result<CuckooFilter<DefaultHasher>, String> {
    match load_cuckoo_filter_from_disk(db.clone()).await {
        Ok(cf) => Ok(cf),
        // NOTE(review): any load error (not just "not found") falls through to
        // creating a fresh filter — deliberate best-effort, kept as-is.
        Err(_) => {
            let cf = CuckooFilter::new();
            // Bug fix: was `.unwrap()`, which panicked on a recoverable save
            // failure even though this function returns Result — propagate
            // the error to the caller instead.
            save_cuckoo_filter_to_disk(&cf, db).await?;
            Ok(cf)
        }
    }
}

// ========== CONFIG UTILS ========== //

/// Loads the config file
Expand Down

0 comments on commit 5ebb359

Please sign in to comment.