diff --git a/Cargo.lock b/Cargo.lock index 4047ab4ebe0fe..773f964bf8880 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1981,6 +1981,17 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" +[[package]] +name = "bitfield-struct" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2be5a46ba01b60005ae2c51a36a29cfe134bcacae2dd5cedcd4615fbaad1494b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -10590,6 +10601,7 @@ dependencies = [ "async-trait", "auto_enums", "auto_impl", + "bitfield-struct", "bitflags 2.8.0", "byteorder", "bytes", diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index a929dee577963..71d4ee11c1a64 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -22,6 +22,7 @@ arrow-53-schema = { package = "arrow-schema", version = "53" } async-trait = "0.1" auto_enums = { workspace = true } auto_impl = "1" +bitfield-struct = "0.10" bitflags = "2" byteorder = "1" bytes = "1" diff --git a/src/common/src/util/value_encoding/column_aware_row_encoding.rs b/src/common/src/util/value_encoding/column_aware_row_encoding.rs index 003e6332facf5..82f7dbbee3a2a 100644 --- a/src/common/src/util/value_encoding/column_aware_row_encoding.rs +++ b/src/common/src/util/value_encoding/column_aware_row_encoding.rs @@ -23,97 +23,149 @@ use std::collections::HashSet; use std::sync::Arc; use ahash::HashMap; -use bitflags::bitflags; +use bitfield_struct::bitfield; use super::*; use crate::catalog::ColumnId; -bitflags! { - #[derive(Debug, Clone, Copy, PartialEq, Eq)] - struct Flag: u8 { - const EMPTY = 0b_1000_0000; - const OFFSET8 = 0b01; - const OFFSET16 = 0b10; - const OFFSET32 = 0b11; +/// The width of the offset of the encoded data, i.e., how many bytes are used to represent the offset. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +enum OffsetWidth { + /// The offset of encoded data can be represented by u8. + Offset8 = 0b01, + /// The offset of encoded data can be represented by u16. + Offset16 = 0b10, + /// The offset of encoded data can be represented by u32. + Offset32 = 0b11, +} + +impl OffsetWidth { + /// Get the width of the offset in bytes. + const fn width(self) -> usize { + match self { + OffsetWidth::Offset8 => 1, + OffsetWidth::Offset16 => 2, + OffsetWidth::Offset32 => 4, + } } + + const fn into_bits(self) -> u8 { + self as u8 + } + + const fn from_bits(bits: u8) -> Self { + match bits { + 0b01 => OffsetWidth::Offset8, + 0b10 => OffsetWidth::Offset16, + 0b11 => OffsetWidth::Offset32, + _ => panic!("invalid offset width bits"), + } + } +} + +/// Header (metadata) of the encoded row. +/// +/// Layout (most to least significant bits): +/// +/// ```text +/// | magic | reserved | offset | +/// | 1 | 5 | 2 | +/// ``` +#[bitfield(u8, order = Msb)] +#[derive(PartialEq, Eq)] +struct Header { + /// Magic bit to indicate it's column-aware encoding. + /// Note that in plain value encoding, the first byte is always 0 or 1 for nullability, + /// of which the most significant bit is always 0. + #[bits(1, default = true, access = RO)] + magic: bool, + + #[bits(5)] + _reserved: u8, + + /// Indicate the offset width of the encoded data. + #[bits(2, default = OffsetWidth::Offset8)] + offset: OffsetWidth, } /// `RowEncoding` holds row-specific information for Column-Aware Encoding struct RowEncoding { - flag: Flag, + header: Header, offsets: Vec, buf: Vec, } +/// A trait unifying [`ToDatumRef`] and already encoded bytes. +trait Encode { + fn encode_to(self, data: &mut Vec); +} + +impl Encode for T +where + T: ToDatumRef, +{ + fn encode_to(self, data: &mut Vec) { + if let Some(v) = self.to_datum_ref() { + serialize_scalar(v, data); + } + } +} + +impl Encode for Option<&[u8]> { + fn encode_to(self, data: &mut Vec) { + if let Some(v) = self { + data.extend(v); + } + } +} + impl RowEncoding { fn new() -> Self { RowEncoding { - flag: Flag::EMPTY, + header: Header::new(), offsets: vec![], buf: vec![], } } - fn set_offsets(&mut self, usize_offsets: &[usize], max_offset: usize) { - debug_assert!(self.offsets.is_empty()); - match max_offset { - _n @ ..=const { u8::MAX as usize } => { - self.flag |= Flag::OFFSET8; - usize_offsets - .iter() - .for_each(|m| self.offsets.put_u8(*m as u8)); - } - _n @ ..=const { u16::MAX as usize } => { - self.flag |= Flag::OFFSET16; - usize_offsets - .iter() - .for_each(|m| self.offsets.put_u16_le(*m as u16)); - } - _n @ ..=const { u32::MAX as usize } => { - self.flag |= Flag::OFFSET32; - usize_offsets - .iter() - .for_each(|m| self.offsets.put_u32_le(*m as u32)); - } - _ => unreachable!("encoding length exceeds u32"), - } - } - - fn encode(&mut self, datum_refs: impl Iterator) { + fn set_offsets(&mut self, usize_offsets: &[usize]) { debug_assert!( - self.buf.is_empty(), - "should not encode one RowEncoding object multiple times." + self.offsets.is_empty(), + "should not set offsets multiple times" ); - let mut offset_usize = vec![]; - for datum in datum_refs { - offset_usize.push(self.buf.len()); - if let Some(v) = datum.to_datum_ref() { - serialize_scalar(v, &mut self.buf); - } + + // Use 0 if there's no data. + let max_offset = usize_offsets.last().copied().unwrap_or(0); + + let offset_width = match max_offset { + _n @ ..=const { u8::MAX as usize } => OffsetWidth::Offset8, + _n @ ..=const { u16::MAX as usize } => OffsetWidth::Offset16, + _n @ ..=const { u32::MAX as usize } => OffsetWidth::Offset32, + _ => panic!("encoding length {} exceeds u32", max_offset), + }; + self.header.set_offset(offset_width); + + self.offsets + .reserve_exact(usize_offsets.len() * offset_width.width()); + for &offset in usize_offsets { + self.offsets + .put_uint_le(offset as u64, offset_width.width()); } - let max_offset = *offset_usize - .last() - .expect("should encode at least one column"); - self.set_offsets(&offset_usize, max_offset); } - // TODO: Avoid duplicated code. `encode_slice` is the same as `encode` except it doesn't require column type. - fn encode_slice<'a>(&mut self, datum_refs: impl Iterator>) { + fn encode(&mut self, datums: impl IntoIterator) { debug_assert!( self.buf.is_empty(), "should not encode one RowEncoding object multiple times." ); - let mut offset_usize = vec![]; - for datum in datum_refs { + let datums = datums.into_iter(); + let mut offset_usize = Vec::with_capacity(datums.size_hint().0); + for datum in datums { offset_usize.push(self.buf.len()); - if let Some(v) = datum { - self.buf.put_slice(v); - } + datum.encode_to(&mut self.buf); } - let max_offset = *offset_usize - .last() - .expect("should encode at least one column"); - self.set_offsets(&offset_usize, max_offset); + self.set_offsets(&offset_usize); } } @@ -140,11 +192,17 @@ impl Serializer { } } - fn serialize_inner(&self, encoding: RowEncoding) -> Vec { + fn serialize_raw(&self, datums: impl IntoIterator) -> Vec { + let mut encoding = RowEncoding::new(); + encoding.encode(datums); + self.finish(encoding) + } + + fn finish(&self, encoding: RowEncoding) -> Vec { let mut row_bytes = Vec::with_capacity( 5 + self.encoded_column_ids.len() + encoding.offsets.len() + encoding.buf.len(), /* 5 comes from u8+u32 */ ); - row_bytes.put_u8(encoding.flag.bits()); + row_bytes.put_u8(encoding.header.into_bits()); row_bytes.put_u32_le(self.datum_num); row_bytes.extend(&self.encoded_column_ids); row_bytes.extend(&encoding.offsets); @@ -158,9 +216,70 @@ impl ValueRowSerializer for Serializer { /// Serialize a row under the schema of the Serializer fn serialize(&self, row: impl Row) -> Vec { assert_eq!(row.len(), self.datum_num as usize); - let mut encoding = RowEncoding::new(); - encoding.encode(row.iter()); - self.serialize_inner(encoding) + self.serialize_raw(row.iter()) + } +} + +/// A view of the encoded bytes, which can be iterated over to get the column id and data. +/// Used for deserialization. +// TODO: can we unify this with `RowEncoding`, which is for serialization? +#[derive(Clone)] +struct EncodedBytes<'a> { + header: Header, + + // When iterating, we will consume `column_ids` and `offsets` while keep `data` unchanged. + // This is because we record absolute values in `offsets` to index into `data`. + column_ids: &'a [u8], + offsets: &'a [u8], + data: &'a [u8], +} + +impl<'a> EncodedBytes<'a> { + fn new(mut encoded_bytes: &'a [u8]) -> Result { + let header = Header::from_bits(encoded_bytes.get_u8()); + if !header.magic() { + return Err(ValueEncodingError::InvalidFlag(header.into_bits())); + } + let offset_bytes = header.offset().width(); + + let datum_num = encoded_bytes.get_u32_le() as usize; + let offsets_start_idx = 4 * datum_num; + let data_start_idx = offsets_start_idx + datum_num * offset_bytes; + + Ok(EncodedBytes { + header, + column_ids: &encoded_bytes[..offsets_start_idx], + offsets: &encoded_bytes[offsets_start_idx..data_start_idx], + data: &encoded_bytes[data_start_idx..], + }) + } +} + +impl<'a> Iterator for EncodedBytes<'a> { + type Item = (i32, &'a [u8]); + + fn next(&mut self) -> Option { + if self.column_ids.is_empty() { + assert!(self.offsets.is_empty()); + return None; + } + + let id = self.column_ids.get_i32_le(); + + let offset_width = self.header.offset().width(); + let get_offset = |offsets: &mut &[u8]| offsets.get_uint_le(offset_width) as usize; + + let this_offset = get_offset(&mut self.offsets); + let next_offset = if self.offsets.is_empty() { + self.data.len() + } else { + let mut peek_offsets = self.offsets; // copy the reference to the slice to avoid mutating the buf position + get_offset(&mut peek_offsets) + }; + + let data = &self.data[this_offset..next_offset]; + + Some((id, data)) } } @@ -199,63 +318,26 @@ impl Deserializer { } impl ValueRowDeserializer for Deserializer { - fn deserialize(&self, mut encoded_bytes: &[u8]) -> Result> { - let flag = Flag::from_bits(encoded_bytes.get_u8()).expect("should be a valid flag"); - let offset_bytes = match flag - Flag::EMPTY { - Flag::OFFSET8 => 1, - Flag::OFFSET16 => 2, - Flag::OFFSET32 => 4, - _ => return Err(ValueEncodingError::InvalidFlag(flag.bits())), - }; - let datum_num = encoded_bytes.get_u32_le() as usize; - let offsets_start_idx = 4 * datum_num; - let data_start_idx = offsets_start_idx + datum_num * offset_bytes; - let offsets = &encoded_bytes[offsets_start_idx..data_start_idx]; - let data = &encoded_bytes[data_start_idx..]; + fn deserialize(&self, encoded_bytes: &[u8]) -> Result> { + let encoded_bytes = EncodedBytes::new(encoded_bytes)?; let mut row = self.default_row.clone(); - for i in 0..datum_num { - let this_id = encoded_bytes.get_i32_le(); - if let Some(&decoded_idx) = self.required_column_ids.get(&this_id) { - let this_offset_start_idx = i * offset_bytes; - let mut this_offset_slice = - &offsets[this_offset_start_idx..(this_offset_start_idx + offset_bytes)]; - let this_offset = deserialize_width(offset_bytes, &mut this_offset_slice); - let data = if i + 1 < datum_num { - let mut next_offset_slice = &offsets[(this_offset_start_idx + offset_bytes) - ..(this_offset_start_idx + 2 * offset_bytes)]; - let next_offset = deserialize_width(offset_bytes, &mut next_offset_slice); - if this_offset == next_offset { - None - } else { - let mut data_slice = &data[this_offset..next_offset]; - Some(deserialize_value( - &self.schema[decoded_idx], - &mut data_slice, - )?) - } - } else if this_offset == data.len() { - None - } else { - let mut data_slice = &data[this_offset..]; - Some(deserialize_value( - &self.schema[decoded_idx], - &mut data_slice, - )?) - }; - row[decoded_idx] = data; - } + + for (id, mut data) in encoded_bytes { + let Some(&decoded_idx) = self.required_column_ids.get(&id) else { + continue; + }; + + let datum = if data.is_empty() { + None + } else { + Some(deserialize_value(&self.schema[decoded_idx], &mut data)?) + }; + + row[decoded_idx] = datum; } - Ok(row) - } -} -fn deserialize_width(len: usize, data: &mut impl Buf) -> usize { - match len { - 1 => data.get_u8() as usize, - 2 => data.get_u16_le() as usize, - 4 => data.get_u32_le() as usize, - _ => unreachable!("Width's len should be either 1, 2, or 4"), + Ok(row) } } @@ -281,86 +363,29 @@ impl ValueRowDeserializer for ColumnAwareSerde { /// Deserializes row `encoded_bytes`, drops columns not in `valid_column_ids`, serializes and returns. /// If no column is dropped, returns None. -// TODO: Avoid duplicated code. The current code combines`Serializer` and `Deserializer` with unavailable parameter removed, e.g. `Deserializer::schema`. pub fn try_drop_invalid_columns( - mut encoded_bytes: &[u8], + encoded_bytes: &[u8], valid_column_ids: &HashSet, ) -> Option> { - let flag = Flag::from_bits(encoded_bytes.get_u8()).expect("should be a valid flag"); - let datum_num = encoded_bytes.get_u32_le() as usize; - let mut is_column_dropped = false; - let mut encoded_bytes_copy = encoded_bytes; - for _ in 0..datum_num { - let this_id = encoded_bytes_copy.get_i32_le(); - if !valid_column_ids.contains(&this_id) { - is_column_dropped = true; - break; - } - } - if !is_column_dropped { + let encoded_bytes = EncodedBytes::new(encoded_bytes).ok()?; + + let has_invalid_column = (encoded_bytes.clone()).any(|(id, _)| !valid_column_ids.contains(&id)); + if !has_invalid_column { return None; } // Slow path that drops columns. Should be rare. - let offset_bytes = match flag - Flag::EMPTY { - Flag::OFFSET8 => 1, - Flag::OFFSET16 => 2, - Flag::OFFSET32 => 4, - _ => panic!("invalid flag {}", flag.bits()), - }; - let offsets_start_idx = 4 * datum_num; - let data_start_idx = offsets_start_idx + datum_num * offset_bytes; - let offsets = &encoded_bytes[offsets_start_idx..data_start_idx]; - let data = &encoded_bytes[data_start_idx..]; - let mut datums: Vec> = Vec::with_capacity(valid_column_ids.len()); + + let mut datums = Vec::with_capacity(valid_column_ids.len()); let mut column_ids = Vec::with_capacity(valid_column_ids.len()); - for i in 0..datum_num { - let this_id = encoded_bytes.get_i32_le(); - if valid_column_ids.contains(&this_id) { - column_ids.push(this_id); - let this_offset_start_idx = i * offset_bytes; - let mut this_offset_slice = - &offsets[this_offset_start_idx..(this_offset_start_idx + offset_bytes)]; - let this_offset = deserialize_width(offset_bytes, &mut this_offset_slice); - let data = if i + 1 < datum_num { - let mut next_offset_slice = &offsets[(this_offset_start_idx + offset_bytes) - ..(this_offset_start_idx + 2 * offset_bytes)]; - let next_offset = deserialize_width(offset_bytes, &mut next_offset_slice); - if this_offset == next_offset { - None - } else { - let data_slice = &data[this_offset..next_offset]; - Some(data_slice) - } - } else if this_offset == data.len() { - None - } else { - let data_slice = &data[this_offset..]; - Some(data_slice) - }; - datums.push(data); - } - } - if column_ids.is_empty() { - // According to `RowEncoding::encode`, at least one column is required. - return None; - } - let mut encoding = RowEncoding::new(); - encoding.encode_slice(datums.into_iter()); - let mut encoded_column_ids = Vec::with_capacity(column_ids.len() * 4); - let datum_num = column_ids.len() as u32; - for id in column_ids { - encoded_column_ids.put_i32_le(id); + for (id, data) in encoded_bytes { + if valid_column_ids.contains(&id) { + column_ids.push(ColumnId::new(id)); + datums.push(if data.is_empty() { None } else { Some(data) }); + } } - let mut row_bytes = Vec::with_capacity( - 5 + encoded_column_ids.len() + encoding.offsets.len() + encoding.buf.len(), /* 5 comes from u8+u32 */ - ); - row_bytes.put_u8(encoding.flag.bits()); - row_bytes.put_u32_le(datum_num); - row_bytes.extend(&encoded_column_ids); - row_bytes.extend(&encoding.offsets); - row_bytes.extend(&encoding.buf); + let row_bytes = Serializer::new(&column_ids).serialize_raw(datums); Some(row_bytes) } diff --git a/src/common/src/util/value_encoding/error.rs b/src/common/src/util/value_encoding/error.rs index 3754bb3252df7..d70e9e817c4c0 100644 --- a/src/common/src/util/value_encoding/error.rs +++ b/src/common/src/util/value_encoding/error.rs @@ -42,6 +42,6 @@ pub enum ValueEncodingError { #[backtrace] crate::array::ArrayError, ), - #[error("Invalid flag: {0}")] + #[error("Invalid flag: {0:b}")] InvalidFlag(u8), } diff --git a/src/storage/src/row_serde/value_serde.rs b/src/storage/src/row_serde/value_serde.rs index c931588808458..1231ad7e1dc17 100644 --- a/src/storage/src/row_serde/value_serde.rs +++ b/src/storage/src/row_serde/value_serde.rs @@ -303,8 +303,10 @@ mod tests { vec![Some(Int16(5)), Some(Utf8("ABC".into()))] ); - // drop all columns is now allowed - assert!(try_drop_invalid_columns(&row_bytes, &HashSet::new()).is_none()); + // all columns are dropped + let row_bytes_all_dropped = try_drop_invalid_columns(&row_bytes, &HashSet::new()).unwrap(); + assert_eq!(row_bytes_all_dropped.len(), 5); // 1 byte flag + 4 bytes for length (0) + assert_eq!(&row_bytes_all_dropped[1..], [0, 0, 0, 0]); } #[test]