Commit e4e3c0b

ShiKaiWi and dust1 authored and committed
fix: arrow meta data is lost when decode custom meta data (apache#1004)
## Rationale

The metadata for the arrow schema is encoded into the parquet file. However, this part is lost when building our custom metadata.

## Detailed Changes

Keep the other metadata in the parquet metadata after extracting our custom metadata.

## Test Plan

Add a unit test, `test_arrow_meta_data`, for it.
1 parent de623b2 commit e4e3c0b
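
To make the fix concrete, the sketch below shows the extraction pattern the patch adopts: pull our custom entry out of the parquet key-value metadata and keep every other entry, instead of discarding the whole list. `KeyValue` and `META_KEY` here are hypothetical stand-ins for the parquet crate's key-value type and CeresDB's `encoding::META_KEY`, so this is an illustration of the pattern, not the patched code itself.

```rust
// Hypothetical stand-ins for parquet's KeyValue and encoding::META_KEY.
#[derive(Clone, Debug, PartialEq)]
struct KeyValue {
    key: String,
    value: Option<String>,
}

const META_KEY: &str = "custom:meta"; // placeholder key, not the real one

/// Split the key-value metadata into our custom entry and the rest
/// (e.g. the "ARROW:schema" entry written by arrow-rs).
fn split_kv_metas(kv_metas: &[KeyValue]) -> (Option<&KeyValue>, Vec<KeyValue>) {
    let mut custom_kv_meta = None;
    let mut other_kv_metas = Vec::with_capacity(kv_metas.len().saturating_sub(1));
    for kv_meta in kv_metas {
        if kv_meta.key == META_KEY {
            custom_kv_meta = Some(kv_meta);
        } else {
            other_kv_metas.push(kv_meta.clone());
        }
    }
    (custom_kv_meta, other_kv_metas)
}

fn main() {
    let kv_metas = vec![
        KeyValue {
            key: "ARROW:schema".to_string(),
            value: Some("<ipc-encoded arrow schema>".to_string()),
        },
        KeyValue {
            key: META_KEY.to_string(),
            value: Some("<encoded custom sst meta>".to_string()),
        },
    ];
    let (custom, others) = split_kv_metas(&kv_metas);
    assert!(custom.is_some());
    // The arrow schema entry survives instead of being thrown away.
    assert_eq!(others, vec![kv_metas[0].clone()]);
}
```

The rebuilt `FileMetaData` then carries the remaining entries (or `None` when the list is empty), so the arrow schema metadata round-trips through the metadata cache.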

4 files changed: +179 -15 lines changed

analytic_engine/src/sst/meta_data/cache.rs (+173 -9)
@@ -1,4 +1,4 @@
-// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
 
 use std::{
     fmt::Debug,
@@ -7,7 +7,7 @@ use std::{
 
 use lru::LruCache;
 use parquet::file::metadata::FileMetaData;
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::sst::{
     meta_data::{DecodeCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result},
@@ -39,14 +39,24 @@ impl MetaData {
         let kv_metas = file_meta_data
             .key_value_metadata()
             .context(KvMetaDataNotFound)?;
-        let kv_meta = kv_metas
-            .iter()
-            .find(|kv| kv.key == encoding::META_KEY)
-            .context(KvMetaDataNotFound)?;
+
+        ensure!(!kv_metas.is_empty(), KvMetaDataNotFound);
+        let mut other_kv_metas = Vec::with_capacity(kv_metas.len() - 1);
+        let mut custom_kv_meta = None;
+        for kv_meta in kv_metas {
+            // Remove our extended custom meta data from the parquet metadata for small
+            // memory consumption in the cache.
+            if kv_meta.key == encoding::META_KEY {
+                custom_kv_meta = Some(kv_meta);
+            } else {
+                other_kv_metas.push(kv_meta.clone());
+            }
+        }
 
         let custom = {
+            let custom_kv_meta = custom_kv_meta.context(KvMetaDataNotFound)?;
             let mut sst_meta =
-                encoding::decode_sst_meta_data(kv_meta).context(DecodeCustomMetaData)?;
+                encoding::decode_sst_meta_data(custom_kv_meta).context(DecodeCustomMetaData)?;
             if ignore_sst_filter {
                 sst_meta.parquet_filter = None;
             }
@@ -56,13 +66,17 @@ impl MetaData {
 
         // let's build a new parquet metadata without the extended key value
         // metadata.
+        let other_kv_metas = if other_kv_metas.is_empty() {
+            None
+        } else {
+            Some(other_kv_metas)
+        };
         let parquet = {
             let thin_file_meta_data = FileMetaData::new(
                 file_meta_data.version(),
                 file_meta_data.num_rows(),
                 file_meta_data.created_by().map(|v| v.to_string()),
-                // Remove the key value metadata.
-                None,
+                other_kv_metas,
                 file_meta_data.schema_descr_ptr(),
                 file_meta_data.column_orders().cloned(),
             );
@@ -111,3 +125,153 @@ impl MetaCache {
         self.cache.write().unwrap().put(key, value);
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::{fs::File, path::Path, sync::Arc};
+
+    use arrow::{
+        array::UInt64Builder,
+        datatypes::{DataType, Field, Schema},
+        record_batch::RecordBatch,
+    };
+    use bytes::Bytes;
+    use common_types::{
+        column_schema::Builder as ColumnSchemaBuilder,
+        schema::Builder as CustomSchemaBuilder,
+        time::{TimeRange, Timestamp},
+    };
+    use parquet::{arrow::ArrowWriter, file::footer};
+    use parquet_ext::ParquetMetaData;
+
+    use super::MetaData;
+    use crate::sst::parquet::{encoding, meta_data::ParquetMetaData as CustomParquetMetaData};
+
+    fn check_parquet_meta_data(original: &ParquetMetaData, processed: &ParquetMetaData) {
+        assert_eq!(original.page_indexes(), processed.page_indexes());
+        assert_eq!(original.offset_indexes(), processed.offset_indexes());
+        assert_eq!(original.num_row_groups(), processed.num_row_groups());
+        assert_eq!(original.row_groups(), processed.row_groups());
+
+        let original_file_md = original.file_metadata();
+        let processed_file_md = processed.file_metadata();
+        assert_eq!(original_file_md.num_rows(), processed_file_md.num_rows());
+        assert_eq!(original_file_md.version(), processed_file_md.version());
+        assert_eq!(
+            original_file_md.created_by(),
+            processed_file_md.created_by()
+        );
+        assert_eq!(original_file_md.schema(), processed_file_md.schema());
+        assert_eq!(
+            original_file_md.schema_descr(),
+            processed_file_md.schema_descr()
+        );
+        assert_eq!(
+            original_file_md.schema_descr_ptr(),
+            processed_file_md.schema_descr_ptr()
+        );
+        assert_eq!(
+            original_file_md.column_orders(),
+            processed_file_md.column_orders()
+        );
+
+        if let Some(kv_metas) = original_file_md.key_value_metadata() {
+            let processed_kv_metas = processed_file_md.key_value_metadata().unwrap();
+            assert_eq!(kv_metas.len(), processed_kv_metas.len() + 1);
+            let mut idx_for_processed = 0;
+            for kv in kv_metas {
+                if kv.key == encoding::META_KEY {
+                    continue;
+                }
+                assert_eq!(kv, &processed_kv_metas[idx_for_processed]);
+                idx_for_processed += 1;
+            }
+        } else {
+            assert!(processed_file_md.key_value_metadata().is_none());
+        }
+    }
+
+    fn write_parquet_file_with_metadata(
+        parquet_file_path: &Path,
+        custom_meta_data: &CustomParquetMetaData,
+    ) {
+        let tsid_array = {
+            let mut builder = UInt64Builder::new();
+            builder.append_value(10);
+            builder.append_null();
+            builder.append_value(11);
+            builder.finish()
+        };
+        let timestamp_array = {
+            let mut builder = UInt64Builder::new();
+            builder.append_value(1000);
+            builder.append_null();
+            builder.append_value(1001);
+            builder.finish()
+        };
+        let file = File::create(parquet_file_path).unwrap();
+        let schema = Schema::new(vec![
+            Field::new("tsid", DataType::UInt64, true),
+            Field::new("timestamp", DataType::UInt64, true),
+        ]);
+
+        let batch = RecordBatch::try_new(
+            Arc::new(schema),
+            vec![Arc::new(tsid_array), Arc::new(timestamp_array)],
+        )
+        .unwrap();
+        let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap();
+
+        let encoded_meta_data = encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap();
+        writer.append_key_value_metadata(encoded_meta_data);
+
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+    }
+
+    #[test]
+    fn test_arrow_meta_data() {
+        let temp_dir = tempfile::tempdir().unwrap();
+        let parquet_file_path = temp_dir.path().join("test_arrow_meta_data.par");
+        let schema = {
+            let tsid_column_schema = ColumnSchemaBuilder::new(
+                "tsid".to_string(),
+                common_types::datum::DatumKind::UInt64,
+            )
+            .build()
+            .unwrap();
+            let timestamp_column_schema = ColumnSchemaBuilder::new(
+                "timestamp".to_string(),
+                common_types::datum::DatumKind::Timestamp,
+            )
+            .build()
+            .unwrap();
+            CustomSchemaBuilder::new()
+                .auto_increment_column_id(true)
+                .add_key_column(tsid_column_schema)
+                .unwrap()
+                .add_key_column(timestamp_column_schema)
+                .unwrap()
+                .build()
+                .unwrap()
+        };
+        let custom_meta_data = CustomParquetMetaData {
+            min_key: Bytes::from_static(&[0, 1]),
+            max_key: Bytes::from_static(&[2, 2]),
+            time_range: TimeRange::new_unchecked(Timestamp::new(0), Timestamp::new(10)),
+            max_sequence: 1001,
+            schema,
+            parquet_filter: None,
+            collapsible_cols_idx: vec![],
+        };
+        write_parquet_file_with_metadata(parquet_file_path.as_path(), &custom_meta_data);
+
+        let parquet_file = File::open(parquet_file_path.as_path()).unwrap();
+        let parquet_meta_data = footer::parse_metadata(&parquet_file).unwrap();
+
+        let meta_data = MetaData::try_new(&parquet_meta_data, false).unwrap();
+
+        assert_eq!(**meta_data.custom(), custom_meta_data);
+        check_parquet_meta_data(&parquet_meta_data, meta_data.parquet());
+    }
+}
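
For context on why keeping the other entries matters: arrow-rs stores the serialized arrow schema, including its custom metadata map, in the parquet key-value metadata under the "ARROW:schema" key and rebuilds the schema from that entry on read. A rough sketch of that read path, assuming the parquet crate's `parquet_to_arrow_schema` helper (exact module paths may differ between versions):

```rust
use parquet::{arrow::parquet_to_arrow_schema, file::metadata::ParquetMetaData};

// Sketch: recover the arrow schema from parquet metadata. With the old
// behaviour (key-value metadata stripped), the schema would be re-derived
// from the parquet types alone and the arrow-level metadata would be lost.
fn arrow_schema_of(meta: &ParquetMetaData) -> arrow::datatypes::Schema {
    let file_md = meta.file_metadata();
    parquet_to_arrow_schema(file_md.schema_descr(), file_md.key_value_metadata())
        .expect("decode arrow schema from parquet key-value metadata")
}
```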

analytic_engine/src/sst/parquet/async_reader.rs (+3 -3)
@@ -1,4 +1,4 @@
-// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
 
 //! Sst reader implementation based on parquet.
 
@@ -340,7 +340,7 @@ impl<'a> Reader<'a> {
         Ok(file_size)
     }
 
-    async fn load_meta_data_from_storage(&self) -> Result<parquet_ext::ParquetMetaDataRef> {
+    async fn load_meta_data_from_storage(&self) -> Result<parquet_ext::ParquetMetaData> {
         let file_size = self.load_file_size().await?;
         let chunk_reader_adapter = ChunkReaderAdapter::new(self.path, self.store);
 
@@ -351,7 +351,7 @@ impl<'a> Reader<'a> {
             file_path: self.path.to_string(),
         })?;
 
-        Ok(Arc::new(meta_data))
+        Ok(meta_data)
     }
 
     fn need_update_cache(&self) -> bool {

analytic_engine/src/sst/parquet/encoding.rs (+2 -2)
@@ -1030,11 +1030,11 @@ mod tests {
             ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns).unwrap();
         let input_record_batch2 =
             ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns2).unwrap();
-        let row_nums = encoder
+        let num_rows = encoder
             .encode(vec![input_record_batch, input_record_batch2])
             .await
             .unwrap();
-        assert_eq!(2, row_nums);
+        assert_eq!(2, num_rows);
 
         // read encoded records back, and then compare with input records
         encoder.close().await.unwrap();

common_types/src/schema.rs (+1 -1)
@@ -690,7 +690,7 @@ impl Schema {
         self.column_schemas.num_columns()
     }
 
-    /// Returns true if idx is primary key idnex
+    /// Returns true if idx is primary key index
    pub fn is_primary_key_index(&self, idx: &usize) -> bool {
         self.primary_key_indexes.contains(idx)
     }
