diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs
index f58c1da852..6ed297a027 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -309,8 +309,12 @@ impl<'a> Reader<'a> {
         );
 
         let mut streams = Vec::with_capacity(target_row_group_chunks.len());
-        let metrics_collector = ObjectStoreMetricsObserver {
-            table_level_sst_metrics: self.table_level_sst_metrics.clone(),
+        let metrics_collector = {
+            let metrics_for_object_store = matches!(self.frequency, ReadFrequency::Frequent)
+                .then(|| self.table_level_sst_metrics.clone());
+            ObjectStoreMetricsObserver {
+                table_level_sst_metrics: metrics_for_object_store,
+            }
         };
         for chunk in target_row_group_chunks {
             let object_store_reader = ObjectStoreReader::with_metrics(
@@ -763,18 +767,20 @@ impl<'a> SstReader for ThreadedReader<'a> {
 }
 
 #[derive(Clone)]
 struct ObjectStoreMetricsObserver {
-    table_level_sst_metrics: Arc<MaybeTableLevelMetrics>,
+    table_level_sst_metrics: Option<Arc<MaybeTableLevelMetrics>>,
 }
 
 impl MetricsObserver for ObjectStoreMetricsObserver {
     fn elapsed(&self, path: &Path, elapsed: Duration) {
-        debug!("ObjectStoreReader dropped, path:{path}, elapsed:{elapsed:?}",);
+        debug!("ObjectStoreReader dropped, path:{path}, elapsed:{elapsed:?}");
     }
 
     fn num_bytes_fetched(&self, _: &Path, num_bytes: usize) {
-        self.table_level_sst_metrics
-            .num_fetched_sst_bytes
-            .fetch_add(num_bytes as u64, Ordering::Relaxed);
+        if let Some(metrics) = &self.table_level_sst_metrics {
+            metrics
+                .num_fetched_sst_bytes
+                .fetch_add(num_bytes as u64, Ordering::Relaxed);
+        }
     }
 }