refactor: improve thread safety and error handling #32

Merged · 3 commits · Jul 2, 2024
1 change: 1 addition & 0 deletions Cargo.toml
@@ -68,6 +68,7 @@ uuid = { version = "1" }
 # runtime / async
 async-trait = { version = "0.1" }
 async-recursion = { version = "1.1.1" }
+dashmap = { version = "6.0.1" }
 futures = { version = "0.3" }
 tokio = { version = "1", features = ["rt-multi-thread"]}
 num_cpus = { version = "1" }
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
@@ -67,5 +67,6 @@ url = { workspace = true }
 # runtime / async
 async-recursion = { workspace = true }
 async-trait = { workspace = true }
+dashmap = { workspace = true }
 tokio = { workspace = true }
 futures = { workspace = true }
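
Note: neither manifest hunk shows dashmap being used yet; presumably it backs a concurrent map elsewhere in the refactor. A minimal sketch of what DashMap provides over a Mutex-wrapped HashMap; every name here is illustrative and none of it comes from this PR:

use dashmap::DashMap;

fn main() {
    // DashMap shards entries across internal locks, so concurrent inserts
    // and reads do not serialize on one global Mutex.
    let cache: DashMap<String, u64> = DashMap::new();

    std::thread::scope(|s| {
        for i in 0..4u64 {
            let cache = &cache;
            s.spawn(move || {
                // insert takes &self, so no outer lock is needed.
                cache.insert(format!("file-{i}.parquet"), i * 100);
            });
        }
    });

    assert_eq!(cache.len(), 4);
}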
24 changes: 17 additions & 7 deletions crates/core/src/file_group/mod.rs
@@ -22,9 +22,11 @@ use std::fmt;
 use std::fmt::Formatter;
 use std::path::PathBuf;
 
+use anyhow::{anyhow, Result};
+
 use crate::storage::file_info::FileInfo;
 use crate::storage::file_stats::FileStats;
-use anyhow::{anyhow, Result};
+use crate::storage::Storage;
 
 #[derive(Clone, Debug)]
 pub struct BaseFile {
@@ -63,10 +65,6 @@ impl BaseFile {
             stats: None,
         })
     }
-
-    pub fn populate_stats(&mut self, stats: FileStats) {
-        self.stats = Some(stats)
-    }
 }
 
 #[derive(Clone, Debug)]
@@ -81,9 +79,9 @@ impl FileSlice {
     }
 
     pub fn base_file_relative_path(&self) -> String {
-        let partition_path = self.partition_path.clone().unwrap_or_default();
+        let ptn = self.partition_path.as_deref().unwrap_or_default();
         let file_name = &self.base_file.info.name;
-        PathBuf::from(partition_path)
+        PathBuf::from(ptn)
             .join(file_name)
             .to_str()
             .unwrap()
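
Note on the hunk above: as_deref() borrows the partition path as &str instead of cloning the String on every call, and unwrap_or_default() then yields "" without allocating. A standalone sketch of the before/after; the free function is hypothetical and only mirrors the method's body:

fn relative_path(partition_path: &Option<String>, file_name: &str) -> String {
    // Before: partition_path.clone().unwrap_or_default() allocated a fresh
    // String just to read it. After: borrow as &str, defaulting to "".
    let ptn: &str = partition_path.as_deref().unwrap_or_default();
    std::path::PathBuf::from(ptn)
        .join(file_name)
        .to_str()
        .unwrap()
        .to_string()
}

fn main() {
    assert_eq!(relative_path(&None, "a.parquet"), "a.parquet");
    assert_eq!(
        relative_path(&Some("part1".into()), "b.parquet"),
        "part1/b.parquet"
    );
}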
@@ -97,6 +95,18 @@
     pub fn set_base_file(&mut self, base_file: BaseFile) {
         self.base_file = base_file
     }
+
+    pub async fn load_stats(&mut self, storage: &Storage) -> Result<()> {
+        if self.base_file.stats.is_none() {
+            let parquet_meta = storage
+                .get_parquet_file_metadata(&self.base_file_relative_path())
+                .await;
+            let num_records = parquet_meta.file_metadata().num_rows();
+            let stats = FileStats { num_records };
+            self.base_file.stats = Some(stats);
+        }
+        Ok(())
+    }
 }
 
 #[derive(Clone, Debug)]
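
load_stats replaces the removed populate_stats setter: instead of callers pushing stats in, the slice lazily reads the row count from the parquet footer on first access and caches it in base_file.stats. A hypothetical crate-internal caller, assuming the FileSlice and Storage values come from the table's file-system view and that the stats/num_records fields stay visible to this code as they are in the diff:

async fn ensure_row_count(file_slice: &mut FileSlice, storage: &Storage) -> Result<i64> {
    // First call fetches parquet metadata; later calls are no-ops
    // because base_file.stats is already Some(_).
    file_slice.load_stats(storage).await?;
    let num_records = file_slice
        .base_file
        .stats
        .as_ref()
        .map(|s| s.num_records)
        .expect("load_stats populated stats above");
    Ok(num_records)
}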
1 change: 0 additions & 1 deletion crates/core/src/lib.rs
@@ -23,7 +23,6 @@ pub mod file_group;
 pub mod table;
 pub type HudiTable = Table;
 mod storage;
-mod timeline;
 
 pub fn crate_version() -> &'static str {
     env!("CARGO_PKG_VERSION")
45 changes: 25 additions & 20 deletions crates/core/src/storage/mod.rs
@@ -21,6 +21,7 @@
 use std::path::PathBuf;
 use std::sync::Arc;
 
+use anyhow::{anyhow, Result};
 use arrow::record_batch::RecordBatch;
 use async_recursion::async_recursion;
 use bytes::Bytes;
@@ -40,20 +41,23 @@
 pub(crate) mod utils;
 
 #[allow(dead_code)]
+#[derive(Clone, Debug)]
 pub struct Storage {
-    base_url: Url,
+    base_url: Arc<Url>,
+    options: Arc<HashMap<String, String>>,
     object_store: Arc<dyn ObjectStore>,
-    options: HashMap<String, String>,
 }
 
 impl Storage {
-    pub fn new(base_url: Url, options: HashMap<String, String>) -> Box<Storage> {
-        let object_store = parse_url_opts(&base_url, &options).unwrap().0;
-        Box::from(Storage {
-            base_url,
-            object_store: Arc::new(object_store),
-            options,
-        })
+    pub fn new(base_url: Arc<Url>, options: Arc<HashMap<String, String>>) -> Result<Arc<Storage>> {
+        match parse_url_opts(&base_url, &*options) {
+            Ok(object_store) => Ok(Arc::new(Storage {
+                base_url,
+                options,
+                object_store: Arc::new(object_store.0),
+            })),
+            Err(e) => Err(anyhow!("Failed to create storage: {}", e)),
+        }
     }
 
     #[allow(dead_code)]
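
The constructor now returns Result<Arc<Storage>> instead of a Box built around an internal unwrap, so a bad URL or bad options surface as an error rather than a panic, and the Arc plus #[derive(Clone)] make the handle cheap to share across async tasks. A sketch of a call site under the new signature; the path and error handling are illustrative, and Result is the anyhow alias already imported in this file:

use std::collections::HashMap;
use std::sync::Arc;
use url::Url;

fn open_storage() -> Result<Arc<Storage>> {
    let base_url = Url::parse("file:///tmp/hudi_table/")?;

    // Errors from parse_url_opts now propagate via anyhow instead of
    // panicking inside the constructor.
    let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new()))?;

    // Cloning the Arc (or the Storage itself) is a pointer bump, not a
    // deep copy; the fields are Arc-wrapped.
    let for_another_task = Arc::clone(&storage);
    drop(for_another_task);

    Ok(storage)
}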
@@ -167,6 +171,7 @@
     use std::collections::{HashMap, HashSet};
     use std::fs::canonicalize;
     use std::path::Path;
+    use std::sync::Arc;
 
     use object_store::path::Path as ObjPath;
     use url::Url;
@@ -181,7 +186,7 @@
             canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(base_url, HashMap::new());
+        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
         let first_level_dirs: HashSet<String> = storage.list_dirs(None).await.into_iter().collect();
         assert_eq!(
             first_level_dirs,
@@ -202,7 +207,7 @@
             canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(base_url, HashMap::new());
+        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
         let first_level_dirs: HashSet<ObjPath> = storage
             .list_dirs_as_obj_paths(None)
             .await
@@ -224,12 +229,12 @@
             canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(base_url.clone(), HashMap::new());
+        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
         let file_info_1: Vec<FileInfo> = storage.list_files(None).await.into_iter().collect();
         assert_eq!(
             file_info_1,
             vec![FileInfo {
-                uri: base_url.clone().join("a.parquet").unwrap().to_string(),
+                uri: storage.base_url.join("a.parquet").unwrap().to_string(),
                 name: "a.parquet".to_string(),
                 size: 0,
             }]
@@ -242,8 +247,8 @@
         assert_eq!(
             file_info_2,
             vec![FileInfo {
-                uri: base_url
-                    .clone()
+                uri: storage
+                    .base_url
                     .join("part1/b.parquet")
                     .unwrap()
                     .to_string(),
@@ -259,8 +264,8 @@
         assert_eq!(
             file_info_3,
             vec![FileInfo {
-                uri: base_url
-                    .clone()
+                uri: storage
+                    .base_url
                     .join("part2/part22/c.parquet")
                     .unwrap()
                     .to_string(),
@@ -276,7 +281,7 @@
             canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(base_url, HashMap::new());
+        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
         let leaf_dirs = get_leaf_dirs(&storage, None).await;
         assert_eq!(
             leaf_dirs,
@@ -288,7 +293,7 @@
     async fn storage_get_file_info() {
         let base_url =
             Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
-        let storage = Storage::new(base_url, HashMap::new());
+        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
         let file_info = storage.get_file_info("a.parquet").await;
         assert_eq!(file_info.name, "a.parquet");
         assert_eq!(
@@ -302,7 +307,7 @@
     async fn storage_get_parquet_file_data() {
         let base_url =
             Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
-        let storage = Storage::new(base_url, HashMap::new());
+        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
         let file_data = storage.get_parquet_file_data("a.parquet").await;
         assert_eq!(file_data.len(), 1);
         assert_eq!(file_data.first().unwrap().num_rows(), 5);