feat: separate metadata from parquet's kv_metadata #1120

Merged 49 commits on Aug 24, 2023
Changes from 26 commits
Commits
84b2141
add critical path comment
tanruixiang Jul 27, 2023
bec3378
draft
tanruixiang Jul 31, 2023
f814484
draft: write into another file success
tanruixiang Jul 31, 2023
a4bb37d
decode encode custom success
tanruixiang Aug 1, 2023
c0e8a86
draft
tanruixiang Aug 1, 2023
0603689
Merge remote-tracking branch 'origin/main' into separate_metadata
tanruixiang Aug 1, 2023
9e58acc
fix compile error
tanruixiang Aug 1, 2023
cf7b45c
fix ut
tanruixiang Aug 1, 2023
7bb1fde
update
tanruixiang Aug 2, 2023
dfa6af3
read sst meta path from parquet's metadata
tanruixiang Aug 2, 2023
0e048ed
fmt
tanruixiang Aug 2, 2023
29a4d7a
Merge branch 'main' into separate_metadata
tanruixiang Aug 7, 2023
422a4b6
beauty code
tanruixiang Aug 7, 2023
8f88b68
add error handle
tanruixiang Aug 8, 2023
3f48468
add error handle
tanruixiang Aug 8, 2023
665c774
clippy
tanruixiang Aug 8, 2023
ab86aff
Merge branch 'main' into separate_metadata
tanruixiang Aug 8, 2023
1b4e500
Merge branch 'main' into separate_metadata
tanruixiang Aug 9, 2023
8734400
Merge branch 'main' into separate_metadata
tanruixiang Aug 9, 2023
c9b5f57
encode decode pb binary
tanruixiang Aug 9, 2023
43ed794
Merge branch 'main' into separate_metadata
tanruixiang Aug 9, 2023
f6d4ec3
update
tanruixiang Aug 9, 2023
350f4b3
store path version in parquet kv
tanruixiang Aug 9, 2023
146c950
update comment
tanruixiang Aug 9, 2023
e0c422b
fix bug
tanruixiang Aug 9, 2023
e07391e
fix
tanruixiang Aug 9, 2023
40440fc
write into upper layer
tanruixiang Aug 9, 2023
45de991
format
tanruixiang Aug 9, 2023
449d7af
fix
tanruixiang Aug 9, 2023
8857adb
beauty code
tanruixiang Aug 10, 2023
9821195
format
tanruixiang Aug 10, 2023
a5766a4
delete unuse import
tanruixiang Aug 10, 2023
d8b6049
add empty line
tanruixiang Aug 10, 2023
c623d03
add empty line
tanruixiang Aug 10, 2023
787e94c
update by review
tanruixiang Aug 10, 2023
923bc62
update
tanruixiang Aug 10, 2023
e1dff07
fmt
tanruixiang Aug 10, 2023
38b05a2
update
tanruixiang Aug 10, 2023
8b3cbc2
draft: delete files
tanruixiang Aug 10, 2023
d039ad0
delete files
tanruixiang Aug 14, 2023
fb17c6b
make fix
tanruixiang Aug 14, 2023
f7fde77
fmt
tanruixiang Aug 14, 2023
5c3fc48
Merge branch 'main' into separate_metadata
jiacai2050 Aug 22, 2023
91b005c
refactor metadata reader
jiacai2050 Aug 22, 2023
ef5642b
bump proto, add associated_files
jiacai2050 Aug 23, 2023
4111c6e
remove meta_path from sst reader
jiacai2050 Aug 23, 2023
da1a6b6
remove clone
jiacai2050 Aug 23, 2023
32a0f2f
retry when delete failed
jiacai2050 Aug 24, 2023
8fe3ef6
rename meta file
jiacai2050 Aug 24, 2023
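
At a high level, the PR stops embedding the potentially large custom SST metadata in parquet's key-value metadata (the V1 layout) and instead writes it to a separate file, leaving only a path and a version marker in the kv metadata (V2). The sketch below is an illustrative summary of that dispatch, not the engine's real API: `MetaLocation`, `resolve_meta_location`, and the literal version strings are hypothetical stand-ins, while the roles mirror `META_KEY`, `META_PATH_KEY`, and `META_PATH_VERSION_KEY` from the diff.

```rust
// Illustrative sketch only: summarizes the V1/V2 dispatch this PR introduces.
// `MetaLocation` and the literal version strings are hypothetical stand-ins,
// not the engine's real types or constant values.

enum MetaLocation {
    /// V1: the custom SST metadata is inlined in parquet's key-value metadata.
    Inline(Vec<u8>),
    /// V2: parquet's kv metadata only records a path (plus a version marker);
    /// the metadata itself lives in a separate object-store file.
    External { meta_path: String },
}

fn resolve_meta_location(
    version: &str,
    inline_meta: Option<Vec<u8>>,
    meta_path: Option<String>,
) -> Result<MetaLocation, String> {
    match version {
        // V1 files must carry the inlined custom metadata.
        "v1" => inline_meta
            .map(MetaLocation::Inline)
            .ok_or_else(|| "kv metadata not found".to_string()),
        // V2 files must carry a non-empty meta path instead.
        "v2" => meta_path
            .map(|p| MetaLocation::External { meta_path: p })
            .ok_or_else(|| "kv meta path is empty".to_string()),
        other => Err(format!("meta path version {other} is wrong")),
    }
}
```

The reader side of this dispatch appears in the cache.rs changes below, where the V2 branch fetches and decodes the external metadata file from the object store.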
80 changes: 70 additions & 10 deletions analytic_engine/src/sst/meta_data/cache.rs
@@ -18,12 +18,20 @@ use std::{
};

use lru::LruCache;
use parquet::file::metadata::FileMetaData;
use object_store::{ObjectStoreRef, Path};
use parquet::{data_type::AsBytes, file::metadata::FileMetaData, format::KeyValue};
use parquet_ext::meta_data::ChunkReader;
use snafu::{ensure, OptionExt, ResultExt};

use crate::sst::{
meta_data::{DecodeCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result},
parquet::encoding,
meta_data::{
DecodeCustomMetaData, FetchAndDecodeSstMeta, KvMetaDataNotFound, KvMetaPathEmpty,
KvMetaVersionEmpty, MetaPathVersionWrong, ObjectStoreError, ParquetMetaDataRef, Result,
},
parquet::{
async_reader::ChunkReaderAdapter,
encoding::{self, decode_sst_custom_meta_data, META_PATH_VERSION_V1, META_PATH_VERSION_V2},
},
};

pub type MetaCacheRef = Arc<MetaCache>;
@@ -45,37 +53,83 @@ impl MetaData {
/// contains no extended custom information.
// TODO: remove it and use the suggested api.
#[allow(deprecated)]
pub fn try_new(
pub async fn try_new(
parquet_meta_data: &parquet_ext::ParquetMetaData,
ignore_sst_filter: bool,
meta_store: ObjectStoreRef,
) -> Result<Self> {
let file_meta_data = parquet_meta_data.file_metadata();
let kv_metas = file_meta_data
.key_value_metadata()
.context(KvMetaDataNotFound)?;

ensure!(!kv_metas.is_empty(), KvMetaDataNotFound);
let mut other_kv_metas = Vec::with_capacity(kv_metas.len() - 1);

let mut meta_path = None;

let mut other_kv_metas: Vec<KeyValue> = Vec::with_capacity(kv_metas.len() - 1);
let mut custom_kv_meta = None;
let mut meta_path_version = META_PATH_VERSION_V1.to_string();
for kv_meta in kv_metas {
// Remove our extended custom meta data from the parquet metadata for small
// memory consumption in the cache.
if kv_meta.key == encoding::META_KEY {
custom_kv_meta = Some(kv_meta);
} else if kv_meta.key == encoding::META_PATH_KEY {
meta_path = kv_meta.value.as_ref().map(|path| Path::from(path.as_str()))
} else if kv_meta.key == encoding::META_PATH_VERSION_KEY {
meta_path_version = kv_meta.value.as_ref().context(KvMetaVersionEmpty)?.clone();
} else {
other_kv_metas.push(kv_meta.clone());
}
}

let custom = {
// Must ensure the custom metadata is stored in only one place: inline kv (V1) or a separate file (V2)
ensure!(
custom_kv_meta.is_none() || meta_path.is_none(),
KvMetaDataNotFound
);

let custom = if meta_path_version == META_PATH_VERSION_V1 {
let custom_kv_meta = custom_kv_meta.context(KvMetaDataNotFound)?;
let mut sst_meta =
encoding::decode_sst_meta_data(custom_kv_meta).context(DecodeCustomMetaData)?;
if ignore_sst_filter {
sst_meta.parquet_filter = None;
}

Arc::new(sst_meta)
} else if meta_path_version == META_PATH_VERSION_V2 {
let decode_custom_metadata = match meta_path {
Some(meta_path) => {
let meta_size = meta_store
.head(&meta_path)
.await
.context(ObjectStoreError)?
.size;

let meta_chunk_reader_adapter =
ChunkReaderAdapter::new(&meta_path, &meta_store);

let metadata = meta_chunk_reader_adapter
.get_bytes(0..meta_size)
.await
.with_context(|| FetchAndDecodeSstMeta {
file_path: meta_path.to_string(),
})?;

Some(
decode_sst_custom_meta_data(metadata.as_bytes())
.context(DecodeCustomMetaData)?,
)
}
None => return KvMetaPathEmpty {}.fail(),
};
Arc::new(decode_custom_metadata.unwrap())
} else {
return MetaPathVersionWrong {
path_version: meta_path_version,
}
.fail();
};

// let's build a new parquet metadata without the extended key value
@@ -155,6 +209,7 @@ mod tests {
schema::Builder as CustomSchemaBuilder,
time::{TimeRange, Timestamp},
};
use object_store::LocalFileSystem;
use parquet::{arrow::ArrowWriter, file::footer};
use parquet_ext::ParquetMetaData;

@@ -245,8 +300,8 @@ mod tests {
writer.close().unwrap();
}

#[test]
fn test_arrow_meta_data() {
#[tokio::test]
async fn test_arrow_meta_data() {
let temp_dir = tempfile::tempdir().unwrap();
let parquet_file_path = temp_dir.path().join("test_arrow_meta_data.par");
let schema = {
@@ -285,7 +340,12 @@ mod tests {
let parquet_file = File::open(parquet_file_path.as_path()).unwrap();
let parquet_meta_data = footer::parse_metadata(&parquet_file).unwrap();

let meta_data = MetaData::try_new(&parquet_meta_data, false).unwrap();
let store =
Arc::new(LocalFileSystem::new_with_prefix(parquet_file_path.as_path()).unwrap());

let meta_data = MetaData::try_new(&parquet_meta_data, false, store)
.await
.unwrap();

assert_eq!(**meta_data.custom(), custom_meta_data);
check_parquet_meta_data(&parquet_meta_data, meta_data.parquet());
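For context, the corresponding write path (not part of this excerpt) has to attach the meta file path and version marker to the parquet footer so that the reader above can locate the external file. Below is a minimal sketch of what those entries could look like, under stated assumptions: the constant values are hypothetical, since the real constants (`META_PATH_KEY`, `META_PATH_VERSION_KEY`, `META_PATH_VERSION_V2`) live in `analytic_engine::sst::parquet::encoding` and their string values are not shown in this diff.

```rust
use parquet::format::KeyValue;

// Hypothetical stand-ins for the real constants defined in
// analytic_engine::sst::parquet::encoding; their actual values are not shown here.
const META_PATH_KEY: &str = "meta_path";
const META_PATH_VERSION_KEY: &str = "meta_path_version";
const META_PATH_VERSION_V2: &str = "2";

/// Build the key-value entries a V2 writer would attach to the parquet footer:
/// only the path and version go into kv metadata, not the metadata payload itself.
fn v2_footer_kv(meta_file_path: &str) -> Vec<KeyValue> {
    vec![
        KeyValue {
            key: META_PATH_KEY.to_string(),
            value: Some(meta_file_path.to_string()),
        },
        KeyValue {
            key: META_PATH_VERSION_KEY.to_string(),
            value: Some(META_PATH_VERSION_V2.to_string()),
        },
    ]
}
```

Keeping only the path and version in the footer is what lets the metadata cache hold a small parquet metadata object while the larger custom payload (e.g. the parquet filter) lives in the side file.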
42 changes: 41 additions & 1 deletion analytic_engine/src/sst/meta_data/mod.rs
@@ -14,10 +14,11 @@

pub mod cache;

use std::sync::Arc;
use std::{str::Utf8Error, sync::Arc};

use ceresdbproto::sst as sst_pb;
use common_types::{schema::Schema, time::TimeRange, SequenceNumber};
use generic_error::GenericError;
use macros::define_result;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::table::TableId;
@@ -47,6 +48,25 @@ pub enum Error {
))]
KvMetaDataNotFound { backtrace: Backtrace },

#[snafu(display(
"Key value meta path version in parquet is empty\nBacktrace\n:{}",
backtrace
))]
KvMetaVersionEmpty { backtrace: Backtrace },

#[snafu(display("Key value meta path in parquet is empty\nBacktrace\n:{}", backtrace))]
KvMetaPathEmpty { backtrace: Backtrace },

#[snafu(display(
"Mata path version {} is wrong\nBacktrace\n:{}",
path_version,
backtrace
))]
MetaPathVersionWrong {
path_version: String,
backtrace: Backtrace,
},

#[snafu(display("Metadata in proto struct is not found.\nBacktrace\n:{}", backtrace))]
MetaDataNotFound { backtrace: Backtrace },

@@ -64,6 +84,26 @@ pub enum Error {

#[snafu(display("Failed to convert parquet meta data, err:{}", source))]
ConvertParquetMetaData { source: parquet::meta_data::Error },

#[snafu(display("Meet a object store error, err:{source}\nBacktrace:\n{backtrace}"))]
ObjectStoreError {
source: object_store::ObjectStoreError,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to decode sst meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
))]
FetchAndDecodeSstMeta {
file_path: String,
source: GenericError,
backtrace: Backtrace,
},

#[snafu(display("Meet a object store error, err:{source}\nBacktrace:\n{backtrace}"))]
Utf8ErrorWrapper {
source: Utf8Error,
backtrace: Backtrace,
},
}

define_result!(Error);
4 changes: 2 additions & 2 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -70,7 +70,6 @@ use crate::{
reader::{error::*, Result, SstReader},
},
};

const PRUNE_ROW_GROUPS_METRICS_COLLECTOR_NAME: &str = "prune_row_groups";
type SendableRecordBatchStream = Pin<Box<dyn Stream<Item = Result<ArrowRecordBatch>> + Send>>;
type RecordBatchWithKeyStream = Box<dyn Stream<Item = Result<RecordBatchWithKey>> + Send + Unpin>;
@@ -377,7 +376,8 @@ impl<'a> Reader<'a> {

// TODO: Support page index until https://github.com/CeresDB/ceresdb/issues/1040 is fixed.

MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
MetaData::try_new(&parquet_meta_data, ignore_sst_filter, self.store.clone())
.await
.box_err()
.context(DecodeSstMeta)
}