@@ -30,14 +30,17 @@ use datafusion::{
         metrics::ExecutionPlanMetricsSet,
     },
 };
-use futures::{Stream, StreamExt};
+use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt};
 use log::{debug, error};
 use object_store::{ObjectStoreRef, Path};
 use parquet::{
-    arrow::{arrow_reader::RowSelection, ParquetRecordBatchStreamBuilder, ProjectionMask},
+    arrow::{
+        arrow_reader::RowSelection, async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder,
+        ProjectionMask,
+    },
     file::metadata::RowGroupMetaData,
 };
-use parquet_ext::{meta_data::ChunkReader, reader::ObjectStoreReader};
+use parquet_ext::meta_data::ChunkReader;
 use snafu::ResultExt;
 use table_engine::predicate::PredicateRef;
 use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -278,23 +281,13 @@ impl<'a> Reader<'a> {
 
         let mut streams = Vec::with_capacity(target_row_group_chunks.len());
         for chunk in target_row_group_chunks {
-            let object_store_reader = ObjectStoreReader::new(
-                self.store.clone(),
-                self.path.clone(),
-                parquet_metadata.clone(),
-            );
+            let object_store_reader =
+                ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone());
             let mut builder = ParquetRecordBatchStreamBuilder::new(object_store_reader)
                 .await
                 .with_context(|| ParquetError)?;
-
             let row_selection =
                 self.build_row_selection(arrow_schema.clone(), &chunk, parquet_metadata)?;
-
-            debug!(
-                "Build row selection for file path:{}, result:{row_selection:?}, page indexes:{}",
-                self.path,
-                parquet_metadata.page_indexes().is_some()
-            );
             if let Some(selection) = row_selection {
                 builder = builder.with_row_selection(selection);
             };
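
Context for this hunk: the loop opens one async Parquet stream per row-group chunk and prunes rows inside each chunk with a `RowSelection`. Below is a minimal standalone sketch of that builder pattern, not part of the PR; it substitutes `tokio::fs::File` (which also implements parquet's `AsyncFileReader`) for `ObjectStoreReader`, and the skip/select counts are made up:

```rust
use futures::StreamExt;
use parquet::arrow::{
    arrow_reader::{RowSelection, RowSelector},
    ParquetRecordBatchStreamBuilder,
};

async fn read_chunk(path: &str, row_groups: Vec<usize>) -> parquet::errors::Result<()> {
    let file = tokio::fs::File::open(path).await?;
    // A RowSelection is a run-length list of skip/select decisions, typically
    // derived from page-index pruning; the counts here are arbitrary.
    let selection = RowSelection::from(vec![RowSelector::skip(100), RowSelector::select(50)]);
    let mut stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_row_groups(row_groups) // restrict the stream to this chunk's row groups
        .with_row_selection(selection) // then prune rows inside those groups
        .build()?;
    while let Some(batch) = stream.next().await {
        println!("got a batch with {} rows", batch?.num_rows());
    }
    Ok(())
}
```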
@@ -347,32 +340,18 @@ impl<'a> Reader<'a> {
         Ok(file_size)
     }
 
-    async fn load_meta_data_from_storage(&self, ignore_sst_filter: bool) -> Result<MetaData> {
+    async fn load_meta_data_from_storage(&self) -> Result<parquet_ext::ParquetMetaData> {
         let file_size = self.load_file_size().await?;
         let chunk_reader_adapter = ChunkReaderAdapter::new(self.path, self.store);
 
-        let (parquet_meta_data, _) =
+        let (meta_data, _) =
             parquet_ext::meta_data::fetch_parquet_metadata(file_size, &chunk_reader_adapter)
                 .await
                 .with_context(|| FetchAndDecodeSstMeta {
                     file_path: self.path.to_string(),
                 })?;
 
-        let object_store_reader = parquet_ext::reader::ObjectStoreReader::new(
-            self.store.clone(),
-            self.path.clone(),
-            Arc::new(parquet_meta_data),
-        );
-
-        let parquet_meta_data = parquet_ext::meta_data::meta_with_page_indexes(object_store_reader)
-            .await
-            .with_context(|| DecodePageIndexes {
-                file_path: self.path.to_string(),
-            })?;
-
-        MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
-            .box_err()
-            .context(DecodeSstMeta)
+        Ok(meta_data)
     }
 
     fn need_update_cache(&self) -> bool {
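
After this hunk, `load_meta_data_from_storage` only fetches the raw Parquet footer metadata; decoding it into the SST-level `MetaData` moves to the caller (next hunk). For orientation, here is a rough sketch of what any footer fetch like `fetch_parquet_metadata` has to do, written against the parquet crate's public helpers over an in-memory buffer (assumptions: the whole file is in `file`, no encryption; the real code issues ranged reads through `ChunkReaderAdapter` instead):

```rust
use parquet::file::{
    footer::{decode_footer, decode_metadata},
    metadata::ParquetMetaData,
};

fn read_footer(file: &[u8]) -> parquet::errors::Result<ParquetMetaData> {
    // A Parquet file ends with: <metadata> <4-byte LE metadata length> <b"PAR1">.
    let footer: [u8; 8] = file[file.len() - 8..].try_into().unwrap();
    let metadata_len = decode_footer(&footer)?;
    let metadata_start = file.len() - 8 - metadata_len;
    decode_metadata(&file[metadata_start..file.len() - 8])
}
```

Over object storage this becomes two ranged reads: one for the 8-byte tail, one for the metadata span it describes.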
@@ -396,8 +375,12 @@ impl<'a> Reader<'a> {
         let empty_predicate = self.predicate.exprs().is_empty();
 
         let meta_data = {
+            let parquet_meta_data = self.load_meta_data_from_storage().await?;
+
             let ignore_sst_filter = avoid_update_cache && empty_predicate;
-            self.load_meta_data_from_storage(ignore_sst_filter).await?
+            MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
+                .box_err()
+                .context(DecodeSstMeta)?
         };
 
         if avoid_update_cache || self.meta_cache.is_none() {
@@ -430,6 +413,71 @@ impl<'a> Drop for Reader<'a> {
     }
 }
 
+#[derive(Clone)]
+struct ObjectStoreReader {
+    storage: ObjectStoreRef,
+    path: Path,
+    meta_data: MetaData,
+    begin: Instant,
+}
+
+impl ObjectStoreReader {
+    fn new(storage: ObjectStoreRef, path: Path, meta_data: MetaData) -> Self {
+        Self {
+            storage,
+            path,
+            meta_data,
+            begin: Instant::now(),
+        }
+    }
+}
+
+impl Drop for ObjectStoreReader {
+    fn drop(&mut self) {
+        debug!(
+            "ObjectStoreReader dropped, path:{}, elapsed:{:?}",
+            &self.path,
+            self.begin.elapsed()
+        );
+    }
+}
+
+impl AsyncFileReader for ObjectStoreReader {
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        self.storage
+            .get_range(&self.path, range)
+            .map_err(|e| {
+                parquet::errors::ParquetError::General(format!(
+                    "Failed to fetch range from object store, err:{e}"
+                ))
+            })
+            .boxed()
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        async move {
+            self.storage
+                .get_ranges(&self.path, &ranges)
+                .map_err(|e| {
+                    parquet::errors::ParquetError::General(format!(
+                        "Failed to fetch ranges from object store, err:{e}"
+                    ))
+                })
+                .await
+        }
+        .boxed()
+    }
+
+    fn get_metadata(
+        &mut self,
+    ) -> BoxFuture<'_, parquet::errors::Result<Arc<parquet::file::metadata::ParquetMetaData>>> {
+        Box::pin(async move { Ok(self.meta_data.parquet().clone()) })
+    }
+}
+
 pub struct ChunkReaderAdapter<'a> {
     path: &'a Path,
     store: &'a ObjectStoreRef,
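
A note on the new `ObjectStoreReader`: implementing `AsyncFileReader` is what lets `ParquetRecordBatchStreamBuilder` read straight from object storage, and answering `get_metadata` from the already-fetched `MetaData` spares the builder a second footer round-trip. A minimal sketch of the same contract over an in-memory buffer (assuming a parquet version with `usize`-based ranges, matching this diff; `get_byte_ranges` is left to its default implementation, which loops over `get_bytes`):

```rust
use std::{ops::Range, sync::Arc};

use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData};

struct InMemoryReader {
    data: Bytes,
    metadata: Arc<ParquetMetaData>,
}

impl AsyncFileReader for InMemoryReader {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        // All bytes are already local, so a range read is just a slice.
        async move { Ok(self.data.slice(range)) }.boxed()
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        // Hand back cached metadata so the reader never re-parses the footer.
        let metadata = self.metadata.clone();
        async move { Ok(metadata) }.boxed()
    }
}
```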