Commit 8365129

collect metrics during merge
1 parent d2fdece commit 8365129

7 files changed (+105 -50 lines)

analytic_engine/src/instance/flush_compaction.rs (+1)

@@ -855,6 +855,7 @@ impl SpaceStore {
         let sequence = table_data.last_sequence();
         let mut builder = MergeBuilder::new(MergeConfig {
             request_id,
+            metrics_collector: None,
             // no need to set deadline for compaction
             deadline: None,
             space_id,

analytic_engine/src/instance/read.rs (+21 -8)

@@ -20,7 +20,7 @@ use table_engine::{
     stream::{
         self, ErrWithSource, PartitionedStreams, RecordBatchStream, SendableRecordBatchStream,
     },
-    table::ReadRequest,
+    table::{Metric, ReadRequest},
 };
 use tokio::sync::mpsc::{self, Receiver};
 
@@ -66,6 +66,8 @@ pub enum Error {
 define_result!(Error);
 
 const RECORD_BATCH_READ_BUF_SIZE: usize = 1000;
+const READ_METRIC_MERGE_SORT: &str = "do_merge_sort";
+const READ_METRIC_ITER_NUM: &str = "iter_num";
 
 /// Check whether it needs to apply merge sorting when reading the table with
 /// the `table_options` by the `read_request`.
@@ -91,13 +93,18 @@ impl Instance {
 
         let table_data = space_table.table_data();
 
-        // Collect metrics.
-        table_data.metrics.on_read_request_begin();
-
         let iter_options = self.iter_options.clone();
         let table_options = table_data.table_options();
 
-        if need_merge_sort_streams(&table_data.table_options(), &request) {
+        // Collect metrics.
+        table_data.metrics.on_read_request_begin();
+        let need_merge_sort = need_merge_sort_streams(&table_options, &request);
+        request.metrics_collector.collect(Metric::boolean(
+            READ_METRIC_MERGE_SORT.to_string(),
+            need_merge_sort,
+        ));
+
+        if need_merge_sort {
             let merge_iters = self
                 .build_merge_iters(table_data, &request, iter_options, &table_options)
                 .await?;
@@ -123,16 +130,16 @@ impl Instance {
         };
 
         // Split iterators into `read_parallelism` groups.
-        let mut splited_iters: Vec<_> = std::iter::repeat_with(Vec::new)
+        let mut splitted_iters: Vec<_> = std::iter::repeat_with(Vec::new)
             .take(read_parallelism)
             .collect();
 
         for (i, time_aligned_iter) in partitioned_iters.into_iter().enumerate() {
-            splited_iters[i % read_parallelism].push(time_aligned_iter);
+            splitted_iters[i % read_parallelism].push(time_aligned_iter);
         }
 
         let mut streams = Vec::with_capacity(read_parallelism);
-        for iters in splited_iters {
+        for iters in splitted_iters {
             let stream = iters_to_stream(iters, self.read_runtime(), &request.projected_schema);
             streams.push(stream);
         }
@@ -172,6 +179,7 @@ impl Instance {
         for read_view in read_views {
             let merge_config = MergeConfig {
                 request_id: request.request_id,
+                metrics_collector: Some(request.metrics_collector.clone()),
                 deadline: request.opts.deadline,
                 space_id: table_data.space_id,
                 table_id: table_data.id,
@@ -201,6 +209,11 @@ impl Instance {
             iters.push(dedup_iter);
         }
 
+        request.metrics_collector.collect(Metric::counter(
+            READ_METRIC_ITER_NUM.to_string(),
+            iters.len(),
+        ));
+
         Ok(iters)
     }

analytic_engine/src/row_iter/merge.rs (+49 -3)

@@ -22,7 +22,10 @@ use common_util::{define_result, error::GenericError};
 use futures::{future::try_join_all, StreamExt};
 use log::{debug, info, trace};
 use snafu::{ensure, Backtrace, ResultExt, Snafu};
-use table_engine::{predicate::PredicateRef, table::TableId};
+use table_engine::{
+    predicate::PredicateRef,
+    table::{Metric, ReadMetricsCollector, TableId},
+};
 
 use crate::{
     row_iter::{
@@ -83,6 +86,7 @@ define_result!(Error);
 #[derive(Debug)]
 pub struct MergeConfig<'a> {
     pub request_id: RequestId,
+    pub metrics_collector: Option<ReadMetricsCollector>,
     /// None for background jobs, such as: compaction
     pub deadline: Option<Instant>,
     pub space_id: SpaceId,
@@ -228,6 +232,7 @@ impl<'a> MergeBuilder<'a> {
             self.config.merge_iter_options,
             self.config.reverse,
             Metrics::new(self.memtables.len(), sst_streams_num, sst_ids),
+            self.config.metrics_collector,
         ))
     }
 }
@@ -556,14 +561,15 @@ impl Ord for HeapBufferedStream {
     }
 }
 
+/// Metrics for merge iterator.
 pub struct Metrics {
     num_memtables: usize,
     num_ssts: usize,
     sst_ids: Vec<FileId>,
-    /// Times to fetch rows from one stream.
-    times_fetch_rows_from_one: usize,
     /// Total rows collected using fetch_rows_from_one_stream().
     total_rows_fetch_from_one: usize,
+    /// Times to fetch rows from one stream.
+    times_fetch_rows_from_one: usize,
     /// Times to fetch one row from multiple stream.
     times_fetch_row_from_multiple: usize,
     /// Create time of the metrics.
@@ -591,6 +597,37 @@ impl Metrics {
             scan_count: 0,
         }
     }
+
+    fn collect(&self, collector: &ReadMetricsCollector) {
+        // TODO: maybe we can define a macro to generate the code.
+        collector.collect(Metric::counter(
+            "num_memtables".to_string(),
+            self.num_memtables,
+        ));
+
+        collector.collect(Metric::counter("num_ssts".to_string(), self.num_ssts));
+        collector.collect(Metric::counter(
+            "times_fetch_rows_from_one".to_string(),
+            self.times_fetch_rows_from_one,
+        ));
+        collector.collect(Metric::counter(
+            "times_fetch_row_from_multiple".to_string(),
+            self.times_fetch_row_from_multiple,
+        ));
+        collector.collect(Metric::counter(
+            "total_rows_fetch_from_one".to_string(),
+            self.total_rows_fetch_from_one,
+        ));
+        collector.collect(Metric::elapsed(
+            "init_duration".to_string(),
+            self.init_duration,
+        ));
+        collector.collect(Metric::elapsed(
+            "scan_duration".to_string(),
+            self.scan_duration,
+        ));
+        collector.collect(Metric::counter("scan_count".to_string(), self.scan_count));
+    }
 }
 
 impl fmt::Debug for Metrics {
@@ -630,6 +667,7 @@ pub struct MergeIterator {
     iter_options: IterOptions,
     reverse: bool,
     metrics: Metrics,
+    metrics_collector: Option<ReadMetricsCollector>,
 }
 
 impl MergeIterator {
@@ -643,6 +681,7 @@ impl MergeIterator {
         iter_options: IterOptions,
         reverse: bool,
         metrics: Metrics,
+        metrics_collector: Option<ReadMetricsCollector>,
     ) -> Self {
         let heap_cap = streams.len();
         let record_batch_builder =
@@ -660,6 +699,7 @@ impl MergeIterator {
             iter_options,
             reverse,
             metrics,
+            metrics_collector,
         }
     }
 
@@ -855,6 +895,10 @@ impl MergeIterator {
 
 impl Drop for MergeIterator {
     fn drop(&mut self) {
+        if let Some(collector) = &self.metrics_collector {
+            self.metrics.collect(collector);
+        }
+
         info!(
             "Merge iterator dropped, table_id:{:?}, request_id:{}, metrics:{:?}, iter_options:{:?},",
             self.table_id, self.request_id, self.metrics, self.iter_options,
@@ -925,6 +969,7 @@ mod tests {
             IterOptions::default(),
             false,
             Metrics::new(1, 1, vec![]),
+            None,
         );
 
         check_iterator(
@@ -978,6 +1023,7 @@ mod tests {
            IterOptions::default(),
            true,
            Metrics::new(1, 1, vec![]),
+            None,
        );
 
        check_iterator(
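
The TODO inside `Metrics::collect` suggests generating the repetitive `collector.collect(...)` calls with a macro. A hypothetical sketch of such a macro for the `usize` counter fields follows; the name `collect_counters!` and its exact shape are illustrative, not part of this commit.

// Expands each listed field into a `collector.collect(Metric::counter(..))` call,
// reusing the field name as the metric name.
macro_rules! collect_counters {
    ($collector:expr, $metrics:expr, [ $($field:ident),* $(,)? ]) => {
        $(
            $collector.collect(Metric::counter(
                stringify!($field).to_string(),
                $metrics.$field,
            ));
        )*
    };
}

// Possible usage inside `Metrics::collect`, covering only the counter fields:
// collect_counters!(collector, self, [num_memtables, num_ssts, scan_count]);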

benchmarks/src/merge_memtable_bench.rs (+1)

@@ -142,6 +142,7 @@ impl MergeMemTableBench {
         let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone());
         let mut builder = MergeBuilder::new(MergeConfig {
             request_id,
+            metrics_collector: None,
             deadline: None,
             space_id,
             table_id,

benchmarks/src/merge_sst_bench.rs (+1)

@@ -125,6 +125,7 @@ impl MergeSstBench {
         let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone());
         let mut builder = MergeBuilder::new(MergeConfig {
             request_id,
+            metrics_collector: None,
             deadline: None,
             space_id,
             table_id,

benchmarks/src/sst_tools.rs (+1)

@@ -220,6 +220,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc<Runtime>) {
 
     let mut builder = MergeBuilder::new(MergeConfig {
         request_id,
+        metrics_collector: None,
         deadline: None,
         space_id,
         table_id,

table_engine/src/table.rs (+31 -39)

@@ -382,71 +382,63 @@ impl ReadOrder {
 pub struct MetricValue<T: Clone + fmt::Debug> {
     pub name: String,
     pub val: T,
-    pub partition: Option<usize>,
 }
 
 #[derive(Clone, Debug)]
 pub enum Metric {
+    Boolean(MetricValue<bool>),
     Counter(MetricValue<usize>),
     Elapsed(MetricValue<Duration>),
 }
 
 impl Metric {
     #[inline]
-    pub fn counter(name: String, val: usize, partition: Option<usize>) -> Self {
-        Metric::Counter(MetricValue {
-            name,
-            val,
-            partition,
-        })
+    pub fn counter(name: String, val: usize) -> Self {
+        Metric::Counter(MetricValue { name, val })
     }
 
     #[inline]
-    pub fn elapsed(name: String, val: Duration, partition: Option<usize>) -> Self {
-        Metric::Elapsed(MetricValue {
-            name,
-            val,
-            partition,
-        })
+    pub fn elapsed(name: String, val: Duration) -> Self {
+        Metric::Elapsed(MetricValue { name, val })
+    }
+
+    #[inline]
+    pub fn boolean(name: String, val: bool) -> Self {
+        Metric::Boolean(MetricValue { name, val })
     }
 }
 
 impl From<Metric> for DfMetric {
     fn from(metric: Metric) -> Self {
-        let (df_metric_val, partition) = match metric {
-            Metric::Counter(MetricValue {
-                name,
-                val,
-                partition,
-            }) => {
+        let df_metric_val = match metric {
+            Metric::Counter(MetricValue { name, val }) => {
                 let count = Count::new();
                 count.add(val);
-                (
-                    DfMetricValue::Count {
-                        name: name.into(),
-                        count,
-                    },
-                    partition,
-                )
+                DfMetricValue::Count {
+                    name: name.into(),
+                    count,
+                }
             }
-            Metric::Elapsed(MetricValue {
-                name,
-                val,
-                partition,
-            }) => {
+            Metric::Elapsed(MetricValue { name, val }) => {
                 let time = Time::new();
                 time.add_duration(val);
-                (
-                    DfMetricValue::Time {
-                        name: name.into(),
-                        time,
-                    },
-                    partition,
-                )
+                DfMetricValue::Time {
+                    name: name.into(),
+                    time,
+                }
+            }
+            Metric::Boolean(MetricValue { name, val }) => {
+                let count = Count::new();
+                // Use 0 for false, 1 for true.
+                count.add(val as usize);
+                DfMetricValue::Count {
+                    name: name.into(),
+                    count,
+                }
             }
         };
 
-        DfMetric::new(df_metric_val, partition)
+        DfMetric::new(df_metric_val, None)
     }
 }
