Skip to content

Commit

Permalink
refact: define hudi error types for hudi-core crate
Browse files Browse the repository at this point in the history
  • Loading branch information
gohalo committed Sep 10, 2024
1 parent f56603b commit 2f6f596
Show file tree
Hide file tree
Showing 13 changed files with 267 additions and 113 deletions.
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ serde = { workspace = true }
serde_json = { workspace = true }

# "stdlib"
anyhow = { workspace = true }
bytes = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
Expand All @@ -65,6 +64,7 @@ datafusion = { workspace = true, optional = true }
datafusion-expr = { workspace = true, optional = true }
datafusion-common = { workspace = true, optional = true }
datafusion-physical-expr = { workspace = true, optional = true }
thiserror = "1.0.63"

[dev-dependencies]
hudi-tests = { path = "../tests" }
Expand Down
16 changes: 12 additions & 4 deletions crates/core/src/config/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
use std::collections::HashMap;
use std::str::FromStr;

use anyhow::{anyhow, Result};
use strum_macros::EnumIter;

use crate::config::{ConfigParser, HudiConfigValue};
use crate::{
config::{ConfigParser, HudiConfigValue},
Error::{ConfNotFound, InvalidConf},
Result,
};

#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiInternalConfig {
Expand Down Expand Up @@ -51,11 +54,16 @@ impl ConfigParser for HudiInternalConfig {
let get_result = configs
.get(self.as_ref())
.map(|v| v.as_str())
.ok_or(anyhow!("Config '{}' not found", self.as_ref()));
.ok_or(ConfNotFound(self.as_ref().to_string()));

match self {
Self::SkipConfigValidation => get_result
.and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
bool::from_str(v).map_err(|e| InvalidConf {
item: Self::SkipConfigValidation.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Boolean),
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::any::type_name;
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Result;
use crate::Result;

pub mod internal;
pub mod read;
Expand Down
20 changes: 14 additions & 6 deletions crates/core/src/config/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
use std::collections::HashMap;
use std::str::FromStr;

use crate::config::{ConfigParser, HudiConfigValue};
use anyhow::{anyhow, Result};
use strum_macros::EnumIter;

use crate::{
config::{ConfigParser, HudiConfigValue},
Error::{ConfNotFound, InvalidConf},
Result,
};

#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiReadConfig {
InputPartitions,
Expand Down Expand Up @@ -53,11 +57,16 @@ impl ConfigParser for HudiReadConfig {
let get_result = configs
.get(self.as_ref())
.map(|v| v.as_str())
.ok_or(anyhow!("Config '{}' not found", self.as_ref()));
.ok_or(ConfNotFound(self.as_ref().to_string()));

match self {
Self::InputPartitions => get_result
.and_then(|v| usize::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
usize::from_str(v).map_err(|e| InvalidConf {
item: Self::InputPartitions.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::UInteger),
Self::AsOfTimestamp => get_result.map(|v| HudiConfigValue::String(v.to_string())),
}
Expand All @@ -69,7 +78,6 @@ mod tests {
use crate::config::read::HudiReadConfig::InputPartitions;
use crate::config::ConfigParser;
use std::collections::HashMap;
use std::num::ParseIntError;

#[test]
fn parse_valid_config_value() {
Expand All @@ -82,7 +90,7 @@ mod tests {
fn parse_invalid_config_value() {
let options = HashMap::from([(InputPartitions.as_ref().to_string(), "foo".to_string())]);
let value = InputPartitions.parse_value(&options);
assert!(value.err().unwrap().is::<ParseIntError>());
assert!(value.is_err());
assert_eq!(
InputPartitions
.parse_value_or_default(&options)
Expand Down
67 changes: 52 additions & 15 deletions crates/core/src/config/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
use std::collections::HashMap;
use std::str::FromStr;

use anyhow::anyhow;
use anyhow::Result;
use strum_macros::{AsRefStr, EnumIter};

use crate::config::{ConfigParser, HudiConfigValue};
use crate::{
config::{ConfigParser, HudiConfigValue},
Error::{self, ConfNotFound, InvalidConf, Unsupported},
Result,
};

#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiTableConfig {
Expand Down Expand Up @@ -87,31 +89,56 @@ impl ConfigParser for HudiTableConfig {
let get_result = configs
.get(self.as_ref())
.map(|v| v.as_str())
.ok_or(anyhow!("Config '{}' not found", self.as_ref()));
.ok_or(ConfNotFound(self.as_ref().to_string()));

match self {
Self::BaseFileFormat => get_result
.and_then(BaseFileFormatValue::from_str)
.map(|v| HudiConfigValue::String(v.as_ref().to_string())),
Self::Checksum => get_result
.and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
isize::from_str(v).map_err(|e| InvalidConf {
item: Self::Checksum.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Integer),
Self::DatabaseName => get_result.map(|v| HudiConfigValue::String(v.to_string())),
Self::DropsPartitionFields => get_result
.and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
bool::from_str(v).map_err(|e| InvalidConf {
item: Self::DropsPartitionFields.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Boolean),
Self::IsHiveStylePartitioning => get_result
.and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
bool::from_str(v).map_err(|e| InvalidConf {
item: Self::IsHiveStylePartitioning.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Boolean),
Self::IsPartitionPathUrlencoded => get_result
.and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
bool::from_str(v).map_err(|e| InvalidConf {
item: Self::IsPartitionPathUrlencoded.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Boolean),
Self::KeyGeneratorClass => get_result.map(|v| HudiConfigValue::String(v.to_string())),
Self::PartitionFields => get_result
.map(|v| HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
Self::PrecombineField => get_result.map(|v| HudiConfigValue::String(v.to_string())),
Self::PopulatesMetaFields => get_result
.and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
bool::from_str(v).map_err(|e| InvalidConf {
item: Self::PopulatesMetaFields.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Boolean),
Self::RecordKeyFields => get_result
.map(|v| HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
Expand All @@ -120,10 +147,20 @@ impl ConfigParser for HudiTableConfig {
.and_then(TableTypeValue::from_str)
.map(|v| HudiConfigValue::String(v.as_ref().to_string())),
Self::TableVersion => get_result
.and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
isize::from_str(v).map_err(|e| InvalidConf {
item: Self::TableVersion.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Integer),
Self::TimelineLayoutVersion => get_result
.and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
isize::from_str(v).map_err(|e| InvalidConf {
item: Self::TimelineLayoutVersion.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Integer),
}
}
Expand All @@ -138,13 +175,13 @@ pub enum TableTypeValue {
}

impl FromStr for TableTypeValue {
type Err = anyhow::Error;
type Err = Error;

fn from_str(s: &str) -> Result<Self> {
match s.to_ascii_lowercase().as_str() {
"copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite),
"merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead),
_ => Err(anyhow!("Unsupported table type: {}", s)),
v => Err(Unsupported(format!("unsupported table type {}", v))),
}
}
}
Expand All @@ -156,12 +193,12 @@ pub enum BaseFileFormatValue {
}

impl FromStr for BaseFileFormatValue {
type Err = anyhow::Error;
type Err = Error;

fn from_str(s: &str) -> Result<Self> {
match s.to_ascii_lowercase().as_str() {
"parquet" => Ok(Self::Parquet),
_ => Err(anyhow!("Unsupported base file format: {}", s)),
v => Err(Unsupported(format!("unsupported base file format {}", v))),
}
}
}
Expand Down
21 changes: 14 additions & 7 deletions crates/core/src/file_group/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ use std::fmt::Formatter;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;

use anyhow::{anyhow, Result};

use crate::storage::file_info::FileInfo;
use crate::storage::file_stats::FileStats;
use crate::storage::Storage;
use crate::{Error::Internal, Result};

#[derive(Clone, Debug)]
pub struct BaseFile {
Expand All @@ -40,10 +39,18 @@ pub struct BaseFile {
impl BaseFile {
fn parse_file_name(file_name: &str) -> Result<(String, String)> {
let err_msg = format!("Failed to parse file name '{}' for base file.", file_name);
let (name, _) = file_name.rsplit_once('.').ok_or(anyhow!(err_msg.clone()))?;
let (name, _) = file_name
.rsplit_once('.')
.ok_or(Internal(err_msg.clone()))?;
let parts: Vec<&str> = name.split('_').collect();
let file_group_id = parts.first().ok_or(anyhow!(err_msg.clone()))?.to_string();
let commit_time = parts.get(2).ok_or(anyhow!(err_msg.clone()))?.to_string();
let file_group_id = parts
.first()
.ok_or(Internal(err_msg.clone()))?
.to_string();
let commit_time = parts
.get(2)
.ok_or(Internal(err_msg.clone()))?
.to_string();
Ok((file_group_id, commit_time))
}

Expand Down Expand Up @@ -162,11 +169,11 @@ impl FileGroup {
pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> {
let commit_time = base_file.commit_time.as_str();
if self.file_slices.contains_key(commit_time) {
Err(anyhow!(
Err(Internal(format!(
"Commit time {0} is already present in File Group {1}",
commit_time.to_owned(),
self.id,
))
)))
} else {
self.file_slices.insert(
commit_time.to_owned(),
Expand Down
43 changes: 43 additions & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,46 @@ pub mod config;
pub mod file_group;
pub mod storage;
pub mod table;

use thiserror::Error;

#[derive(Error, Debug)]
pub enum Error {
#[error("Config '{0}' not found")]
ConfNotFound(String),

#[error("Invalid config item '{item}', {source:?}")]
InvalidConf {
item: &'static str,
source: Box<dyn std::error::Error + Sync + Send + 'static>,
},

#[error("Parse url '{url}' failed, {source}")]
UrlParse {
url: String,
source: url::ParseError,
},

#[error("Invalid file path '{name}', {detail}")]
InvalidPath { name: String, detail: String },

#[error("{0}")]
Unsupported(String),

#[error("{0}")]
Internal(String),

#[error(transparent)]
Store(#[from] object_store::Error),

#[error(transparent)]
StorePath(#[from] object_store::path::Error),

#[error(transparent)]
Parquet(#[from] parquet::errors::ParquetError),

#[error(transparent)]
Arrow(#[from] arrow::error::ArrowError),
}

type Result<T, E = Error> = std::result::Result<T, E>;
Loading

0 comments on commit 2f6f596

Please sign in to comment.