
Commit a6b2549

ShiKaiWi authored and chunshao90 committed
fix: arrow meta data is lost when decode custom meta data (apache#1004)
The metadata for the arrow schema is encoded into the parquet file. However, this part was lost when building our custom metadata. Now the other metadata is kept in the parquet metadata after extracting our custom metadata. Add the unit test `test_arrow_meta_data` for it.
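For background: arrow-rs stores the arrow schema in the parquet footer as an ordinary key-value entry (under the key "ARROW:schema"), side by side with any custom entries appended by the writer, so discarding all key-value metadata also discards the arrow schema metadata. The sketch below is illustrative only; it is not part of this commit, assumes a recent arrow/parquet crate, and uses a hypothetical file path.

// Illustrative sketch (not part of this commit): ArrowWriter embeds the arrow
// schema in the parquet footer's key-value metadata, so stripping all
// key-value entries also strips the schema metadata.
use std::{fs::File, sync::Arc};

use arrow::{
    array::Int64Array,
    datatypes::{DataType, Field, Schema},
    record_batch::RecordBatch,
};
use parquet::{arrow::ArrowWriter, file::footer};

fn main() {
    // Single-column schema; the values are arbitrary.
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )
    .unwrap();

    // Hypothetical path, used only for this demo.
    let path = "/tmp/arrow_meta_demo.parquet";
    let mut writer = ArrowWriter::try_new(File::create(path).unwrap(), schema, None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // The footer's key-value metadata now carries the encoded arrow schema;
    // a custom entry added via `append_key_value_metadata` would sit next to
    // it, which is why only the custom key should be filtered out.
    let meta = footer::parse_metadata(&File::open(path).unwrap()).unwrap();
    let kv_metas = meta.file_metadata().key_value_metadata().unwrap();
    assert!(kv_metas.iter().any(|kv| kv.key == "ARROW:schema"));
}
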
1 parent 223ada1 commit a6b2549

4 files changed (+177, -13 lines)

analytic_engine/src/sst/meta_data/cache.rs (+173, -9)

@@ -1,4 +1,4 @@
-// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

 use std::{
     fmt::Debug,
@@ -7,7 +7,7 @@ use std::{

 use lru::LruCache;
 use parquet::file::metadata::FileMetaData;
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};

 use crate::sst::{
     meta_data::{DecodeCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result},
@@ -39,14 +39,24 @@ impl MetaData {
         let kv_metas = file_meta_data
             .key_value_metadata()
             .context(KvMetaDataNotFound)?;
-        let kv_meta = kv_metas
-            .iter()
-            .find(|kv| kv.key == encoding::META_KEY)
-            .context(KvMetaDataNotFound)?;
+
+        ensure!(!kv_metas.is_empty(), KvMetaDataNotFound);
+        let mut other_kv_metas = Vec::with_capacity(kv_metas.len() - 1);
+        let mut custom_kv_meta = None;
+        for kv_meta in kv_metas {
+            // Remove our extended custom meta data from the parquet metadata for small
+            // memory consumption in the cache.
+            if kv_meta.key == encoding::META_KEY {
+                custom_kv_meta = Some(kv_meta);
+            } else {
+                other_kv_metas.push(kv_meta.clone());
+            }
+        }

         let custom = {
+            let custom_kv_meta = custom_kv_meta.context(KvMetaDataNotFound)?;
             let mut sst_meta =
-                encoding::decode_sst_meta_data(kv_meta).context(DecodeCustomMetaData)?;
+                encoding::decode_sst_meta_data(custom_kv_meta).context(DecodeCustomMetaData)?;
             if ignore_sst_filter {
                 sst_meta.parquet_filter = None;
             }
@@ -56,13 +66,17 @@ impl MetaData {

         // let's build a new parquet metadata without the extended key value
         // metadata.
+        let other_kv_metas = if other_kv_metas.is_empty() {
+            None
+        } else {
+            Some(other_kv_metas)
+        };
         let parquet = {
             let thin_file_meta_data = FileMetaData::new(
                 file_meta_data.version(),
                 file_meta_data.num_rows(),
                 file_meta_data.created_by().map(|v| v.to_string()),
-                // Remove the key value metadata.
-                None,
+                other_kv_metas,
                 file_meta_data.schema_descr_ptr(),
                 file_meta_data.column_orders().cloned(),
             );
@@ -111,3 +125,153 @@ impl MetaCache {
         self.cache.write().unwrap().put(key, value);
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::{fs::File, path::Path, sync::Arc};
+
+    use arrow::{
+        array::UInt64Builder,
+        datatypes::{DataType, Field, Schema},
+        record_batch::RecordBatch,
+    };
+    use bytes::Bytes;
+    use common_types::{
+        column_schema::Builder as ColumnSchemaBuilder,
+        schema::Builder as CustomSchemaBuilder,
+        time::{TimeRange, Timestamp},
+    };
+    use parquet::{arrow::ArrowWriter, file::footer};
+    use parquet_ext::ParquetMetaData;
+
+    use super::MetaData;
+    use crate::sst::parquet::{encoding, meta_data::ParquetMetaData as CustomParquetMetaData};
+
+    fn check_parquet_meta_data(original: &ParquetMetaData, processed: &ParquetMetaData) {
+        assert_eq!(original.page_indexes(), processed.page_indexes());
+        assert_eq!(original.offset_indexes(), processed.offset_indexes());
+        assert_eq!(original.num_row_groups(), processed.num_row_groups());
+        assert_eq!(original.row_groups(), processed.row_groups());
+
+        let original_file_md = original.file_metadata();
+        let processed_file_md = processed.file_metadata();
+        assert_eq!(original_file_md.num_rows(), processed_file_md.num_rows());
+        assert_eq!(original_file_md.version(), processed_file_md.version());
+        assert_eq!(
+            original_file_md.created_by(),
+            processed_file_md.created_by()
+        );
+        assert_eq!(original_file_md.schema(), processed_file_md.schema());
+        assert_eq!(
+            original_file_md.schema_descr(),
+            processed_file_md.schema_descr()
+        );
+        assert_eq!(
+            original_file_md.schema_descr_ptr(),
+            processed_file_md.schema_descr_ptr()
+        );
+        assert_eq!(
+            original_file_md.column_orders(),
+            processed_file_md.column_orders()
+        );
+
+        if let Some(kv_metas) = original_file_md.key_value_metadata() {
+            let processed_kv_metas = processed_file_md.key_value_metadata().unwrap();
+            assert_eq!(kv_metas.len(), processed_kv_metas.len() + 1);
+            let mut idx_for_processed = 0;
+            for kv in kv_metas {
+                if kv.key == encoding::META_KEY {
+                    continue;
+                }
+                assert_eq!(kv, &processed_kv_metas[idx_for_processed]);
+                idx_for_processed += 1;
+            }
+        } else {
+            assert!(processed_file_md.key_value_metadata().is_none());
+        }
+    }
+
+    fn write_parquet_file_with_metadata(
+        parquet_file_path: &Path,
+        custom_meta_data: &CustomParquetMetaData,
+    ) {
+        let tsid_array = {
+            let mut builder = UInt64Builder::new();
+            builder.append_value(10);
+            builder.append_null();
+            builder.append_value(11);
+            builder.finish()
+        };
+        let timestamp_array = {
+            let mut builder = UInt64Builder::new();
+            builder.append_value(1000);
+            builder.append_null();
+            builder.append_value(1001);
+            builder.finish()
+        };
+        let file = File::create(parquet_file_path).unwrap();
+        let schema = Schema::new(vec![
+            Field::new("tsid", DataType::UInt64, true),
+            Field::new("timestamp", DataType::UInt64, true),
+        ]);
+
+        let batch = RecordBatch::try_new(
+            Arc::new(schema),
+            vec![Arc::new(tsid_array), Arc::new(timestamp_array)],
+        )
+        .unwrap();
+        let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap();
+
+        let encoded_meta_data = encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap();
+        writer.append_key_value_metadata(encoded_meta_data);
+
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+    }
+
+    #[test]
+    fn test_arrow_meta_data() {
+        let temp_dir = tempfile::tempdir().unwrap();
+        let parquet_file_path = temp_dir.path().join("test_arrow_meta_data.par");
+        let schema = {
+            let tsid_column_schema = ColumnSchemaBuilder::new(
+                "tsid".to_string(),
+                common_types::datum::DatumKind::UInt64,
+            )
+            .build()
+            .unwrap();
+            let timestamp_column_schema = ColumnSchemaBuilder::new(
+                "timestamp".to_string(),
+                common_types::datum::DatumKind::Timestamp,
+            )
+            .build()
+            .unwrap();
+            CustomSchemaBuilder::new()
+                .auto_increment_column_id(true)
+                .add_key_column(tsid_column_schema)
+                .unwrap()
+                .add_key_column(timestamp_column_schema)
+                .unwrap()
+                .build()
+                .unwrap()
+        };
+        let custom_meta_data = CustomParquetMetaData {
+            min_key: Bytes::from_static(&[0, 1]),
+            max_key: Bytes::from_static(&[2, 2]),
+            time_range: TimeRange::new_unchecked(Timestamp::new(0), Timestamp::new(10)),
+            max_sequence: 1001,
+            schema,
+            parquet_filter: None,
+            collapsible_cols_idx: vec![],
+        };
+        write_parquet_file_with_metadata(parquet_file_path.as_path(), &custom_meta_data);
+
+        let parquet_file = File::open(parquet_file_path.as_path()).unwrap();
+        let parquet_meta_data = footer::parse_metadata(&parquet_file).unwrap();
+
+        let meta_data = MetaData::try_new(&parquet_meta_data, false).unwrap();
+
+        assert_eq!(**meta_data.custom(), custom_meta_data);
+        check_parquet_meta_data(&parquet_meta_data, meta_data.parquet());
+    }
+}
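
A side note on why preserving `other_kv_metas` matters downstream: the arrow schema that `ArrowWriter` embedded can only be rebuilt from the slimmed `FileMetaData` if its key-value entries survive. The helper below is a sketch, not part of the commit; it assumes `parquet::arrow::parquet_to_arrow_schema` is available in the parquet crate version in use, and the function name is illustrative.

// Sketch only: rebuild the arrow schema from the slimmed-down parquet metadata.
use parquet::{
    arrow::parquet_to_arrow_schema,
    errors::Result,
    file::metadata::ParquetMetaData,
};

fn arrow_schema_from_footer(meta: &ParquetMetaData) -> Result<arrow::datatypes::Schema> {
    let file_md = meta.file_metadata();
    // With the fix, `key_value_metadata()` still contains the "ARROW:schema"
    // entry (plus any other non-custom entries), so the decoded arrow schema
    // keeps the metadata written by ArrowWriter; with `None` here, the schema
    // would be re-derived from the parquet types and lose that metadata.
    parquet_to_arrow_schema(file_md.schema_descr(), file_md.key_value_metadata())
}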

analytic_engine/src/sst/parquet/async_reader.rs (+1, -1)

@@ -1,4 +1,4 @@
-// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

 //! Sst reader implementation based on parquet.
analytic_engine/src/sst/parquet/encoding.rs (+2, -2)

@@ -1030,11 +1030,11 @@ mod tests {
             ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns).unwrap();
         let input_record_batch2 =
             ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns2).unwrap();
-        let row_nums = encoder
+        let num_rows = encoder
             .encode(vec![input_record_batch, input_record_batch2])
             .await
             .unwrap();
-        assert_eq!(2, row_nums);
+        assert_eq!(2, num_rows);

         // read encoded records back, and then compare with input records
         encoder.close().await.unwrap();

common_types/src/schema.rs (+1, -1)

@@ -690,7 +690,7 @@ impl Schema {
         self.column_schemas.num_columns()
     }

-    /// Returns true if idx is primary key idnex
+    /// Returns true if idx is primary key index
     pub fn is_primary_key_index(&self, idx: &usize) -> bool {
         self.primary_key_indexes.contains(idx)
     }

0 comments