From 39ecc09e614078787c5f5484c316e9ddc7d422f5 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sun, 1 Dec 2024 14:51:39 +0000 Subject: [PATCH 1/6] chore: datafusion 44 fix for PartitionedFile Signed-off-by: R. Tyler Croy --- crates/core/src/delta_datafusion/cdf/scan_utils.rs | 1 + crates/core/src/delta_datafusion/mod.rs | 3 +++ 2 files changed, 4 insertions(+) diff --git a/crates/core/src/delta_datafusion/cdf/scan_utils.rs b/crates/core/src/delta_datafusion/cdf/scan_utils.rs index 27285179f6..459157f4c8 100644 --- a/crates/core/src/delta_datafusion/cdf/scan_utils.rs +++ b/crates/core/src/delta_datafusion/cdf/scan_utils.rs @@ -84,6 +84,7 @@ pub fn create_partition_values( extensions: None, range: None, statistics: None, + metadata_size_hint: None, }; file_groups.entry(new_part_values).or_default().push(part); diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index d430ed8655..e41e6c4397 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -1054,6 +1054,7 @@ fn partitioned_file_from_action( range: None, extensions: None, statistics: None, + metadata_size_hint: None, } } @@ -1824,6 +1825,7 @@ mod tests { use crate::storage::ObjectStoreRef; use crate::writer::test_utils::get_delta_schema; use arrow::array::StructArray; + use arrow::datatypes::DataType; use arrow::datatypes::{Field, Schema}; use arrow_array::cast::AsArray; use bytes::Bytes; @@ -1959,6 +1961,7 @@ mod tests { range: None, extensions: None, statistics: None, + metadata_size_hint: None, }; assert_eq!(file.partition_values, ref_file.partition_values) } From dddab7993ed581e9716ed3c4259f5566205e883c Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sun, 1 Dec 2024 15:27:55 +0000 Subject: [PATCH 2/6] chore: datafusion 44 prototype change on make_array Signed-off-by: R. Tyler Croy --- crates/core/src/delta_datafusion/expr.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 2c127f010d..b117d49bb4 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -99,14 +99,13 @@ impl ScalarUDFImpl for MakeParquetArray { r_type } - fn invoke(&self, args: &[ColumnarValue]) -> Result { + fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result { let mut data_type = DataType::Null; for arg in args { data_type = arg.data_type(); } - #[allow(deprecated)] - match self.actual.invoke(args)? { + match self.actual.invoke_batch(args, number_rows)? { ColumnarValue::Scalar(ScalarValue::List(df_array)) => { let field = Arc::new(Field::new("element", data_type, true)); let result = Ok(ColumnarValue::Scalar(ScalarValue::List(Arc::new( @@ -127,10 +126,6 @@ impl ScalarUDFImpl for MakeParquetArray { } } - fn invoke_no_args(&self, number_rows: usize) -> Result { - self.actual.invoke_batch(&[], number_rows) - } - fn aliases(&self) -> &[String] { &self.aliases } From 1c7b29e5f77c9eba289ac4aa6a1506e28653c35e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 21 Dec 2024 14:05:20 -0500 Subject: [PATCH 3/6] Pin to pre-release Datafusion --- Cargo.toml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 945117eef2..cfdb12efe8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,3 +76,17 @@ async-trait = { version = "0.1" } futures = { version = "0.3" } tokio = { version = "1" } num_cpus = { version = "1" } + +# temporary datafusion patches +[patch.crates-io] +datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } +datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } +datafusion-execution = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } +datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } +datafusion-ffi = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } +datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } +datafusion-functions-aggregate = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } +datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } +datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } +datafusion-proto = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } +datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } From 678736cd87ae5644f394c1cc54e6bfe304113094 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 21 Dec 2024 14:06:12 -0500 Subject: [PATCH 4/6] chore: Update to latest sqlparser --- crates/core/Cargo.toml | 2 +- crates/sql/src/parser.rs | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 0797b77b9a..965041bdd0 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -96,7 +96,7 @@ tracing = { workspace = true } rand = "0.8" z85 = "3.0.5" maplit = "1" -sqlparser = { version = "0.52.0" } +sqlparser = { version = "0.53.0" } [dev-dependencies] criterion = "0.5" diff --git a/crates/sql/src/parser.rs b/crates/sql/src/parser.rs index 19bf3f00b0..4844c7fb2c 100644 --- a/crates/sql/src/parser.rs +++ b/crates/sql/src/parser.rs @@ -224,9 +224,9 @@ impl<'a> DeltaParser<'a> { #[cfg(test)] mod tests { - use datafusion_sql::sqlparser::ast::Ident; - use super::*; + use datafusion_sql::sqlparser::ast::Ident; + use datafusion_sql::sqlparser::tokenizer::Span; fn expect_parse_ok(sql: &str, expected: Statement) -> Result<(), ParserError> { let statements = DeltaParser::parse_sql(sql)?; @@ -245,6 +245,7 @@ mod tests { table: ObjectName(vec![Ident { value: "data_table".to_string(), quote_style: None, + span: Span::empty(), }]), retention_hours: None, dry_run: false, @@ -255,6 +256,7 @@ mod tests { table: ObjectName(vec![Ident { value: "data_table".to_string(), quote_style: None, + span: Span::empty(), }]), retention_hours: Some(10), dry_run: false, @@ -265,6 +267,7 @@ mod tests { table: ObjectName(vec![Ident { value: "data_table".to_string(), quote_style: None, + span: Span::empty(), }]), retention_hours: Some(10), dry_run: true, @@ -275,6 +278,7 @@ mod tests { table: ObjectName(vec![Ident { value: "data_table".to_string(), quote_style: None, + span: Span::empty(), }]), retention_hours: None, dry_run: true, From ca2af9e607d2564786493e6b7d88a262a5e01ece Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 22 Dec 2024 10:38:00 -0500 Subject: [PATCH 5/6] chore: Reduce compiler warnings by updating to use non deprecated DataFusion APIs (#3077) - targets https://github.com/delta-io/delta-rs/pull/3073 from @rtyler This PR makes some small changes to reduce the build warnings by using non deprecated APIs --- crates/catalog-unity/src/error.rs | 3 --- crates/core/src/delta_datafusion/mod.rs | 1 - crates/core/src/operations/optimize.rs | 10 ++++------ crates/sql/src/parser.rs | 4 ++-- crates/test/src/datafusion.rs | 10 +--------- 5 files changed, 7 insertions(+), 21 deletions(-) diff --git a/crates/catalog-unity/src/error.rs b/crates/catalog-unity/src/error.rs index 67610c07fe..a13c3dc401 100644 --- a/crates/catalog-unity/src/error.rs +++ b/crates/catalog-unity/src/error.rs @@ -10,7 +10,6 @@ pub enum UnityCatalogError { }, /// A generic error qualified in the message - #[error("{source}")] Retry { /// Error message @@ -19,7 +18,6 @@ pub enum UnityCatalogError { }, #[error("Request error: {source}")] - /// Error from reqwest library RequestError { /// The underlying reqwest_middleware::Error @@ -35,7 +33,6 @@ pub enum UnityCatalogError { }, /// Error caused by invalid access token value - #[error("Invalid Databricks personal access token")] InvalidAccessToken, } diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index e41e6c4397..fe27a7abf2 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -1825,7 +1825,6 @@ mod tests { use crate::storage::ObjectStoreRef; use crate::writer::test_utils::get_delta_schema; use arrow::array::StructArray; - use arrow::datatypes::DataType; use arrow::datatypes::{Field, Schema}; use arrow_array::cast::AsArray; use bytes::Bytes; diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index fe76a3647d..d756dcb157 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -1215,10 +1215,7 @@ pub(super) mod zorder { use url::Url; use ::datafusion::{ - execution::{ - memory_pool::FairSpillPool, - runtime_env::{RuntimeConfig, RuntimeEnv}, - }, + execution::{memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder}, prelude::{SessionConfig, SessionContext}, }; use arrow_schema::DataType; @@ -1245,8 +1242,9 @@ pub(super) mod zorder { let columns = columns.into(); let memory_pool = FairSpillPool::new(max_spill_size); - let config = RuntimeConfig::new().with_memory_pool(Arc::new(memory_pool)); - let runtime = Arc::new(RuntimeEnv::try_new(config)?); + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(memory_pool)) + .build_arc()?; runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store); let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime); diff --git a/crates/sql/src/parser.rs b/crates/sql/src/parser.rs index 4844c7fb2c..ccb52a3c3a 100644 --- a/crates/sql/src/parser.rs +++ b/crates/sql/src/parser.rs @@ -5,7 +5,7 @@ use datafusion_sql::parser::{DFParser, Statement as DFStatement}; use datafusion_sql::sqlparser::ast::{ObjectName, Value}; use datafusion_sql::sqlparser::dialect::{keywords::Keyword, Dialect, GenericDialect}; use datafusion_sql::sqlparser::parser::{Parser, ParserError}; -use datafusion_sql::sqlparser::tokenizer::{Token, TokenWithLocation, Tokenizer}; +use datafusion_sql::sqlparser::tokenizer::{Token, TokenWithSpan, Tokenizer}; // Use `Parser::expected` instead, if possible macro_rules! parser_err { @@ -129,7 +129,7 @@ impl<'a> DeltaParser<'a> { } /// Report an unexpected token - fn expected(&self, expected: &str, found: TokenWithLocation) -> Result { + fn expected(&self, expected: &str, found: TokenWithSpan) -> Result { parser_err!(format!("Expected {expected}, found: {found}")) } diff --git a/crates/test/src/datafusion.rs b/crates/test/src/datafusion.rs index 5ca73a742e..602c115bd6 100644 --- a/crates/test/src/datafusion.rs +++ b/crates/test/src/datafusion.rs @@ -1,18 +1,10 @@ use deltalake_core::datafusion::execution::context::SessionContext; -use deltalake_core::datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use deltalake_core::datafusion::execution::session_state::SessionStateBuilder; -use deltalake_core::datafusion::prelude::SessionConfig; use deltalake_core::delta_datafusion::DeltaTableFactory; use std::sync::Arc; pub fn context_with_delta_table_factory() -> SessionContext { - let cfg = RuntimeConfig::new(); - let env = RuntimeEnv::try_new(cfg).unwrap(); - let ses = SessionConfig::new(); - let mut state = SessionStateBuilder::new() - .with_config(ses) - .with_runtime_env(Arc::new(env)) - .build(); + let mut state = SessionStateBuilder::new().build(); state .table_factories_mut() .insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {})); From 30ef6ff66babb268b1c7b67d9068c5bd76e389c2 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Wed, 1 Jan 2025 19:56:36 +0100 Subject: [PATCH 6/6] chore: bump kernel, datafusion 44rc1 Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> --- Cargo.toml | 36 ++++++++------------------ crates/aws/src/lib.rs | 3 ++- crates/azure/Cargo.toml | 2 +- crates/catalog-unity/Cargo.toml | 14 +++++----- crates/catalog-unity/src/credential.rs | 12 +++------ crates/catalog-unity/src/lib.rs | 3 +-- crates/core/src/table/state_arrow.rs | 6 +++-- python/src/lib.rs | 1 + 8 files changed, 32 insertions(+), 45 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cfdb12efe8..4358d912ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ debug = true debug = "line-tables-only" [workspace.dependencies] -delta_kernel = { version = "0.5.0", features = ["default-engine"] } +delta_kernel = { version = "0.6.0", features = ["default-engine"] } #delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] } # arrow @@ -45,16 +45,16 @@ object_store = { version = "0.11.2" } parquet = { version = "53" } # datafusion -datafusion = { version = "43" } -datafusion-expr = { version = "43" } -datafusion-common = { version = "43" } -datafusion-ffi = { version = "43" } -datafusion-functions = { version = "43" } -datafusion-functions-aggregate = { version = "43" } -datafusion-physical-expr = { version = "43" } -datafusion-physical-plan = { version = "43" } -datafusion-proto = { version = "43" } -datafusion-sql = { version = "43" } +datafusion = { version = "44" } +datafusion-expr = { version = "44" } +datafusion-common = { version = "44" } +datafusion-ffi = { version = "44" } +datafusion-functions = { version = "44" } +datafusion-functions-aggregate = { version = "44" } +datafusion-physical-expr = { version = "44" } +datafusion-physical-plan = { version = "44" } +datafusion-proto = { version = "44" } +datafusion-sql = { version = "44" } # serde serde = { version = "1.0.194", features = ["derive"] } @@ -69,7 +69,6 @@ thiserror = { version = "2" } url = { version = "2" } urlencoding = "2.1.3" uuid = { version = "1" } -path-tree = { version = "0.8.1"} # pin to 0.8.1 due to nightly features # runtime / async async-trait = { version = "0.1" } @@ -77,16 +76,3 @@ futures = { version = "0.3" } tokio = { version = "1" } num_cpus = { version = "1" } -# temporary datafusion patches -[patch.crates-io] -datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } -datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } -datafusion-execution = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } -datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } -datafusion-ffi = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } -datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } -datafusion-functions-aggregate = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } -datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } -datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } -datafusion-proto = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } -datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "a50ed3488f77743a192d9e8dd9c99f00df659ef1" } diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs index e32aefcf8b..154b9de5bc 100644 --- a/crates/aws/src/lib.rs +++ b/crates/aws/src/lib.rs @@ -78,7 +78,7 @@ impl LogStoreFactory for S3LogStoreFactory { store, )?)); } - Ok(default_logstore(store, location, &options)) + Ok(default_logstore(store, location, options)) } } @@ -141,6 +141,7 @@ impl std::fmt::Debug for DynamoDbLockClient { impl DynamoDbLockClient { /// Creates a new DynamoDbLockClient from the supplied storage options. + #[allow(clippy::too_many_arguments)] pub fn try_new( sdk_config: &SdkConfig, lock_table_name: Option, diff --git a/crates/azure/Cargo.toml b/crates/azure/Cargo.toml index 20fd7c7e66..6a31103393 100644 --- a/crates/azure/Cargo.toml +++ b/crates/azure/Cargo.toml @@ -14,7 +14,7 @@ rust-version.workspace = true [dependencies] deltalake-core = { version = "0.23.0", path = "../core", features = [ "datafusion", -] } +]} lazy_static = "1" # workspace depenndecies diff --git a/crates/catalog-unity/Cargo.toml b/crates/catalog-unity/Cargo.toml index 4ed44d9050..670c8953ae 100644 --- a/crates/catalog-unity/Cargo.toml +++ b/crates/catalog-unity/Cargo.toml @@ -17,17 +17,19 @@ tokio.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true -deltalake-core = { version = "0.23", path = "../core" } +deltalake-core = { version = "0.23", path = "../core", features = [ + "datafusion", +]} reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "http2"] } reqwest-retry = "0.7" reqwest-middleware = "0.4.0" rand = "0.8" -futures = "0.3" -chrono = "0.4" +futures = { workspace = true } +chrono = { workspace = true } dashmap = "6" -tracing = "0.1" -datafusion = { version = "43", optional = true } -datafusion-common = { version = "43", optional = true } +tracing = { workspace = true } +datafusion = { workspace = true, optional = true } +datafusion-common = { workspace = true, optional = true } [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } diff --git a/crates/catalog-unity/src/credential.rs b/crates/catalog-unity/src/credential.rs index b6b21b47eb..2e0be5ea51 100644 --- a/crates/catalog-unity/src/credential.rs +++ b/crates/catalog-unity/src/credential.rs @@ -205,8 +205,7 @@ impl TokenCredential for AzureCliCredential { "got unexpected token type from azure cli: {0}", token_response.token_type ), - } - .into()); + }); } let duration = token_response.expires_on.naive_local() - chrono::Local::now().naive_local(); @@ -224,18 +223,15 @@ impl TokenCredential for AzureCliCredential { let message = String::from_utf8_lossy(&az_output.stderr); Err(UnityCatalogError::AzureCli { message: message.into(), - } - .into()) + }) } Err(e) => match e.kind() { std::io::ErrorKind::NotFound => Err(UnityCatalogError::AzureCli { message: "Azure Cli not installed".into(), - } - .into()), + }), error_kind => Err(UnityCatalogError::AzureCli { message: format!("io error: {error_kind:?}"), - } - .into()), + }), }, } } diff --git a/crates/catalog-unity/src/lib.rs b/crates/catalog-unity/src/lib.rs index f5b8d1d08a..816376eba8 100644 --- a/crates/catalog-unity/src/lib.rs +++ b/crates/catalog-unity/src/lib.rs @@ -644,8 +644,7 @@ impl DataCatalog for UnityCatalog { GetTableResponse::Error(err) => Err(UnityCatalogError::InvalidTable { error_code: err.error_code, message: err.message, - } - .into()), + }), } } } diff --git a/crates/core/src/table/state_arrow.rs b/crates/core/src/table/state_arrow.rs index 0258109859..4ea67cd9ef 100644 --- a/crates/core/src/table/state_arrow.rs +++ b/crates/core/src/table/state_arrow.rs @@ -14,7 +14,7 @@ use arrow_array::{ use arrow_cast::cast; use arrow_cast::parse::Parser; use arrow_schema::{DataType, Field, Fields, TimeUnit}; -use delta_kernel::table_features::ColumnMappingMode; +use delta_kernel::table_features::{validate_schema_column_mapping, ColumnMappingMode}; use itertools::Itertools; use super::state::DeltaTableState; @@ -171,6 +171,8 @@ impl DeltaTableState { }) .collect::>(); + validate_schema_column_mapping(self.schema(), column_mapping_mode)?; + let physical_name_to_logical_name = match column_mapping_mode { ColumnMappingMode::None => HashMap::with_capacity(0), // No column mapping, no need for this HashMap ColumnMappingMode::Id | ColumnMappingMode::Name => metadata @@ -184,7 +186,7 @@ impl DeltaTableState { "Invalid partition column {0}", name )))? - .physical_name(column_mapping_mode)? + .physical_name() .to_string(); Ok((physical_name, name.as_str())) }) diff --git a/python/src/lib.rs b/python/src/lib.rs index 0135864c7e..259b3d8a05 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -670,6 +670,7 @@ impl RawDeltaTable { } #[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, allow_out_of_range = false))] + #[allow(clippy::too_many_arguments)] pub fn load_cdf( &mut self, py: Python,