@@ -23,7 +23,8 @@ use common_util::{
23
23
time:: InstantExt ,
24
24
} ;
25
25
use datafusion:: physical_plan:: {
26
- file_format:: ParquetFileMetrics , metrics:: ExecutionPlanMetricsSet ,
26
+ file_format:: { parquet:: page_filter:: PagePruningPredicate , ParquetFileMetrics } ,
27
+ metrics:: ExecutionPlanMetricsSet ,
27
28
} ;
28
29
use futures:: { future:: BoxFuture , FutureExt , Stream , StreamExt , TryFutureExt } ;
29
30
use log:: { debug, error, info, warn} ;
@@ -211,22 +212,24 @@ impl<'a> Reader<'a> {
211
212
) ;
212
213
213
214
let mut streams = Vec :: with_capacity ( target_row_group_chunks. len ( ) ) ;
214
- let exprs =
215
- datafusion:: optimizer:: utils:: conjunction ( self . predicate . exprs ( ) . to_vec ( ) ) . unwrap ( ) ;
215
+ let exprs = datafusion:: optimizer:: utils:: conjunction ( self . predicate . exprs ( ) . to_vec ( ) ) ;
216
216
let metrics_set = ExecutionPlanMetricsSet :: new ( ) ;
217
- let metrics = ParquetFileMetrics :: new ( 1 , "abc" , & metrics_set) ;
217
+ let metrics = ParquetFileMetrics :: new ( 1 , & self . path . to_string ( ) , & metrics_set) ;
218
218
for chunk in target_row_group_chunks {
219
219
let object_store_reader =
220
220
ObjectStoreReader :: new ( self . store . clone ( ) , self . path . clone ( ) , meta_data. clone ( ) ) ;
221
- let page_predicate =
222
- datafusion:: physical_plan:: file_format:: parquet:: page_filter:: PagePruningPredicate :: try_new ( & exprs, arrow_schema. clone ( ) ) . context ( DataFusionError ) ?;
223
- let row_selection = page_predicate
224
- . prune ( & chunk, parquet_metadata, & metrics)
225
- . context ( DataFusionError ) ?;
221
+ let row_selection = if let Some ( exprs) = & exprs {
222
+ let page_predicate = PagePruningPredicate :: try_new ( & exprs, arrow_schema. clone ( ) )
223
+ . context ( DataFusionError ) ?;
224
+ page_predicate
225
+ . prune ( & chunk, parquet_metadata, & metrics)
226
+ . context ( DataFusionError ) ?
227
+ } else {
228
+ None
229
+ } ;
226
230
let mut builder = ParquetRecordBatchStreamBuilder :: new ( object_store_reader)
227
231
. await
228
232
. with_context ( || ParquetError ) ?;
229
-
230
233
if let Some ( selection) = row_selection {
231
234
builder = builder. with_row_selection ( selection) ;
232
235
} ;
0 commit comments