Commit aac76c6

feat: ignore row group filter when certain column type (#958)
## Rationale

Part of #955

## Detailed Changes

- Ignore the row group filter for columns of type null, double, float, varbinary, and boolean
- Fix metadata parsing in `meta_from_sst`

## Test Plan

- Before: max_seq:132734153, size:348.675M, metadata:40.142M, kv:38.175M, filter:28.525M, row_num:7038949
- After: max_seq:132734153, size:334.045M, metadata:25.512M, kv:23.545M, filter:17.562M, row_num:7038949
1 parent 20e3a05 commit aac76c6
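
The heart of the change is a per-column gate in `RowGroupFilterBuilder`: primary-key columns and columns whose type offers little value for exact-match pruning (null, double, float, varbinary, boolean) no longer get an xor filter built for them. A minimal, self-contained sketch of that gate is below; the trimmed `DatumKind` enum and the `needs_filter` helper are local stand-ins for illustration, not the real items from `common_types`:

```rust
// Sketch of the column gate introduced by this commit. `DatumKind` here is a
// trimmed stand-in for common_types::datum::DatumKind, and `needs_filter` is
// a hypothetical helper mirroring the checks inside RowGroupFilterBuilder::new.
#[derive(Clone, Copy, Debug)]
enum DatumKind {
    Null,
    Boolean,
    Float,
    Double,
    Varbinary,
    String,
    Timestamp,
}

/// Decide whether a column deserves an xor filter: primary-key columns are
/// skipped, and so are types for which exact-match probes rarely help
/// (floats, booleans, raw bytes, nulls).
fn needs_filter(is_primary_key: bool, kind: DatumKind) -> bool {
    if is_primary_key {
        return false;
    }
    !matches!(
        kind,
        DatumKind::Null
            | DatumKind::Double
            | DatumKind::Float
            | DatumKind::Varbinary
            | DatumKind::Boolean
    )
}

fn main() {
    // String fields keep their filters...
    assert!(needs_filter(false, DatumKind::String));
    // ...while float fields and key columns skip them, shrinking the metadata.
    assert!(!needs_filter(false, DatumKind::Double));
    assert!(!needs_filter(true, DatumKind::Timestamp));
}
```

Skipping those columns is what the test plan's numbers reflect: same row count, but the filter section shrinks from 28.525M to 17.562M and the overall metadata from 40.142M to 25.512M.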

File tree: 4 files changed (+59, -12)

analytic_engine/src/sst/parquet/meta_data.rs (+46, -9)

```diff
@@ -6,7 +6,12 @@ use std::{fmt, ops::Index, sync::Arc};
 
 use bytes::Bytes;
 use ceresdbproto::{schema as schema_pb, sst as sst_pb};
-use common_types::{schema::Schema, time::TimeRange, SequenceNumber};
+use common_types::{
+    datum::DatumKind,
+    schema::{RecordSchemaWithKey, Schema},
+    time::TimeRange,
+    SequenceNumber,
+};
 use common_util::define_result;
 use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
 use xorfilter::{Xor8, Xor8Builder};
@@ -120,14 +125,38 @@ pub struct RowGroupFilterBuilder {
 }
 
 impl RowGroupFilterBuilder {
-    pub(crate) fn with_num_columns(num_col: usize) -> Self {
-        Self {
-            builders: vec![None; num_col],
-        }
+    pub(crate) fn new(record_schema: &RecordSchemaWithKey) -> Self {
+        let builders = record_schema
+            .columns()
+            .iter()
+            .enumerate()
+            .map(|(i, col)| {
+                if record_schema.is_primary_key_index(i) {
+                    return None;
+                }
+
+                if matches!(
+                    col.data_type,
+                    DatumKind::Null
+                        | DatumKind::Double
+                        | DatumKind::Float
+                        | DatumKind::Varbinary
+                        | DatumKind::Boolean
+                ) {
+                    return None;
+                }
+
+                Some(Xor8Builder::default())
+            })
+            .collect();
+
+        Self { builders }
     }
 
     pub(crate) fn add_key(&mut self, col_idx: usize, key: &[u8]) {
-        self.builders[col_idx].get_or_insert_default().insert(key)
+        if let Some(b) = self.builders[col_idx].as_mut() {
+            b.insert(key)
+        }
     }
 
     pub(crate) fn build(self) -> Result<RowGroupFilter> {
@@ -403,6 +432,8 @@ impl TryFrom<sst_pb::ParquetMetaData> for ParquetMetaData {
 
 #[cfg(test)]
 mod tests {
+    use common_types::tests::build_schema;
+
     use super::*;
 
     #[test]
@@ -447,16 +478,22 @@ mod tests {
 
     #[test]
     fn test_row_group_filter_builder() {
-        let mut builders = RowGroupFilterBuilder::with_num_columns(1);
+        // (key1(varbinary), key2(timestamp), field1(double), field2(string))
+        let schema = build_schema();
+        let record_schema = schema.to_record_schema_with_key();
+        let mut builders = RowGroupFilterBuilder::new(&record_schema);
         for key in ["host-123", "host-456", "host-789"] {
-            builders.add_key(0, key.as_bytes());
+            builders.add_key(3, key.as_bytes());
         }
         let row_group_filter = builders.build().unwrap();
+        for i in 0..3 {
+            assert!(row_group_filter.column_filters[i].is_none());
+        }
 
        let testcase = [("host-123", true), ("host-321", false)];
         for (key, expected) in testcase {
             let actual = row_group_filter
-                .contains_column_data(0, key.as_bytes())
+                .contains_column_data(3, key.as_bytes())
                 .unwrap();
 
             assert_eq!(expected, actual);
```

analytic_engine/src/sst/parquet/writer.rs (+1, -1)

```diff
@@ -147,7 +147,7 @@ impl RecordBatchGroupWriter {
         &self,
         row_group_batch: &[RecordBatchWithKey],
     ) -> Result<RowGroupFilter> {
-        let mut builder = RowGroupFilterBuilder::with_num_columns(row_group_batch[0].num_columns());
+        let mut builder = RowGroupFilterBuilder::new(row_group_batch[0].schema_with_key());
 
         for partial_batch in row_group_batch {
             for (col_idx, column) in partial_batch.columns().iter().enumerate() {
```

common_types/src/schema.rs (+4, -0)

```diff
@@ -514,6 +514,10 @@ impl RecordSchemaWithKey {
         &self.primary_key_indexes
     }
 
+    pub fn is_primary_key_index(&self, idx: usize) -> bool {
+        self.primary_key_indexes.contains(&idx)
+    }
+
     pub fn index_of(&self, name: &str) -> Option<usize> {
         self.record_schema.index_of(name)
     }
```

tools/src/sst_util.rs (+8, -2)

```diff
@@ -9,8 +9,14 @@ pub async fn meta_from_sst(store: &ObjectStoreRef, sst_path: &Path) -> MetaData
     let get_result = store.get(sst_path).await.unwrap();
     let chunk_reader = get_result.bytes().await.unwrap();
     let metadata = footer::parse_metadata(&chunk_reader).unwrap();
-    let kv_metas = metadata.file_metadata().key_value_metadata().unwrap();
 
-    let parquet_meta_data = encoding::decode_sst_meta_data(&kv_metas[0]).unwrap();
+    let file_meta_data = metadata.file_metadata();
+    let kv_metas = file_meta_data.key_value_metadata().unwrap();
+    let kv_meta = kv_metas
+        .iter()
+        .find(|kv| kv.key == encoding::META_KEY)
+        .unwrap();
+
+    let parquet_meta_data = encoding::decode_sst_meta_data(kv_meta).unwrap();
     MetaData::from(parquet_meta_data)
 }
```
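
The `meta_from_sst` fix above stops assuming the CeresDB metadata is the first key/value entry in the parquet footer and instead looks it up by `encoding::META_KEY`, since other writers may add their own entries (arrow-rs, for instance, may store an `ARROW:schema` entry). A rough, self-contained sketch of that lookup pattern follows; `KeyValue` and `META_KEY` here are local stand-ins, not the real parquet or `encoding` items:

```rust
// Sketch of looking up footer metadata by key instead of by position.
// `KeyValue` stands in for the parquet footer's key/value entries and
// META_KEY is a placeholder for encoding::META_KEY.
struct KeyValue {
    key: String,
    value: Option<String>,
}

const META_KEY: &str = "meta"; // placeholder value, not the real key string

fn find_meta(kv_metas: &[KeyValue]) -> Option<&KeyValue> {
    // Scan for our entry; indexing kv_metas[0] breaks as soon as another
    // writer prepends its own metadata entry.
    kv_metas.iter().find(|kv| kv.key == META_KEY)
}

fn main() {
    let kv_metas = vec![
        KeyValue { key: "ARROW:schema".to_owned(), value: None },
        KeyValue { key: META_KEY.to_owned(), value: Some("<encoded sst meta>".to_owned()) },
    ];
    // The CeresDB entry is found even though it is not first.
    assert_eq!(find_meta(&kv_metas).unwrap().key, META_KEY);
}
```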
