Skip to content

Commit 7796cb6

Browse files
authored
feat: avoid frequent write stall (apache#843)
* feat: avoid frequent write stall * feat: add option to control mutable limit * fix: allow preflush_write_buffer_size_ratio to equal 0
1 parent c0d54c3 commit 7796cb6

File tree

7 files changed

+93
-15
lines changed

7 files changed

+93
-15
lines changed

analytic_engine/src/instance/create.rs

+1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ impl Instance {
6060
request,
6161
table_opts,
6262
&self.file_purger,
63+
self.preflush_write_buffer_size_ratio,
6364
space.mem_usage_collector.clone(),
6465
)
6566
.context(CreateTableData {

analytic_engine/src/instance/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ pub struct Instance {
174174
pub(crate) db_write_buffer_size: usize,
175175
/// Space write buffer size
176176
pub(crate) space_write_buffer_size: usize,
177+
/// The ratio of table's write buffer size to trigger preflush
178+
pub(crate) preflush_write_buffer_size_ratio: f32,
177179
/// Replay wal batch size
178180
pub(crate) replay_batch_size: usize,
179181
/// Write sst max buffer size

analytic_engine/src/instance/open.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ impl Instance {
9797
mem_usage_collector: Arc::new(MemUsageCollector::default()),
9898
db_write_buffer_size: ctx.config.db_write_buffer_size,
9999
space_write_buffer_size: ctx.config.space_write_buffer_size,
100+
preflush_write_buffer_size_ratio: ctx.config.preflush_write_buffer_size_ratio,
100101
replay_batch_size: ctx.config.replay_batch_size,
101102
write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_byte() as usize,
102103
max_bytes_per_write_batch: ctx
@@ -237,8 +238,9 @@ impl Instance {
237238
TableData::recover_from_add(
238239
table_meta,
239240
&self.file_purger,
240-
space.mem_usage_collector.clone(),
241241
request.shard_id,
242+
self.preflush_write_buffer_size_ratio,
243+
space.mem_usage_collector.clone(),
242244
)
243245
.context(RecoverTableData {
244246
space_id: space.id,
@@ -384,7 +386,7 @@ impl Instance {
384386
})?;
385387

386388
// Flush the table if necessary.
387-
if table_data.should_flush_table() {
389+
if table_data.should_flush_table(serial_exec) {
388390
let opts = TableFlushOptions {
389391
res_sender: None,
390392
compact_after_flush: None,

analytic_engine/src/instance/serial_executor.rs

+5
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ impl TableOpSerialExecutor {
9191
}
9292

9393
impl TableFlushScheduler {
94+
pub fn is_in_flush(&self) -> bool {
95+
let state = self.schedule_sync.state.lock().unwrap();
96+
matches!(&*state, FlushState::Flushing)
97+
}
98+
9499
/// Control the flush procedure and ensure multiple flush procedures to be
95100
/// sequential.
96101
///

analytic_engine/src/instance/write.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ impl<'a> Writer<'a> {
535535
}
536536
}
537537

538-
if self.table_data.should_flush_table() {
538+
if self.table_data.should_flush_table(self.serial_exec) {
539539
let table_data = self.table_data.clone();
540540
let _timer = table_data.metrics.start_table_write_flush_wait_timer();
541541
self.handle_memtable_flush(&table_data).await?;

analytic_engine/src/lib.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,13 @@ pub struct Config {
6060
/// Manifest options
6161
pub manifest: ManifestOptions,
6262

63-
// Global write buffer options:
6463
/// The maximum write buffer size used for single space.
6564
pub space_write_buffer_size: usize,
6665
/// The maximum size of all Write Buffers across all spaces.
6766
pub db_write_buffer_size: usize,
68-
/// End of global write buffer options.
67+
/// The ratio of table's write buffer size to trigger preflush, and it
68+
/// should be in the range (0, 1].
69+
pub preflush_write_buffer_size_ratio: f32,
6970

7071
// Iterator scanning options
7172
/// Batch size for iterator.
@@ -112,6 +113,7 @@ impl Default for Config {
112113
/// Zero means disabling this param, give a positive value to enable
113114
/// it.
114115
db_write_buffer_size: 0,
116+
preflush_write_buffer_size_ratio: 0.75,
115117
scan_batch_size: None,
116118
sst_background_read_parallelism: 8,
117119
scan_max_record_batches_in_flight: 1024,

analytic_engine/src/table/data.rs

+76-10
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ pub struct TableData {
9999

100100
/// Mutable memtable memory size limitation
101101
mutable_limit: AtomicU32,
102+
/// Mutable memtable memory usage ratio of the write buffer size.
103+
mutable_limit_write_buffer_ratio: f32,
104+
102105
/// Options of this table
103106
///
104107
/// Most modification to `opts` can be done by replacing the old options
@@ -174,8 +177,18 @@ impl Drop for TableData {
174177
}
175178

176179
#[inline]
177-
fn get_mutable_limit(opts: &TableOptions) -> u32 {
178-
opts.write_buffer_size / 8 * 7
180+
fn compute_mutable_limit(
181+
write_buffer_size: u32,
182+
mutable_limit_write_buffer_size_ratio: f32,
183+
) -> u32 {
184+
assert!(
185+
mutable_limit_write_buffer_size_ratio >= 0.0
186+
&& mutable_limit_write_buffer_size_ratio <= 1.0
187+
);
188+
189+
let limit = write_buffer_size as f32 * mutable_limit_write_buffer_size_ratio;
190+
// This is safe because the limit won't be larger than the write_buffer_size.
191+
limit as u32
179192
}
180193

181194
impl TableData {
@@ -188,6 +201,7 @@ impl TableData {
188201
request: CreateTableRequest,
189202
table_opts: TableOptions,
190203
purger: &FilePurger,
204+
preflush_write_buffer_size_ratio: f32,
191205
mem_usage_collector: CollectorRef,
192206
) -> Result<Self> {
193207
// FIXME(yingwen): Validate TableOptions, such as bucket_duration >=
@@ -197,13 +211,18 @@ impl TableData {
197211
let purge_queue = purger.create_purge_queue(space_id, request.table_id);
198212
let current_version = TableVersion::new(purge_queue);
199213
let metrics = Metrics::default();
214+
let mutable_limit = AtomicU32::new(compute_mutable_limit(
215+
table_opts.write_buffer_size,
216+
preflush_write_buffer_size_ratio,
217+
));
200218

201219
Ok(Self {
202220
id: request.table_id,
203221
name: request.table_name,
204222
schema: Mutex::new(request.table_schema),
205223
space_id,
206-
mutable_limit: AtomicU32::new(get_mutable_limit(&table_opts)),
224+
mutable_limit,
225+
mutable_limit_write_buffer_ratio: preflush_write_buffer_size_ratio,
207226
opts: ArcSwap::new(Arc::new(table_opts)),
208227
memtable_factory,
209228
mem_usage_collector,
@@ -225,20 +244,26 @@ impl TableData {
225244
pub fn recover_from_add(
226245
add_meta: AddTableMeta,
227246
purger: &FilePurger,
228-
mem_usage_collector: CollectorRef,
229247
shard_id: ShardId,
248+
preflush_write_buffer_size_ratio: f32,
249+
mem_usage_collector: CollectorRef,
230250
) -> Result<Self> {
231251
let memtable_factory = Arc::new(SkiplistMemTableFactory);
232252
let purge_queue = purger.create_purge_queue(add_meta.space_id, add_meta.table_id);
233253
let current_version = TableVersion::new(purge_queue);
234254
let metrics = Metrics::default();
255+
let mutable_limit = AtomicU32::new(compute_mutable_limit(
256+
add_meta.opts.write_buffer_size,
257+
preflush_write_buffer_size_ratio,
258+
));
235259

236260
Ok(Self {
237261
id: add_meta.table_id,
238262
name: add_meta.table_name,
239263
schema: Mutex::new(add_meta.schema),
240264
space_id: add_meta.space_id,
241-
mutable_limit: AtomicU32::new(get_mutable_limit(&add_meta.opts)),
265+
mutable_limit,
266+
mutable_limit_write_buffer_ratio: preflush_write_buffer_size_ratio,
242267
opts: ArcSwap::new(Arc::new(add_meta.opts)),
243268
memtable_factory,
244269
mem_usage_collector,
@@ -307,8 +332,11 @@ impl TableData {
307332
/// Update table options.
308333
#[inline]
309334
pub fn set_table_options(&self, opts: TableOptions) {
310-
self.mutable_limit
311-
.store(get_mutable_limit(&opts), Ordering::Relaxed);
335+
let mutable_limit = compute_mutable_limit(
336+
opts.write_buffer_size,
337+
self.mutable_limit_write_buffer_ratio,
338+
);
339+
self.mutable_limit.store(mutable_limit, Ordering::Relaxed);
312340
self.opts.store(Arc::new(opts))
313341
}
314342

@@ -399,7 +427,7 @@ impl TableData {
399427
/// Returns true if the memory usage of this table reaches flush threshold
400428
///
401429
/// REQUIRE: Do in write worker
402-
pub fn should_flush_table(&self) -> bool {
430+
pub fn should_flush_table(&self, serial_exec: &mut TableOpSerialExecutor) -> bool {
403431
// Fallback to usize::MAX if Failed to convert arena_block_size into
404432
// usize (overflow)
405433
let max_write_buffer_size = self
@@ -416,8 +444,9 @@ impl TableData {
416444
let mutable_usage = self.current_version.mutable_memory_usage();
417445
let total_usage = self.current_version.total_memory_usage();
418446

447+
let in_flush = serial_exec.flush_scheduler().is_in_flush();
419448
// Inspired by https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h#L94
420-
if mutable_usage > mutable_limit {
449+
if mutable_usage > mutable_limit && !in_flush {
421450
info!(
422451
"TableData should flush, table:{}, table_id:{}, mutable_usage:{}, mutable_limit: {}, total_usage:{}, max_write_buffer_size:{}",
423452
self.name, self.id, mutable_usage, mutable_limit, total_usage, max_write_buffer_size
@@ -660,7 +689,15 @@ pub mod tests {
660689
let purger = FilePurgerMocker::mock();
661690
let collector = Arc::new(NoopCollector);
662691

663-
TableData::new(space_id, create_request, table_opts, &purger, collector).unwrap()
692+
TableData::new(
693+
space_id,
694+
create_request,
695+
table_opts,
696+
&purger,
697+
0.75,
698+
collector,
699+
)
700+
.unwrap()
664701
}
665702
}
666703

@@ -734,4 +771,33 @@ pub mod tests {
734771
TimeRange::bucket_of(now_ts, table_options::DEFAULT_SEGMENT_DURATION).unwrap();
735772
assert_eq!(time_range, mem_state.time_range);
736773
}
774+
775+
#[test]
776+
fn test_compute_mutable_limit() {
777+
// Build the cases for compute_mutable_limit.
778+
let cases = vec![
779+
(80, 0.8, 64),
780+
(80, 0.5, 40),
781+
(80, 0.1, 8),
782+
(80, 0.0, 0),
783+
(80, 1.0, 80),
784+
(0, 0.8, 0),
785+
(0, 0.5, 0),
786+
(0, 0.1, 0),
787+
(0, 0.0, 0),
788+
(0, 1.0, 0),
789+
];
790+
791+
for (write_buffer_size, ratio, expected) in cases {
792+
let limit = compute_mutable_limit(write_buffer_size, ratio);
793+
assert_eq!(expected, limit);
794+
}
795+
}
796+
797+
#[should_panic]
798+
#[test]
799+
fn test_compute_mutable_limit_panic() {
800+
compute_mutable_limit(80, 1.1);
801+
compute_mutable_limit(80, -0.1);
802+
}
737803
}

0 commit comments

Comments
 (0)