Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multicolumn parquet predicate pushdown (#4046) #4048

Merged
merged 2 commits into from
Nov 1, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 48 additions & 7 deletions datafusion/core/src/physical_plan/file_format/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,7 @@ impl DatafusionArrowPredicate {
// on the order they appear in the file
let projection = match candidate.projection.len() {
0 | 1 => vec![],
len => {
let mut projection: Vec<_> = (0..len).collect();
projection.sort_unstable_by_key(|x| candidate.projection[*x]);
projection
}
_ => remap_projection(&candidate.projection),
};

Ok(Self {
Expand Down Expand Up @@ -278,6 +274,32 @@ impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
}
}

/// Computes the projection required to go from the file's schema order to the projected
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for these comments

/// order expected by this filter
///
/// Effectively this computes the rank of each element in `src`
fn remap_projection(src: &[usize]) -> Vec<usize> {
    // Step 1: list the positions of `src`, ordered by the column index stored
    // at each position. Walking this list visits the projected columns in the
    // order they appear in the file.
    //
    // e.g. src: [5, 9, 0] -> file_order_positions: [2, 0, 1]
    let mut file_order_positions: Vec<usize> = (0..src.len()).collect();
    file_order_positions.sort_unstable_by_key(|&pos| src[pos]);

    // Step 2: invert that permutation. The entry at slot `rank` names the
    // position in `src` whose column is the `rank`-th one in file order, so
    // writing `rank` back at that position yields, for every projected column,
    // where to find it among the file-ordered columns.
    //
    // e.g. file_order_positions: [2, 0, 1] -> [1, 2, 0]
    let mut ranks = vec![0usize; src.len()];
    for (rank, &pos) in file_order_positions.iter().enumerate() {
        ranks[pos] = rank;
    }
    ranks
}

/// Calculate the total compressed size of all `Column`s required for
/// predicate `Expr`. This should represent the total amount of file IO
/// required to evaluate the predicate.
Expand Down Expand Up @@ -382,12 +404,13 @@ pub fn build_row_filter(

#[cfg(test)]
mod test {
use super::*;
use crate::physical_plan::file_format::row_filter::FilterCandidateBuilder;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::ScalarValue;
use arrow::datatypes::Field;
use datafusion_expr::{cast, col, lit};
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};
use rand::prelude::*;

// Assume a column expression for a column not in the table schema is a projected column and ignore it
#[test]
Expand Down Expand Up @@ -471,4 +494,22 @@ mod test {

assert_eq!(candidate.unwrap().expr, expected_candidate_expr);
}

#[test]
fn test_remap_projection() {
    let mut rng = thread_rng();
    for _ in 0..100 {
        // Draw 100 random column indexes; they arrive in no particular order
        let projection: Vec<usize> = (0..100).map(|_| rng.gen()).collect();

        // Sorting a copy gives the order in which those columns sit in the file
        let file_order = {
            let mut sorted = projection.clone();
            sorted.sort_unstable();
            sorted
        };

        // Reading the file-ordered columns through the remap must reproduce
        // the projection exactly as it was requested
        let round_trip: Vec<usize> = remap_projection(&projection)
            .into_iter()
            .map(|idx| file_order[idx])
            .collect();
        assert_eq!(projection, round_trip);
    }
}
}