diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs index ad662bdffb..0616b91477 100644 --- a/analytic_engine/src/compaction/picker.rs +++ b/analytic_engine/src/compaction/picker.rs @@ -724,7 +724,6 @@ mod tests { max_sequence: 200, schema: build_schema(), parquet_filter: Default::default(), - collapsible_cols_idx: Vec::new(), }; SstMetaData::Parquet(Arc::new(parquet_meta_data)) diff --git a/analytic_engine/src/sst/factory.rs b/analytic_engine/src/sst/factory.rs index 2cf7d21374..32454de9db 100644 --- a/analytic_engine/src/sst/factory.rs +++ b/analytic_engine/src/sst/factory.rs @@ -171,7 +171,7 @@ impl Factory for FactoryImpl { }; match storage_format { - StorageFormat::Columnar | StorageFormat::Hybrid => { + StorageFormat::Columnar => { let reader = AsyncParquetReader::new( path, options, @@ -197,16 +197,9 @@ impl Factory for FactoryImpl { store_picker: &'a ObjectStorePickerRef, level: Level, ) -> Result> { - let hybrid_encoding = match options.storage_format_hint { - StorageFormatHint::Specific(format) => matches!(format, StorageFormat::Hybrid), - // `Auto` is mapped to columnar parquet format now, may change in future. - StorageFormatHint::Auto => false, - }; - Ok(Box::new(ParquetSstWriter::new( path, level, - hybrid_encoding, store_picker, options, ))) diff --git a/analytic_engine/src/sst/meta_data/cache.rs b/analytic_engine/src/sst/meta_data/cache.rs index 0f2cc9d37c..f0101d708e 100644 --- a/analytic_engine/src/sst/meta_data/cache.rs +++ b/analytic_engine/src/sst/meta_data/cache.rs @@ -163,12 +163,18 @@ mod tests { schema::Builder as CustomSchemaBuilder, time::{TimeRange, Timestamp}, }; - use object_store::LocalFileSystem; + use object_store::{LocalFileSystem, ObjectStoreRef}; use parquet::{arrow::ArrowWriter, file::footer}; use parquet_ext::ParquetMetaData; use super::MetaData; - use crate::sst::parquet::{encoding, meta_data::ParquetMetaData as CustomParquetMetaData}; + use crate::{ + sst::parquet::{ + encoding::{self, META_PATH_KEY, META_VERSION_KEY}, + meta_data::ParquetMetaData as CustomParquetMetaData, + }, + table::sst_util::new_metadata_path, + }; // TODO: remove it and use the suggested api. 
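// [Editorial sketch, not part of the patch] The test changes above exercise the new
// metadata-v2 layout: the parquet footer now carries only the `meta_version` ("2") and
// `meta_path` key-value pairs, while the encoded custom metadata is stored as a sidecar
// object next to the sst file. Below is a condensed write-side illustration, assuming
// the helpers that appear elsewhere in this diff (`new_metadata_path`,
// `encode_sst_meta_data`); the function name `persist_custom_meta` is invented for the
// example and is not part of the change.
use object_store::{ObjectStoreRef, Path};
use parquet::format::KeyValue;

use crate::{
    sst::parquet::{
        encoding::{self, META_PATH_KEY, META_VERSION_KEY},
        meta_data::ParquetMetaData as CustomParquetMetaData,
    },
    table::sst_util::new_metadata_path,
};

async fn persist_custom_meta(
    store: ObjectStoreRef,
    sst_path: &str,
    custom_meta: CustomParquetMetaData,
) -> Vec<KeyValue> {
    // e.g. "1/2/3.sst" -> "1/2/3.sst.metadata"; the exact naming comes from
    // `new_metadata_path`.
    let meta_path = new_metadata_path(sst_path);

    // The full custom metadata (schema, time range, filter, ...) goes into the
    // sidecar object.
    let bytes = encoding::encode_sst_meta_data(custom_meta).unwrap();
    store.put(&Path::from(meta_path.clone()), bytes).await.unwrap();

    // Only these two small pairs are appended to the parquet footer.
    vec![
        KeyValue {
            key: META_PATH_KEY.to_string(),
            value: Some(meta_path),
        },
        KeyValue {
            key: META_VERSION_KEY.to_string(),
            value: Some("2".to_string()),
        },
    ]
}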
#[allow(deprecated)] @@ -202,21 +208,28 @@ mod tests { if let Some(kv_metas) = original_file_md.key_value_metadata() { let processed_kv_metas = processed_file_md.key_value_metadata().unwrap(); - assert_eq!(kv_metas.len(), processed_kv_metas.len() + 1); - let mut idx_for_processed = 0; + assert_eq!(kv_metas.len(), processed_kv_metas.len() + 2); for kv in kv_metas { - if kv.key == encoding::META_KEY { - continue; + match kv.key.as_str() { + "ARROW:schema" => { + // don't care this + } + encoding::META_KEY => assert!(kv.value.is_none()), + encoding::META_VERSION_KEY => assert_eq!("2", kv.value.clone().unwrap()), + encoding::META_PATH_KEY => { + let meta_path = kv.value.as_ref().unwrap(); + assert!(meta_path.ends_with(".metadata")); + } + _ => panic!("Unknown parquet kv, value:{kv:?}"), } - assert_eq!(kv, &processed_kv_metas[idx_for_processed]); - idx_for_processed += 1; } } else { assert!(processed_file_md.key_value_metadata().is_none()); } } - fn write_parquet_file_with_metadata( + async fn write_parquet_file_with_metadata( + store: ObjectStoreRef, parquet_file_path: &Path, custom_meta_data: &CustomParquetMetaData, ) { @@ -246,13 +259,21 @@ mod tests { ) .unwrap(); let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap(); - - let encoded_meta_data = - encoding::encode_sst_meta_data_v1(custom_meta_data.clone()).unwrap(); - writer.append_key_value_metadata(encoded_meta_data); - + let meta_path = new_metadata_path(parquet_file_path.to_str().unwrap()); + writer.append_key_value_metadata(parquet::format::KeyValue { + key: META_PATH_KEY.to_string(), + value: Some(meta_path.clone()), + }); + writer.append_key_value_metadata(parquet::format::KeyValue { + key: META_VERSION_KEY.to_string(), + value: Some("2".to_string()), + }); writer.write(&batch).unwrap(); writer.close().unwrap(); + + let bytes = encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap(); + let meta_path = object_store::Path::from(meta_path); + store.put(&meta_path, bytes).await.unwrap(); } #[tokio::test] @@ -288,14 +309,17 @@ mod tests { max_sequence: 1001, schema, parquet_filter: None, - collapsible_cols_idx: vec![], }; - write_parquet_file_with_metadata(parquet_file_path.as_path(), &custom_meta_data); + let store = Arc::new(LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap()); + write_parquet_file_with_metadata( + store.clone(), + parquet_file_path.as_path(), + &custom_meta_data, + ) + .await; let parquet_file = File::open(parquet_file_path.as_path()).unwrap(); let parquet_meta_data = footer::parse_metadata(&parquet_file).unwrap(); - let store = - Arc::new(LocalFileSystem::new_with_prefix(parquet_file_path.as_path()).unwrap()); let meta_data = MetaData::try_new(&parquet_meta_data, false, store) .await .unwrap(); diff --git a/analytic_engine/src/sst/meta_data/metadata_reader.rs b/analytic_engine/src/sst/meta_data/metadata_reader.rs index 017fae6c7d..08bf421d1f 100644 --- a/analytic_engine/src/sst/meta_data/metadata_reader.rs +++ b/analytic_engine/src/sst/meta_data/metadata_reader.rs @@ -27,7 +27,7 @@ use crate::sst::{ KvMetaPathEmpty, }, parquet::{ - encoding::{self, decode_sst_meta_data_v2, META_VERSION_CURRENT, META_VERSION_V1}, + encoding::{self, decode_sst_meta_data_from_bytes, META_VERSION_CURRENT, META_VERSION_V1}, meta_data::{ParquetMetaData, ParquetMetaDataRef}, }, }; @@ -54,7 +54,7 @@ impl CustomMetadataReader for MetaV1Reader<'_> { async fn get_metadata(&self) -> Result { let custom_kv_meta = self.custom_kv_meta.context(KvMetaDataNotFound)?; - 
encoding::decode_sst_meta_data_v1(custom_kv_meta).context(DecodeCustomMetaData) + encoding::decode_sst_meta_data_from_kv(custom_kv_meta).context(DecodeCustomMetaData) } } @@ -88,7 +88,7 @@ impl CustomMetadataReader for MetaV2Reader { file_path: meta_path.to_string(), })?; - decode_sst_meta_data_v2(metadata.as_bytes()).context(DecodeCustomMetaData) + decode_sst_meta_data_from_bytes(metadata.as_bytes()).context(DecodeCustomMetaData) } } } diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 00549ff2fa..b9f00de55b 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -63,9 +63,7 @@ use crate::{ SstMetaData, }, parquet::{ - encoding::ParquetDecoder, - meta_data::{ParquetFilter, ParquetMetaDataRef}, - row_group_pruner::RowGroupPruner, + encoding::ParquetDecoder, meta_data::ParquetFilter, row_group_pruner::RowGroupPruner, }, reader::{error::*, Result, SstReader}, }, @@ -157,20 +155,12 @@ impl<'a> Reader<'a> { ArrowRecordBatchProjector::from(row_projector) }; - let sst_meta_data = self - .meta_data - .as_ref() - // metadata must be inited after `init_if_necessary`. - .unwrap() - .custom(); - let streams: Vec<_> = streams .into_iter() .map(|stream| { Box::new(RecordBatchProjector::new( stream, row_projector.clone(), - sst_meta_data.clone(), self.metrics.metrics_collector.clone(), )) as _ }) @@ -474,14 +464,12 @@ struct RecordBatchProjector { metrics: ProjectorMetrics, start_time: Instant, - sst_meta: ParquetMetaDataRef, } impl RecordBatchProjector { fn new( stream: SendableRecordBatchStream, row_projector: ArrowRecordBatchProjector, - sst_meta: ParquetMetaDataRef, metrics_collector: Option, ) -> Self { let metrics = ProjectorMetrics { @@ -494,7 +482,6 @@ impl RecordBatchProjector { row_projector, metrics, start_time: Instant::now(), - sst_meta, } } } @@ -510,8 +497,7 @@ impl Stream for RecordBatchProjector { match record_batch.box_err().context(DecodeRecordBatch {}) { Err(e) => Poll::Ready(Some(Err(e))), Ok(record_batch) => { - let parquet_decoder = - ParquetDecoder::new(&projector.sst_meta.collapsible_cols_idx); + let parquet_decoder = ParquetDecoder::new(); let record_batch = parquet_decoder .decode_record_batch(record_batch) .box_err() diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs index e6c55a7c09..7631da7455 100644 --- a/analytic_engine/src/sst/parquet/encoding.rs +++ b/analytic_engine/src/sst/parquet/encoding.rs @@ -12,25 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
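// [Editorial sketch, not part of the patch] How the two metadata layouts are told apart
// on the read path after this rename. V1 keeps the whole custom metadata inline in the
// footer under `META_KEY` (base64 of a one-byte header plus the protobuf payload,
// handled by `decode_sst_meta_data_from_kv`), while V2 stores only a version marker and
// the path of a sidecar `.metadata` object whose raw bytes go through
// `decode_sst_meta_data_from_bytes`. The helper `locate_custom_meta` and the
// `MetaSource` enum are invented for this example; the real reader compares against
// `META_VERSION_V1`/`META_VERSION_CURRENT`.
use parquet::format::KeyValue;

use crate::sst::parquet::encoding::{META_KEY, META_PATH_KEY, META_VERSION_KEY};

/// Where the custom metadata lives for a given sst file (sketch-only type).
enum MetaSource {
    /// v1: the encoded metadata sits inline in a footer key-value pair.
    InlineKv(KeyValue),
    /// v2: the footer only records the object-store path of the metadata file.
    SidecarFile(String),
}

fn locate_custom_meta(footer_kvs: &[KeyValue]) -> Option<MetaSource> {
    let find = |key: &str| footer_kvs.iter().find(|kv| kv.key == key);
    match find(META_VERSION_KEY).and_then(|kv| kv.value.as_deref()) {
        // v2: the payload lives next to the sst file, e.g. `xxx.sst.metadata`.
        Some("2") => find(META_PATH_KEY)
            .and_then(|kv| kv.value.clone())
            .map(MetaSource::SidecarFile),
        // v1 (or no version marker): the payload is inline under `META_KEY`.
        _ => find(META_KEY).cloned().map(MetaSource::InlineKv),
    }
}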
-use std::{convert::TryFrom, mem, sync::Arc}; +use std::convert::TryFrom; -use arrow::{ - array::{make_array, Array, ArrayData, ArrayRef}, - buffer::MutableBuffer, - compute, - record_batch::RecordBatch as ArrowRecordBatch, - util::bit_util, -}; +use arrow::{compute, record_batch::RecordBatch as ArrowRecordBatch}; use async_trait::async_trait; use bytes::Bytes; use bytes_ext::{BytesMut, SafeBufMut}; use ceresdbproto::sst as sst_pb; -use common_types::{ - datum::DatumKind, - schema::{ArrowSchema, ArrowSchemaRef, DataType, Field, Schema}, -}; +use common_types::schema::{ArrowSchemaRef, Schema}; use generic_error::{BoxError, GenericError}; -use log::trace; use macros::define_result; use parquet::{ arrow::AsyncArrowWriter, @@ -41,13 +31,7 @@ use prost::{bytes, Message}; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use tokio::io::AsyncWrite; -use crate::sst::parquet::{ - hybrid::{self, IndexedType}, - meta_data::ParquetMetaData, -}; - -// TODO: Only support i32 offset now, consider i64 here? -const OFFSET_SIZE: usize = std::mem::size_of::(); +use crate::sst::parquet::meta_data::ParquetMetaData; #[derive(Debug, Snafu)] pub enum Error { @@ -164,35 +148,6 @@ pub enum Error { source: GenericError, backtrace: Backtrace, }, - - #[snafu(display( - "Failed to decode hybrid record batch, err:{}.\nBacktrace:\n{}", - source, - backtrace - ))] - DecodeRecordBatch { - source: GenericError, - backtrace: Backtrace, - }, - - #[snafu(display( - "Sst meta data collapsible_cols_idx is empty, fail to decode hybrid record batch.\nBacktrace:\n{}", - backtrace - ))] - CollapsibleColsIdxEmpty { backtrace: Backtrace }, - - #[snafu(display("Tsid is required for hybrid format.\nBacktrace:\n{}", backtrace))] - TsidRequired { backtrace: Backtrace }, - - #[snafu(display( - "Key column must be string type. type:{}\nBacktrace:\n{}", - type_name, - backtrace - ))] - StringKeyColumnRequired { - type_name: String, - backtrace: Backtrace, - }, } define_result!(Error); @@ -210,7 +165,7 @@ pub const META_VERSION_KEY: &str = "meta_version"; pub const META_VALUE_HEADER: u8 = 0; /// Encode the sst custom meta data into binary key value pair. -pub fn encode_sst_meta_data_v2(meta_data: ParquetMetaData) -> Result { +pub fn encode_sst_meta_data(meta_data: ParquetMetaData) -> Result { let meta_data_pb = sst_pb::ParquetMetaData::from(meta_data); let mut buf = BytesMut::with_capacity(meta_data_pb.encoded_len() + 1); @@ -223,7 +178,7 @@ pub fn encode_sst_meta_data_v2(meta_data: ParquetMetaData) -> Result { } /// Decode the sst custom meta data from the binary key value pair. -pub fn decode_sst_meta_data_v2(bytes: &[u8]) -> Result { +pub fn decode_sst_meta_data_from_bytes(bytes: &[u8]) -> Result { ensure!( bytes[0] == META_VALUE_HEADER, InvalidMetaBytesHeader { @@ -238,19 +193,9 @@ pub fn decode_sst_meta_data_v2(bytes: &[u8]) -> Result { ParquetMetaData::try_from(meta_data_pb).context(ConvertSstMetaData) } -/// Encode the sst meta data into binary key value pair. -// TODO: remove this function when hybrid format is not supported. -pub fn encode_sst_meta_data_v1(meta_data: ParquetMetaData) -> Result { - let buf = encode_sst_meta_data_v2(meta_data)?; - - Ok(KeyValue { - key: META_KEY.to_string(), - value: Some(base64::encode(buf.as_ref())), - }) -} - /// Decode the sst meta data from the binary key value pair. -pub fn decode_sst_meta_data_v1(kv: &KeyValue) -> Result { +/// Used in v1 format. 
+pub fn decode_sst_meta_data_from_kv(kv: &KeyValue) -> Result { ensure!( kv.key == META_KEY, InvalidMetaKey { @@ -267,7 +212,7 @@ pub fn decode_sst_meta_data_v1(kv: &KeyValue) -> Result { let raw_bytes = base64::decode(meta_value).context(DecodeBase64MetaValue { meta_value })?; - decode_sst_meta_data_v2(&raw_bytes) + decode_sst_meta_data_from_bytes(&raw_bytes) } /// RecordEncoder is used for encoding ArrowBatch. @@ -278,8 +223,6 @@ trait RecordEncoder { /// Encode vector of arrow batch, return encoded row number async fn encode(&mut self, record_batches: Vec) -> Result; - fn set_meta_data(&mut self, meta_data: ParquetMetaData) -> Result<()>; - fn set_meta_data_path(&mut self, metadata_path: Option) -> Result<()>; /// Return encoded bytes @@ -345,12 +288,6 @@ impl RecordEncoder for ColumnarRecordEncoder { Ok(record_batch.num_rows()) } - // TODO: this function is not need any more in meta v2 format, - // remove it in future. - fn set_meta_data(&mut self, _meta_data: ParquetMetaData) -> Result<()> { - Ok(()) - } - fn set_meta_data_path(&mut self, metadata_path: Option) -> Result<()> { let path_kv = KeyValue { key: META_PATH_KEY.to_string(), @@ -381,143 +318,6 @@ impl RecordEncoder for ColumnarRecordEncoder { } } -struct HybridRecordEncoder { - // wrap in Option so ownership can be taken out behind `&mut self` - arrow_writer: Option>, - arrow_schema: ArrowSchemaRef, - tsid_type: IndexedType, - non_collapsible_col_types: Vec, - // columns that can be collapsed into list - collapsible_col_types: Vec, - collapsible_col_idx: Vec, -} - -impl HybridRecordEncoder { - fn try_new( - sink: W, - schema: &Schema, - num_rows_per_row_group: usize, - max_buffer_size: usize, - compression: Compression, - ) -> Result { - // TODO: What we really want here is a unique ID, tsid is one case - // Maybe support other cases later. 
- let tsid_idx = schema.index_of_tsid().context(TsidRequired)?; - let tsid_type = IndexedType { - idx: tsid_idx, - data_type: schema.column(tsid_idx).data_type, - }; - - let mut non_collapsible_col_types = Vec::new(); - let mut collapsible_col_types = Vec::new(); - let mut collapsible_col_idx = Vec::new(); - for (idx, col) in schema.columns().iter().enumerate() { - if idx == tsid_idx { - continue; - } - - if schema.is_collapsible_column(idx) { - collapsible_col_types.push(IndexedType { - idx, - data_type: schema.column(idx).data_type, - }); - collapsible_col_idx.push(idx as u32); - } else { - // TODO: support non-string key columns - ensure!( - matches!(col.data_type, DatumKind::String), - StringKeyColumnRequired { - type_name: col.data_type.to_string(), - } - ); - non_collapsible_col_types.push(IndexedType { - idx, - data_type: col.data_type, - }); - } - } - - let arrow_schema = hybrid::build_hybrid_arrow_schema(schema); - - let write_props = WriterProperties::builder() - .set_max_row_group_size(num_rows_per_row_group) - .set_compression(compression) - .build(); - - let arrow_writer = AsyncArrowWriter::try_new( - sink, - arrow_schema.clone(), - max_buffer_size, - Some(write_props), - ) - .box_err() - .context(EncodeRecordBatch)?; - Ok(Self { - arrow_writer: Some(arrow_writer), - arrow_schema, - tsid_type, - non_collapsible_col_types, - collapsible_col_types, - collapsible_col_idx, - }) - } -} - -#[async_trait] -impl RecordEncoder for HybridRecordEncoder { - async fn encode(&mut self, arrow_record_batch_vec: Vec) -> Result { - assert!(self.arrow_writer.is_some()); - - let record_batch = hybrid::convert_to_hybrid_record( - &self.tsid_type, - &self.non_collapsible_col_types, - &self.collapsible_col_types, - self.arrow_schema.clone(), - arrow_record_batch_vec, - ) - .box_err() - .context(EncodeRecordBatch)?; - - self.arrow_writer - .as_mut() - .unwrap() - .write(&record_batch) - .await - .box_err() - .context(EncodeRecordBatch)?; - - Ok(record_batch.num_rows()) - } - - fn set_meta_data(&mut self, mut meta_data: ParquetMetaData) -> Result<()> { - meta_data.collapsible_cols_idx = mem::take(&mut self.collapsible_col_idx); - let key_value = encode_sst_meta_data_v1(meta_data)?; - self.arrow_writer - .as_mut() - .unwrap() - .append_key_value_metadata(key_value); - - Ok(()) - } - - fn set_meta_data_path(&mut self, _metadata_path: Option) -> Result<()> { - Ok(()) - } - - async fn close(&mut self) -> Result<()> { - assert!(self.arrow_writer.is_some()); - - let arrow_writer = self.arrow_writer.take().unwrap(); - arrow_writer - .close() - .await - .box_err() - .context(EncodeRecordBatch)?; - - Ok(()) - } -} - pub struct ParquetEncoder { record_encoder: Box, } @@ -526,30 +326,19 @@ impl ParquetEncoder { pub fn try_new( sink: W, schema: &Schema, - hybrid_encoding: bool, num_rows_per_row_group: usize, max_buffer_size: usize, compression: Compression, ) -> Result { - let record_encoder: Box = if hybrid_encoding { - Box::new(HybridRecordEncoder::try_new( + Ok(ParquetEncoder { + record_encoder: Box::new(ColumnarRecordEncoder::try_new( sink, schema, num_rows_per_row_group, max_buffer_size, compression, - )?) - } else { - Box::new(ColumnarRecordEncoder::try_new( - sink, - schema, - num_rows_per_row_group, - max_buffer_size, - compression, - )?) 
- }; - - Ok(ParquetEncoder { record_encoder }) + )?), + }) } /// Encode the record batch with [ArrowWriter] and the encoded contents is @@ -565,10 +354,6 @@ impl ParquetEncoder { self.record_encoder.encode(arrow_record_batches).await } - pub fn set_meta_data(&mut self, meta_data: ParquetMetaData) -> Result<()> { - self.record_encoder.set_meta_data(meta_data) - } - pub fn set_meta_data_path(&mut self, meta_data_path: Option) -> Result<()> { self.record_encoder.set_meta_data_path(meta_data_path) } @@ -592,254 +377,21 @@ impl RecordDecoder for ColumnarRecordDecoder { } } -struct HybridRecordDecoder { - collapsible_cols_idx: Vec, -} - -impl HybridRecordDecoder { - /// Convert `ListArray` fields to underlying data type - fn convert_schema(arrow_schema: ArrowSchemaRef) -> ArrowSchemaRef { - let new_fields: Vec<_> = arrow_schema - .fields() - .iter() - .map(|f| { - if let DataType::List(nested_field) = f.data_type() { - match f.data_type() { - DataType::Dictionary(_, _) => { - assert!(f.dict_id().is_some(), "Dictionary must have dict_id"); - assert!( - f.dict_is_ordered().is_some(), - "Dictionary must have dict_is_ordered" - ); - let dict_id = f.dict_id().unwrap(); - let dict_is_ordered = f.dict_is_ordered().unwrap(); - Arc::new(Field::new_dict( - f.name(), - nested_field.data_type().clone(), - true, - dict_id, - dict_is_ordered, - )) - } - _ => Arc::new(Field::new(f.name(), nested_field.data_type().clone(), true)), - } - } else { - f.clone() - } - }) - .collect(); - Arc::new(ArrowSchema::new_with_metadata( - new_fields, - arrow_schema.metadata().clone(), - )) - } - - /// Stretch hybrid collapsed column into columnar column. - /// `value_offsets` specify offsets each value occupied, which means that - /// the number of a `value[n]` is `value_offsets[n] - value_offsets[n-1]`. - /// Ex: - /// - /// `array_ref` is `a b c`, `value_offsets` is `[0, 3, 5, 6]`, then - /// output array is `a a a b b c` - /// - /// Note: caller should ensure offsets is not empty. 
- fn stretch_variable_length_column( - array_ref: &ArrayRef, - value_offsets: &[i32], - ) -> Result { - assert_eq!(array_ref.len() + 1, value_offsets.len()); - - let values_num = *value_offsets.last().unwrap() as usize; - let array_data = array_ref.to_data(); - let offset_slices = array_data.buffers()[0].as_slice(); - let value_slices = array_data.buffers()[1].as_slice(); - let nulls = array_data.nulls(); - trace!( - "raw buffer slice, offsets:{:#02x?}, values:{:#02x?}", - offset_slices, - value_slices, - ); - - let i32_offsets = Self::get_array_offsets(offset_slices); - let mut value_bytes = 0; - for (idx, (current, prev)) in i32_offsets[1..].iter().zip(&i32_offsets).enumerate() { - let value_len = current - prev; - let value_num = value_offsets[idx + 1] - value_offsets[idx]; - value_bytes += value_len * value_num; - } - - // construct new expanded array - let mut new_offsets_buffer = MutableBuffer::new(OFFSET_SIZE * values_num); - let mut new_values_buffer = MutableBuffer::new(value_bytes as usize); - let mut new_null_buffer = hybrid::new_ones_buffer(values_num); - let null_slice = new_null_buffer.as_slice_mut(); - let mut value_length_so_far: i32 = 0; - new_offsets_buffer.push(value_length_so_far); - let mut bitmap_length_so_far: usize = 0; - - for (idx, (current, prev)) in i32_offsets[1..].iter().zip(&i32_offsets).enumerate() { - let value_len = current - prev; - let value_num = value_offsets[idx + 1] - value_offsets[idx]; - - if let Some(nulls) = nulls { - if nulls.is_null(idx) { - for i in 0..value_num { - bit_util::unset_bit(null_slice, bitmap_length_so_far + i as usize); - } - } - } - bitmap_length_so_far += value_num as usize; - new_values_buffer - .extend(value_slices[*prev as usize..*current as usize].repeat(value_num as usize)); - for _ in 0..value_num { - value_length_so_far += value_len; - new_offsets_buffer.push(value_length_so_far); - } - } - trace!( - "new buffer slice, offsets:{:#02x?}, values:{:#02x?}, bitmap:{:#02x?}", - new_offsets_buffer.as_slice(), - new_values_buffer.as_slice(), - new_null_buffer.as_slice(), - ); - - let array_data = ArrayData::builder(array_ref.data_type().clone()) - .len(values_num) - .add_buffer(new_offsets_buffer.into()) - .add_buffer(new_values_buffer.into()) - .null_bit_buffer(Some(new_null_buffer.into())) - .build() - .box_err() - .context(DecodeRecordBatch)?; - - Ok(make_array(array_data)) - } - - /// Like `stretch_variable_length_column`, but array value is fixed-size - /// type. - /// - /// Note: caller should ensure offsets is not empty. 
- fn stretch_fixed_length_column( - array_ref: &ArrayRef, - value_size: usize, - value_offsets: &[i32], - ) -> Result { - assert!(!value_offsets.is_empty()); - - let values_num = *value_offsets.last().unwrap() as usize; - let array_data = array_ref.to_data(); - let old_values_buffer = array_data.buffers()[0].as_slice(); - let old_nulls = array_data.nulls(); - - let mut new_values_buffer = MutableBuffer::new(value_size * values_num); - let mut new_null_buffer = hybrid::new_ones_buffer(values_num); - let null_slice = new_null_buffer.as_slice_mut(); - let mut length_so_far = 0; - - for (idx, offset) in (0..old_values_buffer.len()).step_by(value_size).enumerate() { - let value_num = (value_offsets[idx + 1] - value_offsets[idx]) as usize; - if let Some(nulls) = old_nulls { - if nulls.is_null(idx) { - for i in 0..value_num { - bit_util::unset_bit(null_slice, length_so_far + i); - } - } - } - length_so_far += value_num; - new_values_buffer - .extend(old_values_buffer[offset..offset + value_size].repeat(value_num)) - } - let array_data = ArrayData::builder(array_ref.data_type().clone()) - .add_buffer(new_values_buffer.into()) - .null_bit_buffer(Some(new_null_buffer.into())) - .len(values_num) - .build() - .box_err() - .context(DecodeRecordBatch)?; - - Ok(make_array(array_data)) - } - - /// Decode offset slices into Vec - fn get_array_offsets(offset_slices: &[u8]) -> Vec { - let mut i32_offsets = Vec::with_capacity(offset_slices.len() / OFFSET_SIZE); - for i in (0..offset_slices.len()).step_by(OFFSET_SIZE) { - let offset = i32::from_le_bytes(offset_slices[i..i + OFFSET_SIZE].try_into().unwrap()); - i32_offsets.push(offset); - } - - i32_offsets - } +pub struct ParquetDecoder { + record_decoder: Box, } -impl RecordDecoder for HybridRecordDecoder { - /// Decode records from hybrid to columnar format - fn decode(&self, arrow_record_batch: ArrowRecordBatch) -> Result { - let new_arrow_schema = Self::convert_schema(arrow_record_batch.schema()); - let arrays = arrow_record_batch.columns(); - - let mut value_offsets = None; - // Find value offsets from the first col in collapsible_cols_idx. - if let Some(idx) = self.collapsible_cols_idx.first() { - let array_data = arrays[*idx as usize].to_data(); - let offset_slices = array_data.buffers()[0].as_slice(); - value_offsets = Some(Self::get_array_offsets(offset_slices)); - } else { - CollapsibleColsIdxEmpty.fail()?; - } - - let value_offsets = value_offsets.unwrap(); - let arrays = arrays - .iter() - .map(|array_ref| { - let data_type = array_ref.data_type(); - match data_type { - // TODO: - // 1. we assume the datatype inside the List is primitive now - // Ensure this when create table - // 2. Although nested structure isn't support now, but may will someday in - // future. 
So We should keep metadata about which columns - // are collapsed by hybrid storage format, to differentiate - // List column in original records - DataType::List(_nested_field) => { - Ok(make_array(array_ref.to_data().child_data()[0].clone())) - } - _ => { - let datum_kind = DatumKind::from_data_type(data_type).unwrap(); - match datum_kind.size() { - None => Self::stretch_variable_length_column(array_ref, &value_offsets), - Some(value_size) => Self::stretch_fixed_length_column( - array_ref, - value_size, - &value_offsets, - ), - } - } - } - }) - .collect::>>()?; - - ArrowRecordBatch::try_new(new_arrow_schema, arrays) - .box_err() - .context(EncodeRecordBatch) +impl Default for ParquetDecoder { + fn default() -> Self { + Self::new() } } -pub struct ParquetDecoder { - record_decoder: Box, -} - impl ParquetDecoder { - pub fn new(collapsible_cols_idx: &[u32]) -> Self { - let record_decoder: Box = if collapsible_cols_idx.is_empty() { - Box::new(ColumnarRecordDecoder {}) - } else { - Box::new(HybridRecordDecoder { - collapsible_cols_idx: collapsible_cols_idx.to_vec(), - }) - }; - - Self { record_decoder } + pub fn new() -> Self { + Self { + record_decoder: Box::new(ColumnarRecordDecoder {}), + } } pub fn decode_record_batch( @@ -849,356 +401,3 @@ impl ParquetDecoder { self.record_decoder.decode(arrow_record_batch) } } - -#[cfg(test)] -mod tests { - - use std::{pin::Pin, sync::Mutex, task::Poll}; - - use arrow::array::{Int32Array, StringArray, TimestampMillisecondArray, UInt64Array}; - use bytes_ext::Bytes; - use common_types::{ - column_schema, - schema::{Builder, Schema, TSID_COLUMN}, - time::{TimeRange, Timestamp}, - }; - use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; - use pin_project_lite::pin_project; - - use super::*; - - fn build_schema() -> Schema { - Builder::new() - .auto_increment_column_id(true) - .add_key_column( - column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64) - .build() - .unwrap(), - ) - .unwrap() - .add_key_column( - column_schema::Builder::new("timestamp".to_string(), DatumKind::Timestamp) - .build() - .unwrap(), - ) - .unwrap() - .add_normal_column( - column_schema::Builder::new("host".to_string(), DatumKind::String) - .is_tag(true) - .build() - .unwrap(), - ) - .unwrap() - .add_normal_column( - column_schema::Builder::new("region".to_string(), DatumKind::String) - .is_tag(true) - .build() - .unwrap(), - ) - .unwrap() - .add_normal_column( - column_schema::Builder::new("value".to_string(), DatumKind::Int32) - .build() - .unwrap(), - ) - .unwrap() - .add_normal_column( - column_schema::Builder::new("string_value".to_string(), DatumKind::String) - .build() - .unwrap(), - ) - .unwrap() - .build() - .unwrap() - } - - fn string_array(values: Vec>) -> ArrayRef { - Arc::new(StringArray::from(values)) - } - - fn int32_array(values: Vec>) -> ArrayRef { - Arc::new(Int32Array::from(values)) - } - - fn timestamp_array(values: Vec) -> ArrayRef { - Arc::new(TimestampMillisecondArray::from(values)) - } - - #[test] - fn stretch_int32_column() { - let testcases = [ - // (input, value_offsets, expected) - ( - vec![Some(1), Some(2)], - vec![0, 2, 4], - vec![Some(1), Some(1), Some(2), Some(2)], - ), - ( - vec![Some(1), None, Some(2)], - vec![0, 2, 4, 5], - vec![Some(1), Some(1), None, None, Some(2)], - ), - ]; - - for (input, value_offsets, expected) in testcases { - let input = int32_array(input); - let expected = int32_array(expected); - let actual = HybridRecordDecoder::stretch_fixed_length_column( - &input, - std::mem::size_of::(), - 
&value_offsets, - ) - .unwrap(); - assert_eq!( - actual.as_any().downcast_ref::().unwrap(), - expected.as_any().downcast_ref::().unwrap(), - ); - } - } - - #[test] - fn stretch_string_column() { - let testcases = [ - // (input, value_offsets, values_num, expected) - // - // value with same length - ( - vec![Some("a"), Some("b"), Some("c")], - vec![0, 3, 5, 6], - vec![ - Some("a"), - Some("a"), - Some("a"), - Some("b"), - Some("b"), - Some("c"), - ], - ), - // value with different length - ( - vec![Some("hello"), Some("ceresdb")], - vec![0, 1, 3], - vec![Some("hello"), Some("ceresdb"), Some("ceresdb")], - ), - // value with none - ( - vec![None, None, Some("hello"), None], - vec![0, 1, 3, 4, 5], - vec![None, None, None, Some("hello"), None], - ), - ]; - - for (input, value_offsets, expected) in testcases { - let input = string_array(input); - let expected = string_array(expected); - let actual = - HybridRecordDecoder::stretch_variable_length_column(&input, &value_offsets) - .unwrap(); - assert_eq!( - actual.as_any().downcast_ref::().unwrap(), - expected.as_any().downcast_ref::().unwrap(), - ); - } - } - - fn collect_collapsible_cols_idx(schema: &Schema, collapsible_cols_idx: &mut Vec) { - for (idx, _col) in schema.columns().iter().enumerate() { - if schema.is_collapsible_column(idx) { - collapsible_cols_idx.push(idx as u32); - } - } - } - - pin_project! { - struct CopiedBuffer { - #[pin] - buffer: Vec, - copied_buffer: Arc>>, - } - } - - impl CopiedBuffer { - fn new(buffer: Vec) -> Self { - Self { - buffer, - copied_buffer: Arc::new(Mutex::new(Vec::new())), - } - } - - fn copied_buffer(&self) -> Arc>> { - self.copied_buffer.clone() - } - } - - impl AsyncWrite for CopiedBuffer { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - self.copied_buffer.lock().unwrap().extend_from_slice(buf); - let buffer = self.project().buffer; - buffer.poll_write(cx, buf) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - let buffer = self.project().buffer; - buffer.poll_flush(cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - let buffer = self.project().buffer; - buffer.poll_shutdown(cx) - } - } - - #[tokio::test] - async fn hybrid_record_encode_and_decode() { - let schema = build_schema(); - - let meta_data = ParquetMetaData { - min_key: Bytes::from_static(b"100"), - max_key: Bytes::from_static(b"200"), - time_range: TimeRange::new_unchecked(Timestamp::new(100), Timestamp::new(101)), - max_sequence: 200, - schema: schema.clone(), - parquet_filter: Default::default(), - collapsible_cols_idx: Vec::new(), - }; - let copied_buffer = CopiedBuffer::new(Vec::new()); - let copied_encoded_buffer = copied_buffer.copied_buffer(); - let mut encoder = HybridRecordEncoder::try_new( - copied_buffer, - &meta_data.schema, - 100, - 0, - Compression::ZSTD(Default::default()), - ) - .unwrap(); - encoder - .set_meta_data(meta_data.clone()) - .expect("Failed to set meta data"); - - let columns = vec![ - Arc::new(UInt64Array::from(vec![1, 1, 2])) as ArrayRef, - timestamp_array(vec![100, 101, 100]), - string_array(vec![Some("host1"), Some("host1"), Some("host2")]), - string_array(vec![Some("region1"), Some("region1"), Some("region2")]), - int32_array(vec![Some(1), Some(2), Some(11)]), - string_array(vec![ - Some("string_value1"), - Some("string_value2"), - Some("string_value3"), - ]), - ]; - - let columns2 = vec![ - Arc::new(UInt64Array::from(vec![1, 2, 1, 2])) as ArrayRef, - 
timestamp_array(vec![100, 101, 100, 101]), - string_array(vec![ - Some("host1"), - Some("host2"), - Some("host1"), - Some("host2"), - ]), - string_array(vec![ - Some("region1"), - Some("region2"), - Some("region1"), - Some("region2"), - ]), - int32_array(vec![Some(1), Some(2), Some(11), Some(12)]), - string_array(vec![ - Some("string_value1"), - Some("string_value2"), - Some("string_value3"), - Some("string_value4"), - ]), - ]; - - let input_record_batch = - ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns).unwrap(); - let input_record_batch2 = - ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns2).unwrap(); - let num_rows = encoder - .encode(vec![input_record_batch, input_record_batch2]) - .await - .unwrap(); - assert_eq!(2, num_rows); - - // read encoded records back, and then compare with input records - encoder.close().await.unwrap(); - - let encoded_bytes = copied_encoded_buffer.lock().unwrap().clone(); - let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(encoded_bytes)) - .unwrap() - .build() - .unwrap(); - let hybrid_record_batch = reader.next().unwrap().unwrap(); - let mut collapsible_cols_idx = Vec::new(); - collect_collapsible_cols_idx(&meta_data.schema, &mut collapsible_cols_idx); - - let decoder = HybridRecordDecoder { - collapsible_cols_idx, - }; - let decoded_record_batch = decoder.decode(hybrid_record_batch).unwrap(); - - // Note: decode record batch's schema doesn't have metadata - // It's encoded in metadata of every fields - // assert_eq!(decoded_record_batch.schema(), input_record_batch.schema()); - - let expected_columns = vec![ - Arc::new(UInt64Array::from(vec![1, 1, 1, 1, 2, 2, 2])) as ArrayRef, - timestamp_array(vec![100, 101, 100, 100, 100, 101, 101]), - string_array(vec![ - Some("host1"), - Some("host1"), - Some("host1"), - Some("host1"), - Some("host2"), - Some("host2"), - Some("host2"), - ]), - string_array(vec![ - Some("region1"), - Some("region1"), - Some("region1"), - Some("region1"), - Some("region2"), - Some("region2"), - Some("region2"), - ]), - int32_array(vec![ - Some(1), - Some(2), - Some(1), - Some(11), - Some(11), - Some(2), - Some(12), - ]), - string_array(vec![ - Some("string_value1"), - Some("string_value2"), - Some("string_value1"), - Some("string_value3"), - Some("string_value3"), - Some("string_value2"), - Some("string_value4"), - ]), - ]; - - let expect_record_batch = - ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), expected_columns).unwrap(); - assert_eq!( - decoded_record_batch.columns(), - expect_record_batch.columns() - ); - } -} diff --git a/analytic_engine/src/sst/parquet/hybrid.rs b/analytic_engine/src/sst/parquet/hybrid.rs deleted file mode 100644 index 12630501ac..0000000000 --- a/analytic_engine/src/sst/parquet/hybrid.rs +++ /dev/null @@ -1,776 +0,0 @@ -// Copyright 2023 The CeresDB Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
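// [Editorial sketch, not part of the patch] hybrid.rs, deleted below, implemented the
// layout this PR retires: rows sharing a tsid were merged into a single row, with the
// collapsible columns (timestamp and field columns) collapsed into `ListArray` cells
// while tag columns kept one scalar per tsid. A self-contained illustration of that
// collapse using stock arrow builders, not the removed implementation itself:
use std::sync::Arc;

use arrow::array::{
    ArrayRef, ListBuilder, StringArray, TimestampMillisecondBuilder, UInt64Array,
};

// Three input rows for tsid 1 and two for tsid 2 collapse into two output rows:
// the tag column keeps one value per tsid, the timestamp column becomes a list.
fn collapsed_columns() -> (ArrayRef, ArrayRef, ArrayRef) {
    let tsid = Arc::new(UInt64Array::from(vec![1u64, 2])) as ArrayRef;
    let host = Arc::new(StringArray::from(vec!["host1", "host2"])) as ArrayRef;

    let mut ts = ListBuilder::new(TimestampMillisecondBuilder::new());
    ts.values().append_slice(&[100, 101, 102]); // timestamps of tsid 1
    ts.append(true);
    ts.values().append_slice(&[100, 101]); // timestamps of tsid 2
    ts.append(true);
    let ts = Arc::new(ts.finish()) as ArrayRef;

    (tsid, host, ts)
}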
- -use std::{collections::BTreeMap, sync::Arc}; - -use arrow::{ - array::{ - Array, ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray, ListArray, StringArray, - UInt64Array, - }, - buffer::{MutableBuffer, NullBuffer}, - datatypes::Schema as ArrowSchema, - record_batch::RecordBatch as ArrowRecordBatch, - util::bit_util, -}; -use common_types::{ - datum::DatumKind, - schema::{ArrowSchemaRef, DataType, Field, Schema}, -}; -use generic_error::BoxError; -use snafu::{Backtrace, ResultExt, Snafu}; - -use crate::sst::writer::{EncodeRecordBatch, Result}; - -// hard coded in https://github.com/apache/arrow-rs/blob/20.0.0/arrow/src/array/array_list.rs#L185 -const LIST_ITEM_NAME: &str = "item"; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display( - "Hybrid format only support variable length types UTF8 and Binary, current type:{:?}.\nBacktrace:\n{}", - type_name, - backtrace - ))] - VariableLengthType { - type_name: DataType, - backtrace: Backtrace, - }, -} - -#[derive(Debug, Clone, Copy)] -struct SliceArg { - offset: usize, - length: usize, -} - -/// ArrayHandle is used to keep different offsets of array, which can be -/// concatenated together. -/// -/// Note: -/// 1. Array.slice(offset, length) don't work as expected, since the -/// underlying buffer is still shared without slice. -/// 2. Array should be [fixed-size primitive](https://arrow.apache.org/docs/format/Columnar.html#fixed-size-primitive-layout) -#[derive(Debug, Clone)] -struct ArrayHandle { - array: ArrayRef, - slice_args: Vec, -} - -impl ArrayHandle { - fn new(array: ArrayRef) -> Self { - Self::with_slice_args(array, Vec::new()) - } - - fn with_slice_args(array: ArrayRef, slice_args: Vec) -> Self { - Self { array, slice_args } - } - - fn append_slice_arg(&mut self, arg: SliceArg) { - self.slice_args.push(arg) - } - - fn len(&self) -> usize { - self.slice_args.iter().map(|arg| arg.length).sum() - } - - // Note: this require primitive array - fn data_slice(&self) -> Vec { - self.array.to_data().buffers()[0].as_slice().to_vec() - } - - fn nulls(&self) -> Option<&NullBuffer> { - self.array.nulls() - } -} - -/// `TsidBatch` is used to collect column data for the same TSID -#[derive(Debug)] -struct TsidBatch { - non_collapsible_col_values: Vec, - // record_batch_idx -> ArrayHandle - // Store collapsible data in multi record batch. - // Vec contains multi columns data. - collapsible_col_arrays: BTreeMap>, -} - -impl TsidBatch { - fn new(non_collapsible_col_values: Vec) -> Self { - Self { - non_collapsible_col_values, - collapsible_col_arrays: BTreeMap::new(), - } - } -} - -#[derive(Debug)] -pub struct IndexedType { - pub idx: usize, - pub data_type: DatumKind, -} - -struct IndexedArray { - idx: usize, - array: ArrayRef, -} - -/// Convert collapsible columns to list type -pub fn build_hybrid_arrow_schema(schema: &Schema) -> ArrowSchemaRef { - let arrow_schema = schema.to_arrow_schema_ref(); - let new_fields = arrow_schema - .fields() - .iter() - .enumerate() - .map(|(idx, field)| { - if schema.is_collapsible_column(idx) { - let field_type = DataType::List(Arc::new(Field::new( - LIST_ITEM_NAME, - field.data_type().clone(), - true, - ))); - // TODO(tanruixiang): is there need to use new_dict? 
- Arc::new(Field::new(field.name(), field_type, true)) - } else { - field.clone() - } - }) - .collect::>(); - Arc::new(ArrowSchema::new_with_metadata( - new_fields, - arrow_schema.metadata().clone(), - )) -} - -struct StringArrayWrapper<'a>(&'a StringArray); -struct BinaryArrayWrapper<'a>(&'a BinaryArray); - -/// VariableSizeArray is a trait of variable-size array, such as StringArray and -/// BinaryArray. Dealing with the buffer data. -/// -/// There is no common trait for variable-size array to dealing with the buffer -/// data in arrow-rs library. -trait VariableSizeArray { - // Returns the offset values in the offsets buffer. - fn value_offsets(&self) -> &[i32]; - // Returns the length for the element at index i. - fn value_length(&self, index: usize) -> i32; - // Returns a clone of the value data buffer. - fn value_data(&self) -> &[u8]; -} - -macro_rules! impl_offsets { - ($array: ty) => { - impl<'a> VariableSizeArray for $array { - fn value_offsets(&self) -> &[i32] { - self.0.value_offsets() - } - - fn value_length(&self, index: usize) -> i32 { - self.0.value_length(index) - } - - fn value_data(&self) -> &[u8] { - self.0.value_data() - } - } - }; -} - -impl_offsets!(StringArrayWrapper<'a>); -impl_offsets!(BinaryArrayWrapper<'a>); - -/// ListArrayBuilder is used for concat slice of different Arrays represented by -/// ArrayHandle into one ListArray -struct ListArrayBuilder { - datum_kind: DatumKind, - // Vec of row - multi_row_arrays: Vec>, -} - -impl ListArrayBuilder { - fn new(datum_kind: DatumKind, multi_row_arrays: Vec>) -> Self { - Self { - datum_kind, - multi_row_arrays, - } - } - - fn build_child_data(&self, offsets: &mut MutableBuffer) -> Result { - // Num of raw data in child data. - let values_num = self - .multi_row_arrays - .iter() - .map(|handles| handles.iter().map(|h| h.len()).sum::()) - .sum(); - - // Initialize null_buffer with all 1, so we don't need to set it when array's - // null_bitmap is None - // - // Note: bit set to 1 means value is not null. 
- let mut null_buffer = new_ones_buffer(values_num); - let null_slice = null_buffer.as_slice_mut(); - - let mut length_so_far: i32 = 0; - for arrays in &self.multi_row_arrays { - for array_handle in arrays { - let nulls = array_handle.nulls(); - - for slice_arg in &array_handle.slice_args { - let offset = slice_arg.offset; - let length = slice_arg.length; - if let Some(nulls) = nulls { - // TODO: We now set bitmap one by one, a more complicated but efficient way - // is to operate on bitmap buffer bits directly, - // like what we do with values(slice and shift) - for i in 0..length { - if nulls.is_null(i + offset) { - bit_util::unset_bit(null_slice, length_so_far as usize + i); - } - } - } - length_so_far += length as i32; - } - } - } - - let mut builder = ArrayData::builder(self.datum_kind.to_arrow_data_type()) - .len(values_num) - .null_bit_buffer(Some(null_buffer.into())); - - builder = self.apply_child_data_buffer(builder, offsets)?; - let values_array_data = builder.build().box_err().context(EncodeRecordBatch)?; - - Ok(values_array_data) - } - - fn apply_child_data_buffer( - &self, - mut builder: ArrayDataBuilder, - offsets: &mut MutableBuffer, - ) -> Result { - let (inner_offsets, values) = if let Some(data_type_size) = self.datum_kind.size() { - ( - None, - self.build_fixed_size_array_buffer(offsets, data_type_size), - ) - } else { - let (inner_offsets, values) = self.build_variable_size_array_buffer(offsets)?; - (Some(inner_offsets), values) - }; - - if let Some(buffer) = inner_offsets { - builder = builder.add_buffer(buffer.into()); - } - builder = builder.add_buffer(values.into()); - Ok(builder) - } - - fn build_fixed_size_array_buffer( - &self, - offsets: &mut MutableBuffer, - data_type_size: usize, - ) -> MutableBuffer { - let mut length_so_far: i32 = 0; - offsets.push(length_so_far); - - let values_num: usize = self - .multi_row_arrays - .iter() - .map(|handles| handles.iter().map(|handle| handle.len()).sum::()) - .sum(); - let mut values = MutableBuffer::new(values_num * data_type_size); - for arrays in &self.multi_row_arrays { - for array_handle in arrays { - let shared_buffer = array_handle.data_slice(); - for slice_arg in &array_handle.slice_args { - let offset = slice_arg.offset; - let length = slice_arg.length; - length_so_far += length as i32; - - values.extend_from_slice( - &shared_buffer[offset * data_type_size..(offset + length) * data_type_size], - ); - } - } - // The data in the arrays belong to the same tsid, so the offsets is the total - // len. - offsets.push(length_so_far); - } - - values - } - - /// Return (offsets_buffer, values_buffer) according to arrow - /// `Variable-size Binary Layout`. Refer to https://arrow.apache.org/docs/format/Columnar.html#variable-size-binary-layout. 
- fn build_variable_size_array_buffer( - &self, - offsets: &mut MutableBuffer, - ) -> Result<(MutableBuffer, MutableBuffer)> { - let mut length_so_far: i32 = 0; - offsets.push(length_so_far); - - let (offsets_length_total, values_length_total) = - self.compute_variable_size_array_buffer_length()?; - - let mut inner_values = MutableBuffer::new(values_length_total); - let mut inner_offsets = MutableBuffer::new(offsets_length_total); - - self.build_variable_size_array_buffer_data( - &mut length_so_far, - offsets, - &mut inner_offsets, - &mut inner_values, - )?; - Ok((inner_offsets, inner_values)) - } - - fn convert_to_variable_size_array<'a>( - &self, - array_handle: &'a ArrayHandle, - ) -> Result> { - match self.datum_kind.to_arrow_data_type() { - DataType::Utf8 => Ok(Box::new(StringArrayWrapper( - array_handle - .array - .as_any() - .downcast_ref::() - .expect("downcast StringArray failed"), - ))), - DataType::Binary => Ok(Box::new(BinaryArrayWrapper( - array_handle - .array - .as_any() - .downcast_ref::() - .expect("downcast BinaryArray failed"), - ))), - typ => VariableLengthType { type_name: typ } - .fail() - .box_err() - .context(EncodeRecordBatch), - } - } - - /// Return (offsets_length_total, values_length_total). - #[inline] - fn compute_variable_size_array_buffer_length(&self) -> Result<(usize, usize)> { - let mut offsets_length_total = 0; - let mut values_length_total = 0; - - for multi_row_array in &self.multi_row_arrays { - for array_handle in multi_row_array { - let array = self.convert_to_variable_size_array(array_handle)?; - for slice_arg in &array_handle.slice_args { - let start = array.value_offsets()[slice_arg.offset]; - let end = array.value_offsets()[slice_arg.offset + slice_arg.length]; - - offsets_length_total += slice_arg.length; - values_length_total += (end - start) as usize; - } - } - } - - Ok((offsets_length_total, values_length_total)) - } - - /// Build variable-size array buffer data. - /// - /// length_so_far and offsets are used for father buffer data. - /// inner_offsets and inner_values are used for child buffer data. - fn build_variable_size_array_buffer_data( - &self, - length_so_far: &mut i32, - offsets: &mut MutableBuffer, - inner_offsets: &mut MutableBuffer, - inner_values: &mut MutableBuffer, - ) -> Result<()> { - let mut inner_length_so_far: i32 = 0; - inner_offsets.push(inner_length_so_far); - - for arrays in &self.multi_row_arrays { - for array_handle in arrays { - let array = self.convert_to_variable_size_array(array_handle)?; - - for slice_arg in &array_handle.slice_args { - *length_so_far += slice_arg.length as i32; - - let start = array.value_offsets()[slice_arg.offset]; - let end = array.value_offsets()[slice_arg.offset + slice_arg.length]; - - for i in slice_arg.offset..(slice_arg.offset + slice_arg.length) { - inner_length_so_far += array.value_length(i); - inner_offsets.push(inner_length_so_far); - } - - inner_values - .extend_from_slice(&array.value_data()[start as usize..end as usize]); - } - } - // The data in the arrays belong to the same tsid, so the offsets is the total - // len. - offsets.push(*length_so_far); - } - - Ok(()) - } - - /// This function is a translation of [GenericListArray.from_iter_primitive](https://docs.rs/arrow/20.0.0/src/arrow/array/array_list.rs.html#151) - fn build(self) -> Result { - // The data in multi_row_arrays belong to different tsids. - // So the values num is the len of multi_row_arrays. 
- let array_len = self.multi_row_arrays.len(); - let mut offsets = MutableBuffer::new(array_len * std::mem::size_of::()); - let child_data = self.build_child_data(&mut offsets)?; - // TODO(tanruixiang): is there need to use new_dict? - let field = Arc::new(Field::new( - LIST_ITEM_NAME, - self.datum_kind.to_arrow_data_type(), - true, - )); - let array_data = ArrayData::builder(DataType::List(field)) - .len(array_len) - .add_buffer(offsets.into()) - .add_child_data(child_data); - - // TODO: change to unsafe version? - // https://docs.rs/arrow/20.0.0/src/arrow/array/array_list.rs.html#192 - // let array_data = unsafe { array_data.build_unchecked() }; - let array_data = array_data.build().box_err().context(EncodeRecordBatch)?; - - Ok(ListArray::from(array_data)) - } -} - -/// Builds hybrid record by concat timestamp and non key columns into -/// `ListArray`. -fn build_hybrid_record( - arrow_schema: ArrowSchemaRef, - tsid_type: &IndexedType, - non_collapsible_col_types: &[IndexedType], - collapsible_col_types: &[IndexedType], - // tsid -> TsidBatch - batch_by_tsid: BTreeMap, -) -> Result { - let tsid_array = UInt64Array::from_iter_values(batch_by_tsid.keys().cloned()); - - // col_idx -> tsid -> data array - let mut collapsible_col_arrays = - vec![vec![Vec::new(); tsid_array.len()]; collapsible_col_types.len()]; - let mut non_collapsible_col_arrays = vec![Vec::new(); non_collapsible_col_types.len()]; - - // Reorganize data in batch_by_tsid. - // tsid-> col_idx-> data array ==> col_idx -> tsid -> data array - // example: - // tsid0 -> vec![ data_arrays0 of col0, data_array1 of col1] - // tsid1 -> vec![ data_arrays2 of col0, data_array3 of col1] - // ==> - // col0 -> vec![ data_arrays0 of tsid0, data_array2 of tsid1] - // col1 -> vec![ data_arrays1 of tsid0, data_array3 of tsid1] - for (tsid_idx, batch) in batch_by_tsid.into_values().enumerate() { - for col_array in batch.collapsible_col_arrays.into_values() { - for (col_idx, arr) in col_array.into_iter().enumerate() { - collapsible_col_arrays[col_idx][tsid_idx].push(arr); - } - } - for (col_idx, arr) in batch.non_collapsible_col_values.into_iter().enumerate() { - non_collapsible_col_arrays[col_idx].push(arr); - } - } - let tsid_array = IndexedArray { - idx: tsid_type.idx, - array: Arc::new(tsid_array), - }; - let non_collapsible_col_arrays = non_collapsible_col_arrays - .into_iter() - .zip(non_collapsible_col_types.iter().map(|n| n.idx)) - .map(|(c, idx)| IndexedArray { - idx, - array: Arc::new(StringArray::from(c)) as ArrayRef, - }) - .collect::>(); - let collapsible_col_arrays = collapsible_col_arrays - .into_iter() - .zip(collapsible_col_types.iter().map(|n| (n.idx, n.data_type))) - .map(|(handle, (idx, datum_type))| { - Ok(IndexedArray { - idx, - array: Arc::new(ListArrayBuilder::new(datum_type, handle).build()?), - }) - }) - .collect::>>()?; - - let all_columns = [ - vec![tsid_array], - non_collapsible_col_arrays, - collapsible_col_arrays, - ] - .into_iter() - .flatten() - .map(|indexed_array| (indexed_array.idx, indexed_array.array)) - .collect::>() - .into_values() - .collect::>(); - - ArrowRecordBatch::try_new(arrow_schema, all_columns) - .box_err() - .context(EncodeRecordBatch) -} - -/// Converts arrow record batch into hybrid record format describe in -/// `StorageFormat::Hybrid` -pub fn convert_to_hybrid_record( - tsid_type: &IndexedType, - non_collapsible_col_types: &[IndexedType], - collapsible_col_types: &[IndexedType], - hybrid_arrow_schema: ArrowSchemaRef, - arrow_record_batches: Vec, -) -> Result { - // TODO: should keep tsid 
ordering here? - let mut batch_by_tsid = BTreeMap::new(); - for (record_idx, record_batch) in arrow_record_batches.iter().enumerate() { - let tsid_array = record_batch - .column(tsid_type.idx) - .as_any() - .downcast_ref::() - .expect("checked when create table"); - - if tsid_array.is_empty() { - continue; - } - - let non_collapsible_col_values = non_collapsible_col_types - .iter() - .map(|col| { - record_batch - .column(col.idx) - .as_any() - .downcast_ref::() - .expect("checked in HybridRecordEncoder::try_new") - }) - .collect::>(); - let mut previous_tsid = tsid_array.value(0); - // duplicated_tsids is an array of every tsid's offset in origin array - // the length of each tsid can be calculated with - // tsid_n = duplicated_tsids[n+1].offset - duplicated_tsids[n].offset - let mut duplicated_tsids = vec![(previous_tsid, 0)]; // (tsid, offset) - for row_idx in 1..tsid_array.len() { - let tsid = tsid_array.value(row_idx); - if tsid != previous_tsid { - previous_tsid = tsid; - duplicated_tsids.push((tsid, row_idx)); - } - } - for i in 0..duplicated_tsids.len() { - let (tsid, offset) = duplicated_tsids[i]; - let length = if i == duplicated_tsids.len() - 1 { - tsid_array.len() - offset - } else { - duplicated_tsids[i + 1].1 - offset - }; - - let batch = batch_by_tsid.entry(tsid).or_insert_with(|| { - TsidBatch::new( - non_collapsible_col_values - .iter() - .map(|col| col.value(offset).to_string()) - .collect(), - ) - }); - let collapsible_col_arrays = batch - .collapsible_col_arrays - .entry(record_idx) - .or_insert_with(|| { - collapsible_col_types - .iter() - .map(|col| ArrayHandle::new(record_batch.column(col.idx).clone())) - .collect() - }); - // Append the slice arg to all columns in the same record batch. - for handle in collapsible_col_arrays { - handle.append_slice_arg(SliceArg { offset, length }); - } - } - } - build_hybrid_record( - hybrid_arrow_schema, - tsid_type, - non_collapsible_col_types, - collapsible_col_types, - batch_by_tsid, - ) -} - -/// Return a MutableBuffer with bits all set to 1 -pub fn new_ones_buffer(len: usize) -> MutableBuffer { - let null_buffer = MutableBuffer::new_null(len); - let buf_cap = null_buffer.capacity(); - null_buffer.with_bitset(buf_cap, true) -} - -#[cfg(test)] -mod tests { - use arrow::{ - array::{TimestampMillisecondArray, UInt16Array}, - buffer::Buffer, - datatypes::{TimestampMillisecondType, UInt16Type}, - }; - - use super::*; - - impl From<(usize, usize)> for SliceArg { - fn from(offset_length: (usize, usize)) -> Self { - Self { - offset: offset_length.0, - length: offset_length.1, - } - } - } - - fn timestamp_array(start: i64, end: i64) -> ArrayRef { - Arc::new(TimestampMillisecondArray::from_iter_values(start..end)) - } - - fn uint16_array(values: Vec>) -> ArrayRef { - let arr: UInt16Array = values.into_iter().collect(); - - Arc::new(arr) - } - - fn string_array(values: Vec>) -> ArrayRef { - let arr: StringArray = values.into_iter().collect(); - - Arc::new(arr) - } - - #[test] - fn merge_timestamp_array_to_list() { - let row0 = vec![ArrayHandle::with_slice_args( - timestamp_array(1, 20), - vec![(1, 2).into(), (10, 3).into()], - )]; - let row1 = vec![ArrayHandle::with_slice_args( - timestamp_array(100, 120), - vec![(1, 2).into(), (10, 3).into()], - )]; - - let data = vec![ - Some(vec![Some(2), Some(3), Some(11), Some(12), Some(13)]), - Some(vec![Some(101), Some(102), Some(110), Some(111), Some(112)]), - ]; - let expected = ListArray::from_iter_primitive::(data); - let list_array = ListArrayBuilder::new(DatumKind::Timestamp, vec![row0, row1]) 
- .build() - .unwrap(); - - assert_eq!(list_array, expected); - } - - #[test] - fn merge_u16_array_with_none_to_list() { - let row0 = vec![ArrayHandle::with_slice_args( - uint16_array(vec![ - Some(1), - Some(2), - None, - Some(3), - Some(4), - Some(5), - Some(6), - ]), - vec![(1, 3).into(), (4, 1).into()], - )]; - let row1 = vec![ArrayHandle::with_slice_args( - uint16_array(vec![ - Some(1), - Some(2), - None, - Some(3), - Some(4), - Some(5), - Some(6), - ]), - vec![(0, 1).into()], - )]; - - let data = vec![ - Some(vec![Some(2), None, Some(3), Some(4)]), - Some(vec![Some(1)]), - ]; - let expected = ListArray::from_iter_primitive::(data); - let list_array = ListArrayBuilder::new(DatumKind::UInt16, vec![row0, row1]) - .build() - .unwrap(); - - assert_eq!(list_array, expected); - } - - #[test] - fn merge_string_array_with_none_to_list() { - let row0 = vec![ArrayHandle::with_slice_args( - string_array(vec![ - Some("a"), - Some("bb"), - None, - Some("ccc"), - Some("d"), - Some("eeee"), - Some("eee"), - ]), - vec![(1, 3).into(), (5, 1).into()], - )]; - - let row1 = vec![ArrayHandle::with_slice_args( - string_array(vec![ - Some("a"), - Some("bb"), - None, - Some("ccc"), - Some("d"), - Some("eeee"), - Some("eee"), - ]), - vec![(0, 1).into()], - )]; - - let string_data = - string_array(vec![Some("bb"), None, Some("ccc"), Some("eeee"), Some("a")]); - let offsets: [i32; 3] = [0, 4, 5]; - let array_data = ArrayData::builder(DataType::List(Arc::new(Field::new( - LIST_ITEM_NAME, - DataType::Utf8, - true, - )))) - .len(2) - .add_buffer(Buffer::from_slice_ref(offsets)) - .add_child_data(string_data.to_data()) - .build() - .unwrap(); - let expected = ListArray::from(array_data); - let list_array = ListArrayBuilder::new(DatumKind::String, vec![row0, row1]) - .build() - .unwrap(); - - assert_eq!(list_array, expected); - } - - // Fix https://github.com/CeresDB/ceresdb/issues/255 - // null buffer will ceiled by 8, so its capacity may less than `size` - #[test] - fn new_null_buffer_with_different_size() { - let sizes = [1, 8, 11, 20, 511]; - - for size in &sizes { - let _ = new_ones_buffer(*size); - } - } -} diff --git a/analytic_engine/src/sst/parquet/meta_data.rs b/analytic_engine/src/sst/parquet/meta_data.rs index 2396ae2639..c7a7f072a9 100644 --- a/analytic_engine/src/sst/parquet/meta_data.rs +++ b/analytic_engine/src/sst/parquet/meta_data.rs @@ -348,7 +348,6 @@ pub struct ParquetMetaData { pub max_sequence: SequenceNumber, pub schema: Schema, pub parquet_filter: Option, - pub collapsible_cols_idx: Vec, } pub type ParquetMetaDataRef = Arc; @@ -362,7 +361,6 @@ impl From for ParquetMetaData { max_sequence: meta.max_sequence, schema: meta.schema, parquet_filter: None, - collapsible_cols_idx: Vec::new(), } } } @@ -407,7 +405,6 @@ impl fmt::Debug for ParquetMetaData { .map(|filter| filter.size()) .unwrap_or(0), ) - .field("collapsible_cols_idx", &self.collapsible_cols_idx) .finish() } } @@ -421,7 +418,8 @@ impl From for sst_pb::ParquetMetaData { time_range: Some(src.time_range.into()), schema: Some(schema_pb::TableSchema::from(&src.schema)), filter: src.parquet_filter.map(|v| v.into()), - collapsible_cols_idx: src.collapsible_cols_idx, + // collapsible_cols_idx is used in hybrid format ,and it's deprecated. 
+ collapsible_cols_idx: Vec::new(), } } } @@ -447,7 +445,6 @@ impl TryFrom for ParquetMetaData { max_sequence: src.max_sequence, schema, parquet_filter, - collapsible_cols_idx: src.collapsible_cols_idx, }) } } diff --git a/analytic_engine/src/sst/parquet/mod.rs b/analytic_engine/src/sst/parquet/mod.rs index c7ee9d11c8..617ce7394e 100644 --- a/analytic_engine/src/sst/parquet/mod.rs +++ b/analytic_engine/src/sst/parquet/mod.rs @@ -16,7 +16,6 @@ pub mod async_reader; pub mod encoding; -mod hybrid; pub mod meta_data; mod row_group_pruner; pub mod writer; diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index 1227fce2e6..9e1631e0b6 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -31,7 +31,7 @@ use crate::{ factory::{ObjectStorePickerRef, SstWriteOptions}, file::Level, parquet::{ - encoding::{encode_sst_meta_data_v2, ParquetEncoder}, + encoding::{encode_sst_meta_data, ParquetEncoder}, meta_data::{ParquetFilter, ParquetMetaData, RowGroupFilterBuilder}, }, writer::{ @@ -49,7 +49,6 @@ pub struct ParquetSstWriter<'a> { /// The path where the data is persisted. path: &'a Path, level: Level, - hybrid_encoding: bool, /// The storage where the data is persist. store: &'a ObjectStoreRef, /// Max row group size. @@ -62,7 +61,6 @@ impl<'a> ParquetSstWriter<'a> { pub fn new( path: &'a Path, level: Level, - hybrid_encoding: bool, store_picker: &'a ObjectStorePickerRef, options: &SstWriteOptions, ) -> Self { @@ -70,7 +68,6 @@ impl<'a> ParquetSstWriter<'a> { Self { path, level, - hybrid_encoding, store, num_rows_per_row_group: options.num_rows_per_row_group, compression: options.compression.into(), @@ -83,7 +80,6 @@ impl<'a> ParquetSstWriter<'a> { /// encode them to parquet file. 
struct RecordBatchGroupWriter { request_id: RequestId, - hybrid_encoding: bool, input: RecordBatchStream, input_exhausted: bool, meta_data: MetaData, @@ -178,8 +174,7 @@ impl RecordBatchGroupWriter { } fn need_custom_filter(&self) -> bool { - // TODO: support filter in hybrid storage format [#435](https://github.com/CeresDB/ceresdb/issues/435) - !self.hybrid_encoding && !self.level.is_min() + !self.level.is_min() } async fn write_all( @@ -194,7 +189,6 @@ impl RecordBatchGroupWriter { let mut parquet_encoder = ParquetEncoder::try_new( sink, &self.meta_data.schema, - self.hybrid_encoding, self.num_rows_per_row_group, self.max_buffer_size, self.compression, @@ -296,7 +290,7 @@ async fn write_metadata( where W: AsyncWrite + Send + Unpin, { - let buf = encode_sst_meta_data_v2(parquet_metadata).context(EncodePbData)?; + let buf = encode_sst_meta_data(parquet_metadata).context(EncodePbData)?; meta_sink .write_all(buf.as_bytes()) .await @@ -333,7 +327,6 @@ impl<'a> SstWriter for ParquetSstWriter<'a> { ); let group_writer = RecordBatchGroupWriter { - hybrid_encoding: self.hybrid_encoding, request_id, input, input_exhausted: false, @@ -370,15 +363,10 @@ impl<'a> SstWriter for ParquetSstWriter<'a> { } let file_head = self.store.head(self.path).await.context(Storage)?; - let storage_format = if self.hybrid_encoding { - StorageFormat::Hybrid - } else { - StorageFormat::Columnar - }; Ok(SstInfo { file_size: file_head.size, row_num: total_num_rows, - storage_format, + storage_format: StorageFormat::Columnar, meta_path: meta_path.to_string(), }) } @@ -665,7 +653,6 @@ mod tests { let mut group_writer = RecordBatchGroupWriter { request_id: RequestId::next_id(), - hybrid_encoding: false, input: record_batch_stream, input_exhausted: false, num_rows_per_row_group, diff --git a/analytic_engine/src/table/version_edit.rs b/analytic_engine/src/table/version_edit.rs index 41e5230831..0523bbb12c 100644 --- a/analytic_engine/src/table/version_edit.rs +++ b/analytic_engine/src/table/version_edit.rs @@ -45,6 +45,9 @@ pub enum Error { #[snafu(display("Fail to convert table schema, err:{}", source))] ConvertTableSchema { source: common_types::schema::Error }, + #[snafu(display("Fail to convert storage format, err:{}", source))] + ConvertStorageFormat { source: crate::table_options::Error }, + #[snafu(display("Time range is not found.\nBacktrace:\n{}", backtrace))] TimeRangeNotFound { backtrace: Backtrace }, @@ -97,7 +100,8 @@ impl TryFrom for AddFile { row_num: src.row_num, time_range, max_seq: src.max_seq, - storage_format: StorageFormat::from(storage_format), + storage_format: StorageFormat::try_from(storage_format) + .context(ConvertStorageFormat)?, associated_files: src.associated_files, }, }; diff --git a/analytic_engine/src/table_options.rs b/analytic_engine/src/table_options.rs index c1ca841507..86d7d8d9e8 100644 --- a/analytic_engine/src/table_options.rs +++ b/analytic_engine/src/table_options.rs @@ -41,7 +41,6 @@ const COMPRESSION_SNAPPY: &str = "SNAPPY"; const COMPRESSION_ZSTD: &str = "ZSTD"; const STORAGE_FORMAT_AUTO: &str = "AUTO"; const STORAGE_FORMAT_COLUMNAR: &str = "COLUMNAR"; -const STORAGE_FORMAT_HYBRID: &str = "HYBRID"; /// Default bucket duration (1d) const BUCKET_DURATION_1D: Duration = Duration::from_secs(24 * 60 * 60); @@ -126,6 +125,12 @@ pub enum Error { #[snafu(display("Storage format hint is missing.\nBacktrace:\n{}", backtrace))] MissingStorageFormatHint { backtrace: Backtrace }, + + #[snafu(display( + "Hybrid format is deprecated, and cannot be used any more.\nBacktrace:\n{}", + backtrace + 
))] + HybridDeprecated { backtrace: Backtrace }, } define_result!(Error); @@ -253,25 +258,6 @@ pub enum StorageFormat { /// | ..... | | | | | /// ``` Columnar, - - /// Design for time-series data - /// Collapsible Columns within same primary key are collapsed - /// into list, other columns are the same format with columnar's. - /// - /// Whether a column is collapsible is decided by - /// `Schema::is_collapsible_column` - /// - /// Note: minTime/maxTime is optional and not implemented yet, mainly used - /// for time-range pushdown filter - /// - ///```plaintext - /// | Device ID | Timestamp | Status Code | Tag 1 | Tag 2 | minTime | maxTime | - /// |-----------|---------------------|-------------|-------|-------|---------|---------| - /// | A | [12:01,12:02,12:03] | [0,0,0] | v1 | v1 | 12:01 | 12:03 | - /// | B | [12:01,12:02,12:03] | [0,1,0] | v2 | v2 | 12:01 | 12:03 | - /// | ... | | | | | | | - /// ``` - Hybrid, } impl From for manifest_pb::StorageFormatHint { @@ -301,7 +287,7 @@ impl TryFrom for StorageFormatHint { manifest_pb::storage_format_hint::Hint::Specific(format) => { let storage_format = manifest_pb::StorageFormat::from_i32(format) .context(UnknownStorageFormatType { value: format })?; - StorageFormatHint::Specific(storage_format.into()) + StorageFormatHint::Specific(storage_format.try_into()?) } }; @@ -324,7 +310,6 @@ impl TryFrom<&str> for StorageFormatHint { fn try_from(value: &str) -> Result { let format = match value.to_uppercase().as_str() { STORAGE_FORMAT_COLUMNAR => Self::Specific(StorageFormat::Columnar), - STORAGE_FORMAT_HYBRID => Self::Specific(StorageFormat::Hybrid), STORAGE_FORMAT_AUTO => Self::Auto, _ => return UnknownStorageFormatHint { value }.fail(), }; @@ -336,16 +321,17 @@ impl From for manifest_pb::StorageFormat { fn from(format: StorageFormat) -> Self { match format { StorageFormat::Columnar => Self::Columnar, - StorageFormat::Hybrid => Self::Hybrid, } } } -impl From for StorageFormat { - fn from(format: manifest_pb::StorageFormat) -> Self { +impl TryFrom for StorageFormat { + type Error = Error; + + fn try_from(format: manifest_pb::StorageFormat) -> Result { match format { - manifest_pb::StorageFormat::Columnar => Self::Columnar, - manifest_pb::StorageFormat::Hybrid => Self::Hybrid, + manifest_pb::StorageFormat::Columnar => Ok(Self::Columnar), + manifest_pb::StorageFormat::Hybrid => HybridDeprecated {}.fail(), } } } @@ -356,7 +342,6 @@ impl TryFrom<&str> for StorageFormat { fn try_from(value: &str) -> Result { let format = match value.to_uppercase().as_str() { STORAGE_FORMAT_COLUMNAR => Self::Columnar, - STORAGE_FORMAT_HYBRID => Self::Hybrid, _ => return UnknownStorageFormat { value }.fail(), }; Ok(format) @@ -367,7 +352,6 @@ impl ToString for StorageFormat { fn to_string(&self) -> String { match self { Self::Columnar => STORAGE_FORMAT_COLUMNAR, - Self::Hybrid => STORAGE_FORMAT_HYBRID, } .to_string() } diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index 9e219483c3..6bed586141 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -781,21 +781,6 @@ impl Schema { self.column(i).is_tag } - /// Whether i-nth column can be collapsed to List describe in - /// `StorageFormat::Hybrid` - pub fn is_collapsible_column(&self, i: usize) -> bool { - if self.timestamp_index == i { - return true; - } - - if self.is_tag_column(i) { - return false; - } - - self.tsid_index - .map_or_else(|| true, |tsid_idx| tsid_idx != i) - } - /// Get the version of this schema #[inline] pub fn version(&self) -> Version { diff --git 
a/integration_tests/cases/env/cluster/ddl/create_tables.result b/integration_tests/cases/env/cluster/ddl/create_tables.result index 183be7de8b..b1258f8670 100644 --- a/integration_tests/cases/env/cluster/ddl/create_tables.result +++ b/integration_tests/cases/env/cluster/ddl/create_tables.result @@ -167,34 +167,6 @@ drop table `05_create_tables_t8`; affected_rows: 0 -CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'hybrid'); - -affected_rows: 0 - -show create table `05_create_tables_t8`; - -Table,Create Table, -String("05_create_tables_t8"),String("CREATE TABLE `05_create_tables_t8` (`tsid` uint64 NOT NULL, `t1` timestamp NOT NULL, `c1` int, PRIMARY KEY(tsid,t1), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='HYBRID', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')"), - - -drop table `05_create_tables_t8`; - -affected_rows: 0 - -CREATE TABLE `05_create_tables_t9`(c1 int, d string dictionary, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'hybrid'); - -affected_rows: 0 - -show create table `05_create_tables_t9`; - -Table,Create Table, -String("05_create_tables_t9"),String("CREATE TABLE `05_create_tables_t9` (`tsid` uint64 NOT NULL, `t1` timestamp NOT NULL, `c1` int, `d` string DICTIONARY, PRIMARY KEY(tsid,t1), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='HYBRID', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')"), - - -drop table `05_create_tables_t9`; - -affected_rows: 0 - CREATE TABLE `05_create_tables_t9`(c1 int, d string dictionary, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'columnar'); affected_rows: 0 diff --git a/integration_tests/cases/env/cluster/ddl/create_tables.sql b/integration_tests/cases/env/cluster/ddl/create_tables.sql index 41e4209fd4..16abc907e2 100644 --- a/integration_tests/cases/env/cluster/ddl/create_tables.sql +++ b/integration_tests/cases/env/cluster/ddl/create_tables.sql @@ -58,15 +58,6 @@ CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) show create table `05_create_tables_t8`; drop table `05_create_tables_t8`; -CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'hybrid'); -show create table `05_create_tables_t8`; -drop table `05_create_tables_t8`; - --- Use Dictionary Encode -CREATE TABLE `05_create_tables_t9`(c1 int, d string dictionary, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'hybrid'); -show create table `05_create_tables_t9`; -drop table `05_create_tables_t9`; - CREATE TABLE `05_create_tables_t9`(c1 int, d string dictionary, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'columnar'); show create table `05_create_tables_t9`; drop table `05_create_tables_t9`; diff --git a/integration_tests/cases/env/local/ddl/create_tables.result b/integration_tests/cases/env/local/ddl/create_tables.result index f751a42dcf..640c3c4589 100644 --- a/integration_tests/cases/env/local/ddl/create_tables.result +++ b/integration_tests/cases/env/local/ddl/create_tables.result @@ -167,34 +167,6 @@ drop table `05_create_tables_t8`; 
affected_rows: 0 -CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'hybrid'); - -affected_rows: 0 - -show create table `05_create_tables_t8`; - -Table,Create Table, -String("05_create_tables_t8"),String("CREATE TABLE `05_create_tables_t8` (`tsid` uint64 NOT NULL, `t1` timestamp NOT NULL, `c1` int, PRIMARY KEY(tsid,t1), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='HYBRID', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')"), - - -drop table `05_create_tables_t8`; - -affected_rows: 0 - -CREATE TABLE `05_create_tables_t9`(c1 int, d string dictionary, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'hybrid'); - -affected_rows: 0 - -show create table `05_create_tables_t9`; - -Table,Create Table, -String("05_create_tables_t9"),String("CREATE TABLE `05_create_tables_t9` (`tsid` uint64 NOT NULL, `t1` timestamp NOT NULL, `c1` int, `d` string DICTIONARY, PRIMARY KEY(tsid,t1), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='HYBRID', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')"), - - -drop table `05_create_tables_t9`; - -affected_rows: 0 - CREATE TABLE `05_create_tables_t9`(c1 int, d string dictionary, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'columnar'); affected_rows: 0 diff --git a/integration_tests/cases/env/local/ddl/create_tables.sql b/integration_tests/cases/env/local/ddl/create_tables.sql index 41e4209fd4..16abc907e2 100644 --- a/integration_tests/cases/env/local/ddl/create_tables.sql +++ b/integration_tests/cases/env/local/ddl/create_tables.sql @@ -58,15 +58,6 @@ CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) show create table `05_create_tables_t8`; drop table `05_create_tables_t8`; -CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'hybrid'); -show create table `05_create_tables_t8`; -drop table `05_create_tables_t8`; - --- Use Dictionary Encode -CREATE TABLE `05_create_tables_t9`(c1 int, d string dictionary, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'hybrid'); -show create table `05_create_tables_t9`; -drop table `05_create_tables_t9`; - CREATE TABLE `05_create_tables_t9`(c1 int, d string dictionary, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'columnar'); show create table `05_create_tables_t9`; drop table `05_create_tables_t9`; diff --git a/tools/src/bin/sst-convert.rs b/tools/src/bin/sst-convert.rs index 08d7b556b6..8085917417 100644 --- a/tools/src/bin/sst-convert.rs +++ b/tools/src/bin/sst-convert.rs @@ -59,7 +59,7 @@ struct Args { #[clap(short, long, default_value_t = 8192)] batch_size: usize, - /// Storage format(values: columnar/hybrid) + /// Storage format(values: columnar) #[clap(short, long, default_value = "columnar")] output_format: String, } diff --git a/tools/src/sst_util.rs b/tools/src/sst_util.rs index 9d8fceda13..00b18c25c9 100644 --- a/tools/src/sst_util.rs +++ b/tools/src/sst_util.rs @@ -29,6 +29,6 @@ pub async fn meta_from_sst(store: &ObjectStoreRef, sst_path: &Path) -> MetaData .find(|kv| kv.key == encoding::META_KEY) 
.unwrap(); - let parquet_meta_data = encoding::decode_sst_meta_data_v1(kv_meta).unwrap(); + let parquet_meta_data = encoding::decode_sst_meta_data_from_kv(kv_meta).unwrap(); MetaData::from(parquet_meta_data) }
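
The table_options.rs hunk above swaps the infallible `From<manifest_pb::StorageFormat>` for a fallible `TryFrom`, so a manifest that still records the removed Hybrid format now surfaces `HybridDeprecated` instead of being silently remapped. Below is a minimal standalone sketch of that pattern; the enum and error names are stand-ins for illustration, not the crate's real `manifest_pb` or snafu types.

use std::convert::TryFrom;

// Stand-in for the protobuf-generated enum (hypothetical, for illustration only).
#[derive(Debug, Clone, Copy)]
enum PbStorageFormat {
    Columnar,
    Hybrid,
}

// Stand-in for the engine-side enum after the Hybrid variant is dropped.
#[derive(Debug, Clone, Copy, PartialEq)]
enum StorageFormat {
    Columnar,
}

#[derive(Debug)]
enum Error {
    HybridDeprecated,
}

impl TryFrom<PbStorageFormat> for StorageFormat {
    type Error = Error;

    fn try_from(format: PbStorageFormat) -> Result<Self, Self::Error> {
        match format {
            PbStorageFormat::Columnar => Ok(Self::Columnar),
            // Old manifests may still carry Hybrid; fail loudly at conversion
            // time rather than guessing a fallback format.
            PbStorageFormat::Hybrid => Err(Error::HybridDeprecated),
        }
    }
}

fn main() {
    assert_eq!(
        StorageFormat::try_from(PbStorageFormat::Columnar).unwrap(),
        StorageFormat::Columnar
    );
    assert!(StorageFormat::try_from(PbStorageFormat::Hybrid).is_err());
    println!("Hybrid manifests are rejected at conversion time");
}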