
Commit a2cae2b

feat: support split write request to batches for small wal logs (apache#754)
* feat: support split write request to batches for small wal logs
* test: add unit test for the split write requests
* fix: rename the config option name
* chore: add some comments
* chore: explain that `max_bytes_per_write_batch` will break atomicity
* refactor: do maybe_split_write_request
1 parent 13aaabd commit a2cae2b

6 files changed: +364 −12 lines changed
analytic_engine/src/instance/mod.rs (+2)
```diff
@@ -172,6 +172,8 @@ pub struct Instance {
     pub(crate) replay_batch_size: usize,
     /// Write sst max buffer size
     pub(crate) write_sst_max_buffer_size: usize,
+    /// Max bytes per write batch
+    pub(crate) max_bytes_per_write_batch: Option<usize>,
     /// Options for scanning sst
     pub(crate) iter_options: IterOptions,
     pub(crate) remote_engine: Option<RemoteEngineRef>,
```

analytic_engine/src/instance/open.rs (+5 −1)
```diff
@@ -94,6 +94,10 @@ impl Instance {
             space_write_buffer_size: ctx.config.space_write_buffer_size,
             replay_batch_size: ctx.config.replay_batch_size,
             write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_bytes() as usize,
+            max_bytes_per_write_batch: ctx
+                .config
+                .max_bytes_per_write_batch
+                .map(|v| v.as_bytes() as usize),
             iter_options,
             remote_engine: remote_engine_ref,
         });
@@ -414,7 +418,7 @@ impl Instance {
                 worker_local,
                 table_data,
                 sequence,
-                row_group,
+                &row_group.into(),
                 index_in_writer,
             )
             .context(ApplyMemTable {
```
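The `&row_group.into()` in the second hunk converts a `RowGroup` reference into the new `RowGroupSlicer` type used throughout `write.rs`. `RowGroupSlicer` itself lives in `common_types` and is not part of this diff; the sketch below is only a guess at its shape, inferred from the calls that do appear here (`new` with a row range, `From<&RowGroup>`, `num_rows`, `slice_range`, `is_empty`):

```rust
use std::ops::Range;

// Hypothetical stand-in for common_types::row::RowGroup.
pub struct RowGroup {
    rows: Vec<Vec<u8>>,
}

/// Hypothetical reconstruction of `RowGroupSlicer`: a borrowed view over a
/// contiguous row range of a `RowGroup`, so slicing never copies rows.
pub struct RowGroupSlicer<'a> {
    range: Range<usize>,
    row_group: &'a RowGroup,
}

impl<'a> RowGroupSlicer<'a> {
    pub fn new(range: Range<usize>, row_group: &'a RowGroup) -> Self {
        Self { range, row_group }
    }

    pub fn num_rows(&self) -> usize {
        self.range.len()
    }

    pub fn is_empty(&self) -> bool {
        self.range.is_empty()
    }

    pub fn slice_range(&self) -> Range<usize> {
        self.range.clone()
    }
}

// Covering the whole group is what `&row_group.into()` produces on the
// unsplit path.
impl<'a> From<&'a RowGroup> for RowGroupSlicer<'a> {
    fn from(row_group: &'a RowGroup) -> Self {
        Self::new(0..row_group.rows.len(), row_group)
    }
}
```

Whatever the real definition looks like, the property the diff relies on is that slicing is by reference, so splitting a request does not duplicate row data.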

analytic_engine/src/instance/write.rs (+298 −5)
```diff
@@ -7,7 +7,7 @@ use std::sync::Arc;
 use ceresdbproto::{schema as schema_pb, table_requests};
 use common_types::{
     bytes::ByteVec,
-    row::RowGroup,
+    row::{RowGroup, RowGroupSlicer},
     schema::{IndexInWriterSchema, Schema},
 };
 use common_util::{codec::row, define_result};
@@ -150,6 +150,107 @@ impl EncodeContext {
     }
 }
 
+/// Split the write request into multiple batches whose size is determined by
+/// the `max_bytes_per_batch`.
+struct WriteRowGroupSplitter {
+    /// Max bytes per batch. The size of a batch is not strictly capped at
+    /// `max_bytes_per_batch`: a batch may exceed the limit, but only by its
+    /// last row.
+    max_bytes_per_batch: usize,
+}
+
+enum SplitResult<'a> {
+    Splitted {
+        encoded_batches: Vec<Vec<ByteVec>>,
+        row_group_batches: Vec<RowGroupSlicer<'a>>,
+    },
+    Integrate {
+        encoded_rows: Vec<ByteVec>,
+        row_group: RowGroupSlicer<'a>,
+    },
+}
+
+impl WriteRowGroupSplitter {
+    pub fn new(max_bytes_per_batch: usize) -> Self {
+        Self {
+            max_bytes_per_batch,
+        }
+    }
+
+    /// Split the write request into multiple batches.
+    ///
+    /// NOTE: The length of the `encoded_rows` should be the same as the number
+    /// of rows in the `row_group`.
+    pub fn split<'a>(
+        &'_ self,
+        encoded_rows: Vec<ByteVec>,
+        row_group: &'a RowGroup,
+    ) -> SplitResult<'a> {
+        let end_row_indexes = self.compute_batches(&encoded_rows);
+        if end_row_indexes.len() <= 1 {
+            // No need to split.
+            return SplitResult::Integrate {
+                encoded_rows,
+                row_group: RowGroupSlicer::from(row_group),
+            };
+        }
+
+        let mut prev_end_row_index = 0;
+        let mut encoded_batches = Vec::with_capacity(end_row_indexes.len());
+        let mut row_group_batches = Vec::with_capacity(end_row_indexes.len());
+        for end_row_index in &end_row_indexes {
+            let end_row_index = *end_row_index;
+            let curr_batch = Vec::with_capacity(end_row_index - prev_end_row_index);
+            encoded_batches.push(curr_batch);
+            let row_group_slicer =
+                RowGroupSlicer::new(prev_end_row_index..end_row_index, row_group);
+            row_group_batches.push(row_group_slicer);
+
+            prev_end_row_index = end_row_index;
+        }
+
+        let mut current_batch_idx = 0;
+        for (row_idx, encoded_row) in encoded_rows.into_iter().enumerate() {
+            if row_idx >= end_row_indexes[current_batch_idx] {
+                current_batch_idx += 1;
+            }
+            encoded_batches[current_batch_idx].push(encoded_row);
+        }
+
+        SplitResult::Splitted {
+            encoded_batches,
+            row_group_batches,
+        }
+    }
+
+    /// Compute the end row indexes in the original `encoded_rows` of each
+    /// batch.
+    fn compute_batches(&self, encoded_rows: &[ByteVec]) -> Vec<usize> {
+        let mut current_batch_size = 0;
+        let mut end_row_indexes = Vec::new();
+        for (row_idx, encoded_row) in encoded_rows.iter().enumerate() {
+            let row_size = encoded_row.len();
+            current_batch_size += row_size;
+
+            // If the current batch size exceeds the `max_bytes_per_batch`, freeze this
+            // batch by recording its end row index.
+            // Note that such a check may cause the batch size to exceed the
+            // `max_bytes_per_batch`.
+            if current_batch_size >= self.max_bytes_per_batch {
+                current_batch_size = 0;
+                end_row_indexes.push(row_idx + 1)
+            }
+        }
+
+        if current_batch_size > 0 {
+            end_row_indexes.push(encoded_rows.len());
+        }
+
+        end_row_indexes
+    }
+}
+
 impl Instance {
     /// Write data to the table under given space.
     pub async fn write_to_table(
@@ -206,6 +307,65 @@
             encoded_rows,
         } = encode_ctx;
 
+        match self.maybe_split_write_request(encoded_rows, &row_group) {
+            SplitResult::Integrate {
+                encoded_rows,
+                row_group,
+            } => {
+                self.write_table_row_group(
+                    worker_local,
+                    table_data,
+                    row_group,
+                    index_in_writer,
+                    encoded_rows,
+                )
+                .await?;
+            }
+            SplitResult::Splitted {
+                encoded_batches,
+                row_group_batches,
+            } => {
+                for (encoded_rows, row_group) in encoded_batches.into_iter().zip(row_group_batches)
+                {
+                    self.write_table_row_group(
+                        worker_local,
+                        table_data,
+                        row_group,
+                        index_in_writer.clone(),
+                        encoded_rows,
+                    )
+                    .await?;
+                }
+            }
+        }
+
+        Ok(row_group.num_rows())
+    }
+
+    fn maybe_split_write_request(
+        self: &Arc<Self>,
+        encoded_rows: Vec<ByteVec>,
+        row_group: &RowGroup,
+    ) -> SplitResult {
+        if self.max_bytes_per_write_batch.is_none() {
+            return SplitResult::Integrate {
+                encoded_rows,
+                row_group: RowGroupSlicer::from(row_group),
+            };
+        }
+
+        let splitter = WriteRowGroupSplitter::new(self.max_bytes_per_write_batch.unwrap());
+        splitter.split(encoded_rows, row_group)
+    }
+
+    async fn write_table_row_group(
+        self: &Arc<Self>,
+        worker_local: &mut WorkerLocal,
+        table_data: &TableDataRef,
+        row_group: RowGroupSlicer<'_>,
+        index_in_writer: IndexInWriterSchema,
+        encoded_rows: Vec<ByteVec>,
+    ) -> Result<()> {
         let sequence = self
             .write_to_wal(worker_local, table_data, encoded_rows)
             .await?;
@@ -242,11 +402,12 @@
 
         table_data.set_last_sequence(sequence);
 
-        let num_rows = row_group.num_rows();
         // Collect metrics.
-        table_data.metrics.on_write_request_done(num_rows);
+        table_data
+            .metrics
+            .on_write_request_done(row_group.num_rows());
 
-        Ok(num_rows)
+        Ok(())
     }
 
     /// Return Ok if the request is valid, this is done before entering the
@@ -410,7 +571,7 @@
         worker_local: &WorkerLocal,
         table_data: &TableDataRef,
         sequence: SequenceNumber,
-        row_group: &RowGroup,
+        row_group: &RowGroupSlicer,
         index_in_writer: IndexInWriterSchema,
     ) -> Result<()> {
         if row_group.is_empty() {
@@ -490,3 +651,135 @@
         })
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use common_types::{
+        column_schema::Builder as ColumnSchemaBuilder,
+        datum::{Datum, DatumKind},
+        row::{Row, RowGroupBuilder},
+        schema::Builder as SchemaBuilder,
+        time::Timestamp,
+    };
+
+    use super::*;
+
+    fn generate_rows_for_test(sizes: Vec<usize>) -> (Vec<ByteVec>, RowGroup) {
+        let encoded_rows: Vec<_> = sizes.iter().map(|size| vec![0; *size]).collect();
+        let rows: Vec<_> = sizes
+            .iter()
+            .map(|size| {
+                let datum = Datum::Timestamp(Timestamp::new(*size as i64));
+                Row::from_datums(vec![datum])
+            })
+            .collect();
+
+        let column_schema = ColumnSchemaBuilder::new("ts".to_string(), DatumKind::Timestamp)
+            .build()
+            .unwrap();
+        let schema = SchemaBuilder::new()
+            .add_key_column(column_schema)
+            .unwrap()
+            .build()
+            .unwrap();
+        let row_group = RowGroupBuilder::with_rows(schema, rows).unwrap().build();
+
+        (encoded_rows, row_group)
+    }
+
+    #[test]
+    fn test_write_split_compute_batches() {
+        let cases = vec![
+            (2, vec![1, 2, 3, 4, 5], vec![2, 3, 4, 5]),
+            (100, vec![50, 50, 100, 10], vec![2, 3, 4]),
+            (1000, vec![50, 50, 100, 10], vec![4]),
+            (2, vec![10, 10, 0, 10], vec![1, 2, 4]),
+            (0, vec![10, 10, 0, 10], vec![1, 2, 3, 4]),
+            (0, vec![0, 0], vec![1, 2]),
+            (10, vec![], vec![]),
+        ];
+        for (batch_size, sizes, expected_batch_indexes) in cases {
+            let (encoded_rows, _) = generate_rows_for_test(sizes);
+            let write_row_group_splitter = WriteRowGroupSplitter::new(batch_size);
+            let batch_indexes = write_row_group_splitter.compute_batches(&encoded_rows);
+            assert_eq!(batch_indexes, expected_batch_indexes);
+        }
+    }
+
+    #[test]
+    fn test_write_split_row_group() {
+        let cases = vec![
+            (
+                2,
+                vec![1, 2, 3, 4, 5],
+                vec![vec![1, 2], vec![3], vec![4], vec![5]],
+            ),
+            (
+                100,
+                vec![50, 50, 100, 10],
+                vec![vec![50, 50], vec![100], vec![10]],
+            ),
+            (1000, vec![50, 50, 100, 10], vec![vec![50, 50, 100, 10]]),
+            (
+                2,
+                vec![10, 10, 0, 10],
+                vec![vec![10], vec![10], vec![0, 10]],
+            ),
+            (
+                0,
+                vec![10, 10, 0, 10],
+                vec![vec![10], vec![10], vec![0], vec![10]],
+            ),
+            (0, vec![0, 0], vec![vec![0], vec![0]]),
+            (10, vec![], vec![]),
+        ];
+
+        let check_encoded_rows = |encoded_rows: &[ByteVec], expected_row_sizes: &[usize]| {
+            assert_eq!(encoded_rows.len(), expected_row_sizes.len());
+            for (encoded_row, expected_row_size) in
+                encoded_rows.iter().zip(expected_row_sizes.iter())
+            {
+                assert_eq!(encoded_row.len(), *expected_row_size);
+            }
+        };
+        for (batch_size, sizes, expected_batches) in cases {
+            let (encoded_rows, row_group) = generate_rows_for_test(sizes.clone());
+            let write_row_group_splitter = WriteRowGroupSplitter::new(batch_size);
+            let split_res = write_row_group_splitter.split(encoded_rows, &row_group);
+            if expected_batches.is_empty() {
+                assert!(matches!(split_res, SplitResult::Integrate { .. }));
+            } else if expected_batches.len() == 1 {
+                assert!(matches!(split_res, SplitResult::Integrate { .. }));
+                if let SplitResult::Integrate {
+                    encoded_rows,
+                    row_group,
+                } = split_res
+                {
+                    check_encoded_rows(&encoded_rows, &expected_batches[0]);
+                    assert_eq!(row_group.num_rows(), expected_batches[0].len());
+                }
+            } else {
+                assert!(matches!(split_res, SplitResult::Splitted { .. }));
+                if let SplitResult::Splitted {
+                    encoded_batches,
+                    row_group_batches,
+                } = split_res
+                {
+                    assert_eq!(encoded_batches.len(), row_group_batches.len());
+                    assert_eq!(encoded_batches.len(), expected_batches.len());
+                    let mut batch_start_index = 0;
+                    for ((encoded_batch, row_group_batch), expected_batch) in encoded_batches
+                        .iter()
+                        .zip(row_group_batches.iter())
+                        .zip(expected_batches.iter())
+                    {
+                        check_encoded_rows(encoded_batch, expected_batch);
+                        assert_eq!(row_group_batch.num_rows(), expected_batch.len());
+                        assert_eq!(row_group_batch.slice_range().start, batch_start_index);
+                        batch_start_index += expected_batch.len();
+                    }
+                }
+            }
+        }
+    }
+}
```
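Note the atomicity consequence visible in `write_to_table` above: each batch goes through `write_table_row_group`, and hence through the WAL, independently, and the `?` operator abandons the remaining batches on the first error. A minimal sketch of that failure mode, with purely illustrative names:

```rust
/// Illustrative only: once a batch's WAL write returns Ok it is durable, so
/// an error on a later batch leaves a prefix of the original request applied.
/// This is the atomicity caveat documented on `max_bytes_per_write_batch`.
fn write_all_batches(batches: &[Vec<u8>]) -> Result<(), String> {
    for (idx, batch) in batches.iter().enumerate() {
        // Stand-in for write_table_row_group: WAL append + memtable apply.
        persist_batch(batch).map_err(|e| format!("batch {idx} failed: {e}"))?;
        // Batches 0..=idx are now durable even if a later one fails.
    }
    Ok(())
}

fn persist_batch(_batch: &[u8]) -> Result<(), String> {
    Ok(()) // stub for the real WAL write + memtable apply
}
```

This is why the option defaults to `None`, and why the config comment in `lib.rs` below calls out the broken atomicity explicitly.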

analytic_engine/src/lib.rs (+5)
```diff
@@ -77,6 +77,10 @@ pub struct Config {
     pub sst_background_read_parallelism: usize,
     /// Max buffer size for writing sst
     pub write_sst_max_buffer_size: ReadableSize,
+    /// Max bytes per write batch.
+    ///
+    /// If this is set, the atomicity of a write request will be broken.
+    pub max_bytes_per_write_batch: Option<ReadableSize>,
 
     /// Wal storage config
     ///
@@ -111,6 +115,7 @@ impl Default for Config {
             scan_batch_size: 500,
             sst_background_read_parallelism: 8,
             write_sst_max_buffer_size: ReadableSize::mb(10),
+            max_bytes_per_write_batch: None,
             wal: WalStorageConfig::RocksDB(Box::default()),
             remote_engine_client: remote_engine_client::config::Config::default(),
         }
```
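For completeness, enabling the option in code might look like the following sketch, which assumes only the `Config` fields visible in this diff (`ReadableSize::mb` is already used for `write_sst_max_buffer_size`); the 1 MiB value is illustrative:

```rust
fn example_config() -> Config {
    Config {
        // Split any write request whose encoded WAL payload reaches ~1 MiB.
        // Leaving this as `None` (the default) keeps write requests atomic.
        max_bytes_per_write_batch: Some(ReadableSize::mb(1)),
        ..Config::default()
    }
}
```

In a TOML-based deployment the same field would presumably be set with a human-readable size string, but the exact config layout is outside this diff.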
