Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: memtable bitmap for null #1046

Merged
merged 7 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions analytic_engine/src/memtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ pub enum Error {
source: common_types::record_batch::Error,
},

#[snafu(display("Failed to decode continuous row, err:{}", source))]
DecodeContinuousRow {
source: common_types::row::contiguous::Error,
},

#[snafu(display("Failed to project memtable schema, err:{}", source))]
ProjectSchema {
source: common_types::projected_schema::Error,
Expand Down
9 changes: 5 additions & 4 deletions analytic_engine/src/memtable/skiplist/iter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Skiplist memtable iterator

Expand All @@ -21,8 +21,8 @@ use snafu::ResultExt;
use crate::memtable::{
key::{self, KeySequence},
skiplist::{BytewiseComparator, SkiplistMemTable},
AppendRow, BuildRecordBatch, DecodeInternalKey, EncodeInternalKey, IterReverse, IterTimeout,
ProjectSchema, Result, ScanContext, ScanRequest,
AppendRow, BuildRecordBatch, DecodeContinuousRow, DecodeInternalKey, EncodeInternalKey,
IterReverse, IterTimeout, ProjectSchema, Result, ScanContext, ScanRequest,
};

/// Iterator state
Expand Down Expand Up @@ -147,7 +147,8 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
let mut num_rows = 0;
while self.iter.valid() && num_rows < self.batch_size {
if let Some(row) = self.fetch_next_row()? {
let row_reader = ContiguousRowReader::with_schema(&row, &self.memtable_schema);
let row_reader = ContiguousRowReader::try_new(&row, &self.memtable_schema)
.context(DecodeContinuousRow)?;
let projected_row = ProjectedContiguousRow::new(row_reader, &self.projector);

trace!("Column iterator fetch next row, row:{:?}", projected_row);
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/row_iter/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

use async_trait::async_trait;
use common_types::{
Expand Down Expand Up @@ -71,7 +71,7 @@ pub fn build_record_batch_with_key(schema: Schema, rows: Vec<Row>) -> RecordBatc

writer.write_row(&row).unwrap();

let source_row = ContiguousRowReader::with_schema(&buf, &schema);
let source_row = ContiguousRowReader::try_new(&buf, &schema).unwrap();
let projected_row = ProjectedContiguousRow::new(source_row, &row_projected_schema);
builder
.append_projected_contiguous_row(&projected_row)
Expand Down
123 changes: 123 additions & 0 deletions common_types/src/bitset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! BitSet supports counting set/unset bits.

#[derive(Debug, Default, Clone)]
pub struct BitSet {
/// The bits are stored as bytes in the least significant bit order.
buffer: Vec<u8>,
/// The number of real bits in the `buffer`
num_bits: usize,
}

impl BitSet {
/// Initialize a unset [`BitSet`].
pub fn new(num_bits: usize) -> Self {
Self {
buffer: vec![0; Self::num_bytes(num_bits)],
num_bits,
}
}

#[inline]
pub fn num_bits(&self) -> usize {
self.num_bits
}

#[inline]
pub fn num_bytes(num_bits: usize) -> usize {
(num_bits + 7) >> 3
}

/// Initialize directly from a buffer.
///
/// None will be returned if the buffer's length is not enough to cover the
/// bits of `num_bits`.
pub fn try_from_raw(buffer: Vec<u8>, num_bits: usize) -> Option<Self> {
if buffer.len() < Self::num_bytes(num_bits) {
None
} else {
Some(Self { buffer, num_bits })
}
}

/// Set the bit at the `index`.
///
/// Return false if the index is outside the range.
pub fn set(&mut self, index: usize) -> bool {
if index >= self.num_bits {
return false;
}
let (byte_index, bit_index) = Self::compute_byte_bit_index(index);
self.buffer[byte_index] |= 1 << bit_index;
true
}

/// Tells whether the bit at the `index` is set.
pub fn is_set(&self, index: usize) -> Option<bool> {
if index >= self.num_bits {
return None;
}
let (byte_index, bit_index) = Self::compute_byte_bit_index(index);
let set = (self.buffer[byte_index] & (1 << bit_index)) != 0;
Some(set)
}

#[inline]
pub fn as_bytes(&self) -> &[u8] {
&self.buffer
}

pub fn into_bytes(self) -> Vec<u8> {
self.buffer
}

#[inline]
fn compute_byte_bit_index(index: usize) -> (usize, usize) {
(index >> 3, index & 7)
}
}

#[cfg(test)]
mod tests {
use std::assert_eq;

use super::BitSet;

#[test]
fn test_set_op() {
let mut bit_set = BitSet::new(50);

assert!(bit_set.set(1));
assert!(bit_set.is_set(1).unwrap());

assert!(bit_set.set(20));
assert!(bit_set.is_set(20).unwrap());
assert!(bit_set.set(49));
assert!(bit_set.is_set(49).unwrap());

assert!(!bit_set.set(100));
assert!(bit_set.is_set(100).is_none());

assert_eq!(
bit_set.into_bytes(),
vec![
0b00000010,
0b00000000,
0b00010000,
0b000000000,
0b00000000,
0b00000000,
0b00000010
]
);
}

#[test]
fn test_try_from_raw() {
let raw_bytes: Vec<u8> = vec![0b11111111, 0b11110000, 0b00001111, 0b00001100, 0b00001001];
assert!(BitSet::try_from_raw(raw_bytes.clone(), 50).is_none());
assert!(BitSet::try_from_raw(raw_bytes.clone(), 40).is_some());
assert!(BitSet::try_from_raw(raw_bytes, 1).is_some());
}
}
21 changes: 21 additions & 0 deletions common_types/src/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,27 @@ impl Datum {
Ok(Datum::Date(days))
}

pub fn is_fixed_sized(&self) -> bool {
match self {
Datum::Null
| Datum::Timestamp(_)
| Datum::Double(_)
| Datum::Float(_)
| Datum::UInt64(_)
| Datum::UInt32(_)
| Datum::UInt16(_)
| Datum::UInt8(_)
| Datum::Int64(_)
| Datum::Int32(_)
| Datum::Int16(_)
| Datum::Int8(_)
| Datum::Boolean(_)
| Datum::Date(_)
| Datum::Time(_) => true,
Datum::Varbinary(_) | Datum::String(_) => false,
}
}

pub fn size(&self) -> usize {
match self {
Datum::Null => 1,
Expand Down
1 change: 1 addition & 0 deletions common_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

//! Contains common types

pub mod bitset;
pub mod bytes;
#[cfg(feature = "arrow")]
pub mod column;
Expand Down
Loading