refactor: replaced asterisk with constraint name in get_constraints #3270

Merged
60 changes: 59 additions & 1 deletion crates/core/src/delta_datafusion/mod.rs
@@ -1415,9 +1415,14 @@ impl DeltaDataChecker {
                ));
            }

            let field_to_select = if check.as_any().is::<Constraint>() {
                "*"
            } else {
                check.get_name()
            };
            let sql = format!(
                "SELECT {} FROM `{table_name}` WHERE NOT ({}) LIMIT 1",
-               check.get_name(),
+               field_to_select,
                check.get_expression()
            );
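
With constraints now carrying their real names (see the config.rs change below), the violation query can no longer select get_name() for a table-level constraint, since that name is not a column; it selects `*` instead and keeps the column name for invariants and generated columns. A minimal standalone sketch of the resulting query shape, using a made-up table name, constraint name, and expression:

// Standalone sketch, not part of the diff: the table name, constraint name, and
// expression below are invented for illustration.
fn main() {
    let table_name = "my_table";
    // In the real code this is `check.as_any().is::<Constraint>()`.
    let is_table_constraint = true;
    let (name, expr) = ("my_custom_constraint", "value < 100");
    let field_to_select = if is_table_constraint { "*" } else { name };
    let sql = format!("SELECT {field_to_select} FROM `{table_name}` WHERE NOT ({expr}) LIMIT 1");
    assert_eq!(sql, "SELECT * FROM `my_table` WHERE NOT (value < 100) LIMIT 1");
}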

@@ -2160,6 +2165,59 @@ mod tests {
        assert!(matches!(result, Err(DeltaTableError::Generic { .. })));
    }

    #[tokio::test]
    async fn test_enforce_constraints() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b", ArrowDataType::Int32, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c", "d"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )
        .unwrap();
        // An empty constraint list is okay
        let constraints: Vec<Constraint> = vec![];
        assert!(DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await
            .is_ok());

        // Valid constraints return Ok(())
        let constraints = vec![
            Constraint::new("custom_a", "a is not null"),
            Constraint::new("custom_b", "b < 1000"),
        ];
        assert!(DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await
            .is_ok());

        // Violated constraints return an error with the list of violations
        let constraints = vec![
            Constraint::new("custom_a", "a is null"),
            Constraint::new("custom_B", "b < 100"),
        ];
        let result = DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
        if let Err(DeltaTableError::InvalidData { violations }) = result {
            assert_eq!(violations.len(), 2);
        }

        // Constraints referencing a column that does not exist return a different error
        let constraints = vec![Constraint::new("custom_c", "c > 2000")];
        let result = DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
    }

    #[test]
    fn roundtrip_test_delta_exec_plan() {
        let ctx = SessionContext::new();
4 changes: 3 additions & 1 deletion crates/core/src/kernel/mod.rs
@@ -3,7 +3,7 @@
//! The Kernel module contains all the logic for reading and processing the Delta Lake transaction log.

use delta_kernel::engine::arrow_expression::ArrowExpressionHandler;
-use std::sync::LazyLock;
+use std::{any::Any, sync::LazyLock};

pub mod arrow;
pub mod error;
@@ -21,6 +21,8 @@ pub trait DataCheck {
    fn get_name(&self) -> &str;
    /// The SQL expression to use for the check
    fn get_expression(&self) -> &str;

    fn as_any(&self) -> &dyn Any;
}

static ARROW_HANDLER: LazyLock<ArrowExpressionHandler> =
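
For context, the new as_any method on DataCheck is the usual escape hatch for recovering a concrete type from a trait object. A toy sketch of the pattern, using a simplified `Constraint` stand-in rather than the real delta-rs types:

use std::any::Any;

trait DataCheck {
    fn get_name(&self) -> &str;
    fn as_any(&self) -> &dyn Any;
}

// Simplified stand-in for the real Constraint type.
struct Constraint {
    name: String,
}

impl DataCheck for Constraint {
    fn get_name(&self) -> &str {
        &self.name
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn describe(check: &dyn DataCheck) -> &'static str {
    // `is::<T>()` answers "is this concrete type T?" without touching its fields;
    // `downcast_ref::<T>()` would return `Option<&T>` when the fields are needed.
    if check.as_any().is::<Constraint>() {
        "table-level constraint"
    } else {
        "column-level check"
    }
}

fn main() {
    let check = Constraint { name: "my_custom_constraint".to_string() };
    assert_eq!(describe(&check), "table-level constraint");
    assert_eq!(check.get_name(), "my_custom_constraint");
}

The first hunk above relies on exactly this pattern: check.as_any().is::<Constraint>() tells table-level constraints apart from column-level checks without adding a new method for each kind of check.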
5 changes: 5 additions & 0 deletions crates/core/src/kernel/models/schema.rs
@@ -1,5 +1,6 @@
//! Delta table schema
use std::any::Any;
use std::sync::Arc;

pub use delta_kernel::schema::{
@@ -44,6 +45,10 @@ impl DataCheck for Invariant {
    fn get_expression(&self) -> &str {
        &self.invariant_sql
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Trait to add convenience functions to struct type
26 changes: 26 additions & 0 deletions crates/core/src/operations/constraints.rs
@@ -272,6 +272,32 @@ mod tests {
            .to_owned()
    }

    #[tokio::test]
    async fn test_get_constraints_with_correct_names() -> DeltaResult<()> {
        // A constraint's key is allowed to be a custom name
        // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#check-constraints
        let batch = get_record_batch(None, false);
        let write = DeltaOps(create_bare_table())
            .write(vec![batch.clone()])
            .await?;
        let table = DeltaOps(write);

        let constraint = table
            .add_constraint()
            .with_constraint("my_custom_constraint", "value < 100")
            .await;
        assert!(constraint.is_ok());
        let constraints = constraint
            .unwrap()
            .state
            .unwrap()
            .table_config()
            .get_constraints();
        assert!(constraints.len() == 1);
        assert_eq!(constraints[0].name, "my_custom_constraint");
        Ok(())
    }

    #[tokio::test]
    async fn add_constraint_with_invalid_data() -> DeltaResult<()> {
        let batch = get_record_batch(None, false);
3 changes: 2 additions & 1 deletion crates/core/src/table/config.rs
@@ -363,7 +363,8 @@ impl TableConfig<'_> {
            .iter()
            .filter_map(|(field, value)| {
                if field.starts_with("delta.constraints") {
-                   value.as_ref().map(|f| Constraint::new("*", f))
+                   let constraint_name = field.replace("delta.constraints.", "");
+                   value.as_ref().map(|f| Constraint::new(&constraint_name, f))
                } else {
                    None
                }
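
Check constraints are persisted in the table configuration under keys of the form delta.constraints.<name> (see the Delta protocol link in the constraints test above), so stripping that prefix recovers the user-supplied name instead of the old "*" placeholder. A small standalone sketch of the mapping, with invented property entries:

// Standalone illustration of how constraint names are derived from table properties
// after this change; the keys and expressions here are made up for the example.
fn main() {
    let properties = vec![
        ("delta.constraints.my_custom_constraint", Some("value < 100")),
        ("delta.appendOnly", Some("false")), // unrelated property, filtered out
    ];

    let constraints: Vec<(String, String)> = properties
        .iter()
        .filter_map(|(field, value)| {
            if field.starts_with("delta.constraints") {
                let constraint_name = field.replace("delta.constraints.", "");
                value.as_ref().map(|f| (constraint_name, f.to_string()))
            } else {
                None
            }
        })
        .collect();

    assert_eq!(
        constraints,
        vec![("my_custom_constraint".to_string(), "value < 100".to_string())]
    );
}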
9 changes: 9 additions & 0 deletions crates/core/src/table/mod.rs
@@ -1,5 +1,6 @@
//! Delta Table read and write implementation
use std::any::Any;
use std::cmp::{min, Ordering};
use std::collections::HashMap;
use std::fmt;
@@ -155,6 +156,10 @@ impl DataCheck for Constraint {
    fn get_expression(&self) -> &str {
        &self.expr
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// A generated column
@@ -195,6 +200,10 @@ impl DataCheck for GeneratedColumn {
    fn get_expression(&self) -> &str {
        &self.validation_expr
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Return partition fields along with their data type from the current schema.