Skip to content

Commit

Permalink
feat: define Hudi error types across hudi-core (#124)
Browse files Browse the repository at this point in the history
Use `thiserror` instead of `anyhow` to define the error types for the hudi-core crate.

---------

Signed-off-by: GoHalo <gohalo@163.com>
Co-authored-by: Shiyan Xu <2701446+xushiyan@users.noreply.github.com>
  • Loading branch information
gohalo and xushiyan authored Dec 5, 2024
1 parent f7fcf6d commit 12c60ac
Show file tree
Hide file tree
Showing 19 changed files with 333 additions and 161 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ serde = { version = "1.0.203", features = ["derive"] }
serde_json = { version = "1" }

# "stdlib"
anyhow = { version = "1.0.86" }
thiserror = { version = "2.0.3" }
bytes = { version = "1" }
paste = { version = "1.0.15" }
once_cell = { version = "1.19.0" }
Expand Down
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ serde = { workspace = true }
serde_json = { workspace = true }

# "stdlib"
anyhow = { workspace = true }
thiserror = { workspace = true }
bytes = { workspace = true }
paste = { workspace = true }
strum = { workspace = true }
Expand Down
16 changes: 12 additions & 4 deletions crates/core/src/config/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
use std::collections::HashMap;
use std::str::FromStr;

use anyhow::{anyhow, Result};
use strum_macros::EnumIter;

use crate::config::{ConfigParser, HudiConfigValue};
use crate::{
config::{ConfigParser, HudiConfigValue},
CoreError::{ConfigNotFound, InvalidConfig},
Result,
};

/// Configurations for internal use.
///
Expand Down Expand Up @@ -64,11 +67,16 @@ impl ConfigParser for HudiInternalConfig {
let get_result = configs
.get(self.as_ref())
.map(|v| v.as_str())
.ok_or(anyhow!("Config '{}' not found", self.as_ref()));
.ok_or(ConfigNotFound(self.as_ref().to_string()));

match self {
Self::SkipConfigValidation => get_result
.and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
bool::from_str(v).map_err(|e| InvalidConfig {
item: Self::SkipConfigValidation.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Boolean),
}
}
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ use std::any::type_name;
use std::collections::HashMap;
use std::sync::Arc;

use crate::storage::utils::parse_uri;
use anyhow::Result;
use crate::{storage::utils::parse_uri, Result};
use serde::{Deserialize, Serialize};
use url::Url;

Expand Down
25 changes: 19 additions & 6 deletions crates/core/src/config/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
use std::collections::HashMap;
use std::str::FromStr;

use crate::config::{ConfigParser, HudiConfigValue};
use anyhow::{anyhow, Result};
use strum_macros::EnumIter;

use crate::{
config::{ConfigParser, HudiConfigValue},
CoreError::{ConfigNotFound, InvalidConfig},
Result,
};

/// Configurations for reading Hudi tables.
///
/// **Example**
Expand All @@ -37,6 +41,7 @@ use strum_macros::EnumIter;
/// HudiTable::new_with_options("/tmp/hudi_data", options)
/// ```
///
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiReadConfig {
/// Define input splits
Expand Down Expand Up @@ -74,11 +79,16 @@ impl ConfigParser for HudiReadConfig {
let get_result = configs
.get(self.as_ref())
.map(|v| v.as_str())
.ok_or(anyhow!("Config '{}' not found", self.as_ref()));
.ok_or(ConfigNotFound(self.as_ref().to_string()));

match self {
Self::InputPartitions => get_result
.and_then(|v| usize::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
usize::from_str(v).map_err(|e| InvalidConfig {
item: Self::InputPartitions.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::UInteger),
Self::AsOfTimestamp => get_result.map(|v| HudiConfigValue::String(v.to_string())),
}
Expand All @@ -89,8 +99,8 @@ impl ConfigParser for HudiReadConfig {
mod tests {
use crate::config::read::HudiReadConfig::InputPartitions;
use crate::config::ConfigParser;
use crate::CoreError::InvalidConfig;
use std::collections::HashMap;
use std::num::ParseIntError;

#[test]
fn parse_valid_config_value() {
Expand All @@ -103,7 +113,10 @@ mod tests {
fn parse_invalid_config_value() {
let options = HashMap::from([(InputPartitions.as_ref().to_string(), "foo".to_string())]);
let value = InputPartitions.parse_value(&options);
assert!(value.err().unwrap().is::<ParseIntError>());
assert!(matches!(
value.unwrap_err(),
InvalidConfig { item: _, source: _ }
));
assert_eq!(
InputPartitions
.parse_value_or_default(&options)
Expand Down
67 changes: 52 additions & 15 deletions crates/core/src/config/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
use std::collections::HashMap;
use std::str::FromStr;

use anyhow::anyhow;
use anyhow::Result;
use strum_macros::{AsRefStr, EnumIter};

use crate::config::{ConfigParser, HudiConfigValue};
use crate::{
config::{ConfigParser, HudiConfigValue},
CoreError::{self, ConfigNotFound, InvalidConfig, Unsupported},
Result,
};

/// Configurations for Hudi tables; most of them are persisted in `hoodie.properties`.
///
Expand Down Expand Up @@ -146,32 +148,57 @@ impl ConfigParser for HudiTableConfig {
let get_result = configs
.get(self.as_ref())
.map(|v| v.as_str())
.ok_or(anyhow!("Config '{}' not found", self.as_ref()));
.ok_or(ConfigNotFound(self.as_ref().to_string()));

match self {
Self::BaseFileFormat => get_result
.and_then(BaseFileFormatValue::from_str)
.map(|v| HudiConfigValue::String(v.as_ref().to_string())),
Self::BasePath => get_result.map(|v| HudiConfigValue::String(v.to_string())),
Self::Checksum => get_result
.and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
isize::from_str(v).map_err(|e| InvalidConfig {
item: Self::Checksum.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Integer),
Self::DatabaseName => get_result.map(|v| HudiConfigValue::String(v.to_string())),
Self::DropsPartitionFields => get_result
.and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
bool::from_str(v).map_err(|e| InvalidConfig {
item: Self::DropsPartitionFields.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Boolean),
Self::IsHiveStylePartitioning => get_result
.and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
bool::from_str(v).map_err(|e| InvalidConfig {
item: Self::IsHiveStylePartitioning.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Boolean),
Self::IsPartitionPathUrlencoded => get_result
.and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
bool::from_str(v).map_err(|e| InvalidConfig {
item: Self::IsPartitionPathUrlencoded.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Boolean),
Self::KeyGeneratorClass => get_result.map(|v| HudiConfigValue::String(v.to_string())),
Self::PartitionFields => get_result
.map(|v| HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
Self::PrecombineField => get_result.map(|v| HudiConfigValue::String(v.to_string())),
Self::PopulatesMetaFields => get_result
.and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
bool::from_str(v).map_err(|e| InvalidConfig {
item: Self::PopulatesMetaFields.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Boolean),
Self::RecordKeyFields => get_result
.map(|v| HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
Expand All @@ -180,10 +207,20 @@ impl ConfigParser for HudiTableConfig {
.and_then(TableTypeValue::from_str)
.map(|v| HudiConfigValue::String(v.as_ref().to_string())),
Self::TableVersion => get_result
.and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
isize::from_str(v).map_err(|e| InvalidConfig {
item: Self::TableVersion.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Integer),
Self::TimelineLayoutVersion => get_result
.and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
.and_then(|v| {
isize::from_str(v).map_err(|e| InvalidConfig {
item: Self::TimelineLayoutVersion.as_ref(),
source: Box::new(e),
})
})
.map(HudiConfigValue::Integer),
}
}
Expand All @@ -199,13 +236,13 @@ pub enum TableTypeValue {
}

impl FromStr for TableTypeValue {
type Err = anyhow::Error;
type Err = CoreError;

fn from_str(s: &str) -> Result<Self> {
match s.to_ascii_lowercase().as_str() {
"copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite),
"merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead),
_ => Err(anyhow!("Unsupported table type: {}", s)),
v => Err(Unsupported(format!("Unsupported table type {}", v))),
}
}
}
Expand All @@ -218,12 +255,12 @@ pub enum BaseFileFormatValue {
}

impl FromStr for BaseFileFormatValue {
type Err = anyhow::Error;
type Err = CoreError;

fn from_str(s: &str) -> Result<Self> {
match s.to_ascii_lowercase().as_str() {
"parquet" => Ok(Self::Parquet),
_ => Err(anyhow!("Unsupported base file format: {}", s)),
v => Err(Unsupported(format!("Unsupported base file format {}", v))),
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions crates/core/src/config/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
*/
//! Config utilities.
use anyhow::{Context, Result};
use bytes::Bytes;
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Cursor};

use crate::{CoreError, Result};

/// Returns an empty iterator to represent an empty set of options.
pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> {
std::iter::empty::<(&str, &str)>()
Expand Down Expand Up @@ -57,15 +58,17 @@ pub fn parse_data_for_options(data: &Bytes, split_chars: &str) -> Result<HashMap
let mut options = HashMap::new();

for line in lines {
let line = line.context("Failed to read line")?;
let line = line.map_err(|e| CoreError::Internal(format!("Failed to read line {e}")))?;
let trimmed_line = line.trim();
if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
continue;
}
let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c));
let key = parts
.next()
.context("Missing key in config line")?
.ok_or(CoreError::Internal(
"Missing key in config line".to_string(),
))?
.trim()
.to_owned();
let value = parts.next().unwrap_or("").trim().to_owned();
Expand Down
15 changes: 8 additions & 7 deletions crates/core/src/file_group/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ use std::fmt::Formatter;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;

use anyhow::{anyhow, Result};

use crate::storage::file_info::FileInfo;
use crate::storage::file_stats::FileStats;
use crate::storage::Storage;
use crate::{CoreError::Internal, Result};

/// Represents common metadata about a Hudi Base File.
#[derive(Clone, Debug)]
Expand All @@ -51,10 +50,12 @@ impl BaseFile {
/// Parse file name and extract file_group_id and commit_time.
fn parse_file_name(file_name: &str) -> Result<(String, String)> {
let err_msg = format!("Failed to parse file name '{}' for base file.", file_name);
let (name, _) = file_name.rsplit_once('.').ok_or(anyhow!(err_msg.clone()))?;
let (name, _) = file_name
.rsplit_once('.')
.ok_or(Internal(err_msg.clone()))?;
let parts: Vec<&str> = name.split('_').collect();
let file_group_id = parts.first().ok_or(anyhow!(err_msg.clone()))?.to_string();
let commit_time = parts.get(2).ok_or(anyhow!(err_msg.clone()))?.to_string();
let file_group_id = parts.first().ok_or(Internal(err_msg.clone()))?.to_string();
let commit_time = parts.get(2).ok_or(Internal(err_msg.clone()))?.to_string();
Ok((file_group_id, commit_time))
}

Expand Down Expand Up @@ -189,11 +190,11 @@ impl FileGroup {
pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> {
let commit_time = base_file.commit_time.as_str();
if self.file_slices.contains_key(commit_time) {
Err(anyhow!(
Err(Internal(format!(
"Commit time {0} is already present in File Group {1}",
commit_time.to_owned(),
self.id,
))
)))
} else {
self.file_slices.insert(
commit_time.to_owned(),
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/file_group/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::config::utils::split_hudi_options_from_others;
use crate::config::HudiConfigs;
use crate::file_group::FileSlice;
use crate::storage::Storage;
use anyhow::Result;
use crate::Result;
use arrow_array::RecordBatch;
use std::sync::Arc;

Expand Down
46 changes: 46 additions & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,49 @@ pub mod file_group;
pub mod storage;
pub mod table;
pub mod util;

use thiserror::Error;

#[derive(Error, Debug)]
pub enum CoreError {
#[error("Config '{0}' not found")]
ConfigNotFound(String),

#[error("Invalid config item '{item}', {source:?}")]
InvalidConfig {
item: &'static str,
source: Box<dyn std::error::Error + Sync + Send + 'static>,
},

#[error("Parse url '{url}' failed, {source}")]
UrlParse {
url: String,
source: url::ParseError,
},

#[error("Invalid file path '{name}', {detail}")]
InvalidPath { name: String, detail: String },

#[error("{0}")]
Unsupported(String),

#[error("{0}")]
Internal(String),

#[error(transparent)]
Utf8Error(#[from] std::str::Utf8Error),

#[error(transparent)]
ObjectStore(#[from] object_store::Error),

#[error(transparent)]
ObjectStorePath(#[from] object_store::path::Error),

#[error(transparent)]
Parquet(#[from] parquet::errors::ParquetError),

#[error(transparent)]
Arrow(#[from] arrow::error::ArrowError),
}

/// Crate-wide `Result` alias whose error type defaults to [`CoreError`].
///
/// The error parameter can still be overridden where a different error type
/// is needed, e.g. `Result<T, OtherError>`.
// NOTE(review): declared without `pub`; modules inside this crate import it
// as `crate::Result`. Confirm whether downstream crates are meant to be able
// to name this alias as well.
type Result<T, E = CoreError> = std::result::Result<T, E>;
Loading

0 comments on commit 12c60ac

Please sign in to comment.