Skip to content

Commit aa00ef2

Browse files
jiacai2050ShiKaiWi
andauthored
feat: add record batch mem stats (#1058)
## Rationale When some metrics contains more than one subtask, it's helpful to collect them together for output. ## Detailed Changes - Update metric macro, add metric operator. ## Test Plan Update `basic` UT to test this. This is what I get in my test env: ``` # Before scan_sst_6655: project_record_batch=847.03µs project_record_batch=1.182922ms project_record_batch=1.208636ms project_record_batch=1.246589ms project_record_batch=1.296161ms project_record_batch=1.323357ms project_record_batch=1.4293ms # After scan_sst_6655: project_record_batch=10.170503ms ``` --------- Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>
1 parent 6478a83 commit aa00ef2

File tree

6 files changed

+231
-60
lines changed

6 files changed

+231
-60
lines changed

analytic_engine/src/instance/read.rs

+2
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ impl Instance {
104104
request.metrics_collector.collect(Metric::boolean(
105105
MERGE_SORT_METRIC_NAME.to_string(),
106106
need_merge_sort,
107+
None,
107108
));
108109

109110
if need_merge_sort {
@@ -216,6 +217,7 @@ impl Instance {
216217
request.metrics_collector.collect(Metric::number(
217218
ITER_NUM_METRIC_NAME.to_string(),
218219
iters.len(),
220+
None,
219221
));
220222

221223
Ok(iters)

analytic_engine/src/sst/parquet/async_reader.rs

+29-19
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,10 @@ impl<'a> Reader<'a> {
153153
.into_iter()
154154
.map(|stream| {
155155
Box::new(RecordBatchProjector::new(
156-
self.path.to_string(),
157156
stream,
158157
row_projector.clone(),
159158
sst_meta_data.clone(),
159+
self.metrics.metrics_collector.clone(),
160160
)) as _
161161
})
162162
.collect();
@@ -448,45 +448,49 @@ impl<'a> ChunkReader for ChunkReaderAdapter<'a> {
448448
}
449449
}
450450

451+
#[derive(Default, Debug, Clone, TraceMetricWhenDrop)]
452+
pub(crate) struct ProjectorMetrics {
453+
#[metric(number, sum)]
454+
pub row_num: usize,
455+
#[metric(number, sum)]
456+
pub row_mem: usize,
457+
#[metric(duration, sum)]
458+
pub project_record_batch: Duration,
459+
#[metric(collector)]
460+
pub metrics_collector: Option<MetricsCollector>,
461+
}
462+
451463
struct RecordBatchProjector {
452-
path: String,
453464
stream: SendableRecordBatchStream,
454465
row_projector: ArrowRecordBatchProjector,
455466

456-
row_num: usize,
467+
metrics: ProjectorMetrics,
457468
start_time: Instant,
458469
sst_meta: ParquetMetaDataRef,
459470
}
460471

461472
impl RecordBatchProjector {
462473
fn new(
463-
path: String,
464474
stream: SendableRecordBatchStream,
465475
row_projector: ArrowRecordBatchProjector,
466476
sst_meta: ParquetMetaDataRef,
477+
metrics_collector: Option<MetricsCollector>,
467478
) -> Self {
479+
let metrics = ProjectorMetrics {
480+
metrics_collector,
481+
..Default::default()
482+
};
483+
468484
Self {
469-
path,
470485
stream,
471486
row_projector,
472-
row_num: 0,
487+
metrics,
473488
start_time: Instant::now(),
474489
sst_meta,
475490
}
476491
}
477492
}
478493

479-
impl Drop for RecordBatchProjector {
480-
fn drop(&mut self) {
481-
debug!(
482-
"RecordBatchProjector dropped, path:{} rows:{}, cost:{}ms.",
483-
self.path,
484-
self.row_num,
485-
self.start_time.saturating_elapsed().as_millis(),
486-
);
487-
}
488-
}
489-
490494
impl Stream for RecordBatchProjector {
491495
type Item = Result<RecordBatchWithKey>;
492496

@@ -505,7 +509,10 @@ impl Stream for RecordBatchProjector {
505509
.box_err()
506510
.context(DecodeRecordBatch)?;
507511

508-
projector.row_num += record_batch.num_rows();
512+
for col in record_batch.columns() {
513+
projector.metrics.row_mem += col.get_array_memory_size();
514+
}
515+
projector.metrics.row_num += record_batch.num_rows();
509516

510517
let projected_batch = projector
511518
.row_projector
@@ -518,7 +525,10 @@ impl Stream for RecordBatchProjector {
518525
}
519526
}
520527
Poll::Pending => Poll::Pending,
521-
Poll::Ready(None) => Poll::Ready(None),
528+
Poll::Ready(None) => {
529+
projector.metrics.project_record_batch += projector.start_time.saturating_elapsed();
530+
Poll::Ready(None)
531+
}
522532
}
523533
}
524534

components/trace_metric/src/collector.rs

+40-7
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
22

3-
use std::sync::{Arc, Mutex};
3+
use std::{
4+
collections::BTreeMap,
5+
sync::{Arc, Mutex},
6+
};
47

5-
use crate::metric::Metric;
8+
use crate::metric::{Metric, MetricAggregator};
69

710
/// A collector for metrics of a single read request.
811
///
@@ -46,8 +49,36 @@ impl MetricsCollector {
4649
/// Calls a closure on each top-level metrics of this collector.
4750
pub fn for_each_metric(&self, f: &mut impl FnMut(&Metric)) {
4851
let metrics = self.metrics.lock().unwrap();
52+
53+
let mut metrics_by_name = BTreeMap::new();
4954
for metric in metrics.iter() {
50-
f(metric);
55+
metrics_by_name
56+
.entry(metric.name())
57+
.or_insert_with(Vec::new)
58+
.push(metric);
59+
}
60+
61+
for metrics in metrics_by_name.values() {
62+
if metrics.is_empty() {
63+
continue;
64+
}
65+
66+
if let Some(op) = metrics[0].aggregator() {
67+
match op {
68+
MetricAggregator::Sum => {
69+
let mut first = metrics[0].clone();
70+
for m in &metrics[1..] {
71+
first.sum(m);
72+
}
73+
// only apply fn to first metric.
74+
f(&first);
75+
}
76+
}
77+
} else {
78+
for metric in metrics {
79+
f(metric);
80+
}
81+
}
5182
}
5283
}
5384

@@ -111,23 +142,25 @@ mod tests {
111142
#[test]
112143
fn test_metrics_collector() {
113144
let collector = MetricsCollector::new("root".to_string());
114-
collector.collect(Metric::number("counter".to_string(), 1));
145+
collector.collect(Metric::number("counter".to_string(), 1, None));
115146
collector.collect(Metric::duration(
116147
"elapsed".to_string(),
117148
Duration::from_millis(100),
149+
None,
118150
));
119151
let child_1_0 = collector.span("child_1_0".to_string());
120-
child_1_0.collect(Metric::boolean("boolean".to_string(), false));
152+
child_1_0.collect(Metric::boolean("boolean".to_string(), false, None));
121153

122154
let child_2_0 = child_1_0.span("child_2_0".to_string());
123-
child_2_0.collect(Metric::number("counter".to_string(), 1));
155+
child_2_0.collect(Metric::number("counter".to_string(), 1, None));
124156
child_2_0.collect(Metric::duration(
125157
"elapsed".to_string(),
126158
Duration::from_millis(100),
159+
None,
127160
));
128161

129162
let child_1_1 = collector.span("child_1_1".to_string());
130-
child_1_1.collect(Metric::boolean("boolean".to_string(), false));
163+
child_1_1.collect(Metric::boolean("boolean".to_string(), false, None));
131164
let _child_1_2 = collector.span("child_1_2".to_string());
132165

133166
let mut visitor = FormatCollectorVisitor::default();

components/trace_metric/src/metric.rs

+56-6
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,16 @@
22

33
use std::{fmt, time::Duration};
44

5+
#[derive(Clone)]
6+
pub enum MetricAggregator {
7+
Sum,
8+
}
9+
510
#[derive(Clone)]
611
pub struct MetricValue<T: Clone + fmt::Debug> {
712
pub name: String,
813
pub val: T,
14+
pub aggregator: Option<MetricAggregator>,
915
}
1016

1117
#[derive(Clone)]
@@ -17,18 +23,62 @@ pub enum Metric {
1723

1824
impl Metric {
1925
#[inline]
20-
pub fn number(name: String, val: usize) -> Self {
21-
Metric::Number(MetricValue { name, val })
26+
pub fn number(name: String, val: usize, aggregator: Option<MetricAggregator>) -> Self {
27+
Metric::Number(MetricValue {
28+
name,
29+
val,
30+
aggregator,
31+
})
2232
}
2333

2434
#[inline]
25-
pub fn duration(name: String, val: Duration) -> Self {
26-
Metric::Duration(MetricValue { name, val })
35+
pub fn duration(name: String, val: Duration, aggregator: Option<MetricAggregator>) -> Self {
36+
Metric::Duration(MetricValue {
37+
name,
38+
val,
39+
aggregator,
40+
})
2741
}
2842

2943
#[inline]
30-
pub fn boolean(name: String, val: bool) -> Self {
31-
Metric::Boolean(MetricValue { name, val })
44+
pub fn boolean(name: String, val: bool, aggregator: Option<MetricAggregator>) -> Self {
45+
Metric::Boolean(MetricValue {
46+
name,
47+
val,
48+
aggregator,
49+
})
50+
}
51+
52+
#[inline]
53+
pub fn name(&self) -> &str {
54+
match self {
55+
Self::Boolean(v) => &v.name,
56+
Self::Number(v) => &v.name,
57+
Self::Duration(v) => &v.name,
58+
}
59+
}
60+
61+
#[inline]
62+
pub fn aggregator(&self) -> &Option<MetricAggregator> {
63+
match self {
64+
Self::Boolean(v) => &v.aggregator,
65+
Self::Number(v) => &v.aggregator,
66+
Self::Duration(v) => &v.aggregator,
67+
}
68+
}
69+
70+
// Sum metric values together when metrics are same type,
71+
// Panic if their types are different.
72+
#[inline]
73+
pub fn sum(&mut self, rhs: &Self) {
74+
match (self, rhs) {
75+
(Self::Boolean(lhs), Self::Boolean(rhs)) => lhs.val |= rhs.val,
76+
(Self::Number(lhs), Self::Number(rhs)) => lhs.val += rhs.val,
77+
(Self::Duration(lhs), Self::Duration(rhs)) => lhs.val += rhs.val,
78+
(lhs, rhs) => {
79+
panic!("Only same type metric could be applied, lhs:{lhs:?}, rhs:{rhs:?}")
80+
}
81+
}
3282
}
3383
}
3484

0 commit comments

Comments
 (0)