From 996b630cc2c9a040fd341d83d138f9c9dd837d34 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Sun, 24 Mar 2024 10:31:39 +0100 Subject: [PATCH 1/5] Add support for Bloom filters on unsigned integer columns in Parquet tables --- .../core/src/datasource/physical_plan/parquet/row_groups.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index a82c5d97a2b7..8df4925fc566 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -234,6 +234,10 @@ impl PruningStatistics for BloomFilterStatistics { ScalarValue::Int32(Some(v)) => sbbf.check(v), ScalarValue::Int16(Some(v)) => sbbf.check(v), ScalarValue::Int8(Some(v)) => sbbf.check(v), + ScalarValue::UInt64(Some(v)) => sbbf.check(v), + ScalarValue::UInt32(Some(v)) => sbbf.check(v), + ScalarValue::UInt16(Some(v)) => sbbf.check(v), + ScalarValue::UInt8(Some(v)) => sbbf.check(v), ScalarValue::Decimal128(Some(v), p, s) => match parquet_type { Type::INT32 => { //https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/Encodings.md?plain=1#L35-L42 From eea33a292e7ae53756726a0985bbce9bfb48c7e8 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Sun, 31 Mar 2024 12:27:31 +0200 Subject: [PATCH 2/5] Add Scenario::UInt --- datafusion/core/tests/parquet/mod.rs | 43 ++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 368637d024e6..1da86a0363a5 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -22,7 +22,7 @@ use arrow::{ Array, ArrayRef, BinaryArray, Date32Array, Date64Array, FixedSizeBinaryArray, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, StringArray, TimestampMicrosecondArray, 
TimestampMillisecondArray, TimestampNanosecondArray, - TimestampSecondArray, + TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, @@ -65,6 +65,7 @@ enum Scenario { Dates, Int, Int32Range, + UInt, Float64, Decimal, DecimalBloomFilterInt32, @@ -387,7 +388,7 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch { .unwrap() } -/// Return record batch with i32 sequence /// +/// Return record batch with i8, i16, i32, and i64 sequences /// /// Columns are named /// "i8" -> Int8Array @@ -417,6 +418,36 @@ fn make_int_batches(start: i8, end: i8) -> RecordBatch { .unwrap() } +/// Return record batch with u8, u16, u32, and u64 sequences +/// +/// Columns are named +/// "u8" -> UInt8Array +/// "u16" -> UInt16Array +/// "u32" -> UInt32Array +/// "u64" -> UInt64Array +fn make_uint_batches(start: u8, end: u8) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("u8", DataType::UInt8, true), + Field::new("u16", DataType::UInt16, true), + Field::new("u32", DataType::UInt32, true), + Field::new("u64", DataType::UInt64, true), + ])); + let v8: Vec = (start..end).collect(); + let v16: Vec = (start as _..end as _).collect(); + let v32: Vec = (start as _..end as _).collect(); + let v64: Vec = (start as _..end as _).collect(); + RecordBatch::try_new( + schema, + vec![ + Arc::new(UInt8Array::from(v8)) as ArrayRef, + Arc::new(UInt16Array::from(v16)) as ArrayRef, + Arc::new(UInt32Array::from(v32)) as ArrayRef, + Arc::new(UInt64Array::from(v64)) as ArrayRef, + ], + ) + .unwrap() +} + fn make_int32_range(start: i32, end: i32) -> RecordBatch { let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)])); let v = vec![start, end]; @@ -620,6 +651,14 @@ fn create_data_batch(scenario: Scenario) -> Vec { Scenario::Int32Range => { vec![make_int32_range(0, 10), make_int32_range(200000, 300000)] } + Scenario::UInt => { + vec![ + make_uint_batches(0, 5), 
make_uint_batches(1, 6), + make_uint_batches(5, 10), + make_uint_batches(250, 255), + ] + } Scenario::Float64 => { vec![ make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]), From 709c6c3edee837c545a8aa9a1661ecce29ba70de Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Sun, 31 Mar 2024 12:40:31 +0200 Subject: [PATCH 3/5] Add tests for Bloom filters on unsigned integer columns in Parquet tables --- .../core/tests/parquet/row_group_pruning.rs | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index b70102f78a96..1f212a212b69 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -452,6 +452,144 @@ int_tests!(16, correct_bloom_filters: false); int_tests!(32, correct_bloom_filters: true); int_tests!(64, correct_bloom_filters: true); +// $bits: number of bits of the integer to test (8, 16, 32, 64) +// $correct_bloom_filters: if false, replicates the +// https://github.com/apache/arrow-datafusion/issues/9779 bug so that tests pass +// if and only if Bloom filters on UInt8 and UInt16 columns are still buggy. +macro_rules! uint_tests { + ($bits:expr, correct_bloom_filters: $correct_bloom_filters:expr) => { + paste::item! 
{ + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} < 6", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(3)) + .with_pruned_by_stats(Some(1)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(11) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} = 6", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(3)) + .with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 })) + .with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 })) + .with_expected_rows(if $correct_bloom_filters { 1 } else { 0 }) + .test_row_group_prune() + .await; + } + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} = 6", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(3)) + .with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 })) + .with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 })) + .with_expected_rows(if $correct_bloom_filters { 1 } else { 0 }) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where power(u{}, 2) = 25", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(2) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + 
.with_query(&format!("SELECT * FROM t where u{}+1 = 6", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(2) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + // result of sql "SELECT * FROM t where u{} in (6)" + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} in (6)", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(3)) + .with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 })) + .with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 })) + .with_expected_rows(if $correct_bloom_filters { 1 } else { 0 }) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + // result of sql "SELECT * FROM t where u{} in (100)", prune all + // test whether statistics work + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} in (100)", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(4)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(0) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + // result of sql "SELECT * FROM t where u{} not in (6)" prune nothing + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} not in (6)", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(4)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(4)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(19) + .test_row_group_prune() + .await; + } + } + }; +} + +uint_tests!(8, correct_bloom_filters: false); +uint_tests!(16, correct_bloom_filters: 
false); +uint_tests!(32, correct_bloom_filters: true); +uint_tests!(64, correct_bloom_filters: true); + #[tokio::test] async fn prune_int32_eq_large_in_list() { // result of sql "SELECT * FROM t where i in (2050...2582)", prune all From 96ff33407fad6e0677c84b093bb766e8aaf2de20 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Sun, 31 Mar 2024 12:50:03 +0200 Subject: [PATCH 4/5] Fix _scalar_fun_and_eq to actually call a function --- datafusion/core/tests/parquet/row_group_pruning.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index 1f212a212b69..81a5e6c1276a 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -339,7 +339,7 @@ macro_rules! int_tests { async fn []() { RowGroupPruningTest::new() .with_scenario(Scenario::Int) - .with_query(&format!("SELECT * FROM t where i{} = 1", $bits)) + .with_query(&format!("SELECT * FROM t where abs(i{}) = 1 and i{} = 1", $bits, $bits)) .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(3)) @@ -492,7 +492,7 @@ macro_rules! 
uint_tests { async fn []() { RowGroupPruningTest::new() .with_scenario(Scenario::UInt) - .with_query(&format!("SELECT * FROM t where u{} = 6", $bits)) + .with_query(&format!("SELECT * FROM t where power(u{}, 2) = 36 and u{} = 6", $bits, $bits)) .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(3)) From c1f512c948732e50ccefe73edf150fac29e4a0d1 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Sun, 31 Mar 2024 13:00:49 +0200 Subject: [PATCH 5/5] Add prune_uint32_eq_large_in_list --- datafusion/core/tests/parquet/mod.rs | 11 ++++++++++ .../core/tests/parquet/row_group_pruning.rs | 22 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 1da86a0363a5..b4415d638ada 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -66,6 +66,7 @@ enum Scenario { Int, Int32Range, UInt, + UInt32Range, Float64, Decimal, DecimalBloomFilterInt32, @@ -455,6 +456,13 @@ fn make_int32_range(start: i32, end: i32) -> RecordBatch { RecordBatch::try_new(schema, vec![array.clone()]).unwrap() } +fn make_uint32_range(start: u32, end: u32) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("u", DataType::UInt32, true)])); + let v = vec![start, end]; + let array = Arc::new(UInt32Array::from(v)) as ArrayRef; + RecordBatch::try_new(schema, vec![array.clone()]).unwrap() +} + /// Return record batch with f64 vector /// /// Columns are named @@ -659,6 +667,9 @@ fn create_data_batch(scenario: Scenario) -> Vec { make_uint_batches(250, 255), ] } + Scenario::UInt32Range => { + vec![make_uint32_range(0, 10), make_uint32_range(200000, 300000)] + } Scenario::Float64 => { vec![ make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]), diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index 81a5e6c1276a..b7b434d1c3d3 100644 --- 
a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -612,6 +612,28 @@ async fn prune_int32_eq_large_in_list() { .await; } +#[tokio::test] +async fn prune_uint32_eq_large_in_list() { + // result of sql "SELECT * FROM t where u in (200050...200081)", prune all + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt32Range) + .with_query( + format!( + "SELECT * FROM t where u in ({})", + (200050..200082).join(",") + ) + .as_str(), + ) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(1)) + .with_expected_rows(0) + .test_row_group_prune() + .await; +} + #[tokio::test] async fn prune_f64_lt() { RowGroupPruningTest::new()