
Commit 0c2dc8e

chore: reflush memory tables after flush failed (#891)

1 parent: def0f96

File tree: 6 files changed (+80 −12 lines)

- analytic_engine/src/instance/flush_compaction.rs (+15 −5)
- analytic_engine/src/instance/mod.rs (+8)
- analytic_engine/src/instance/open.rs (+2)
- analytic_engine/src/instance/serial_executor.rs (+47 −6)
- analytic_engine/src/instance/write.rs (+5 −1)
- analytic_engine/src/lib.rs (+3)

analytic_engine/src/instance/flush_compaction.rs (+15 −5)

@@ -96,11 +96,16 @@ pub enum Error {
     WriteSst { path: String, source: GenericError },
 
     #[snafu(display(
-        "Background flush failed, cannot write more data, err:{}.\nBacktrace:\n{}",
+        "Background flush failed, cannot write more data, retry_count:{}, err:{}.\nBacktrace:\n{}",
+        retry_count,
         msg,
         backtrace
     ))]
-    BackgroundFlushFailed { msg: String, backtrace: Backtrace },
+    BackgroundFlushFailed {
+        msg: String,
+        retry_count: usize,
+        backtrace: Backtrace,
+    },
 
     #[snafu(display("Failed to build merge iterator, table:{}, err:{}", table, source))]
     BuildMergeIterator {
@@ -144,6 +149,10 @@ pub struct TableFlushOptions {
     ///
     /// If it is [None], no compaction will be scheduled.
     pub compact_after_flush: Option<CompactionSchedulerRef>,
+    /// Max retry limit after flush failed
+    ///
+    /// Default is 0
+    pub max_retry_flush_limit: usize,
 }
 
 impl fmt::Debug for TableFlushOptions {
@@ -286,7 +295,7 @@ impl Flusher {
 
         // TODO: The immediate compaction after flush is not a good idea because it may
         // block on the write procedure.
-        if let Some(compaction_scheduler) = opts.compact_after_flush {
+        if let Some(compaction_scheduler) = opts.compact_after_flush.clone() {
             // Schedule compaction if flush completed successfully.
             let compact_req = TableCompactionRequest::no_waiter(table_data.clone());
             let on_flush_success = async move {
@@ -301,7 +310,7 @@ impl Flusher {
                 flush_job,
                 on_flush_success,
                 block_on,
-                opts.res_sender,
+                opts,
                 &self.runtime,
                 &table_data.metrics,
             )
@@ -312,7 +321,7 @@ impl Flusher {
                 flush_job,
                 async {},
                 block_on,
-                opts.res_sender,
+                opts,
                 &self.runtime,
                 &table_data.metrics,
             )
@@ -430,6 +439,7 @@ impl FlushTask {
                 meta_edit: MetaEdit::Update(meta_update),
             }
         };
+        // Update manifest and remove immutable memtables
        self.space_store
            .manifest
            .apply_edit(edit_req)
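
Since BackgroundFlushFailed now carries the retry count, its snafu context selector gains a field as well. A minimal, self-contained sketch of the variant and how a caller would construct it (the demo function and error message are hypothetical; this assumes snafu 0.6-style selectors, matching the bare BackgroundFlushFailed { .. }.fail() call in serial_executor.rs below):

use snafu::{Backtrace, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display(
        "Background flush failed, cannot write more data, retry_count:{}, err:{}.\nBacktrace:\n{}",
        retry_count,
        msg,
        backtrace
    ))]
    BackgroundFlushFailed {
        msg: String,
        retry_count: usize,
        backtrace: Backtrace,
    },
}

// Hypothetical caller: the generated context selector omits `backtrace`,
// which snafu captures automatically when `.fail()` builds the error.
fn demo() -> Result<(), Error> {
    BackgroundFlushFailed {
        msg: "failed to write sst".to_string(),
        retry_count: 3usize,
    }
    .fail()
}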

analytic_engine/src/instance/mod.rs (+8)

@@ -152,6 +152,8 @@ pub struct Instance {
     pub(crate) replay_batch_size: usize,
     /// Write sst max buffer size
     pub(crate) write_sst_max_buffer_size: usize,
+    /// Max retry limit to flush memtables
+    pub(crate) max_retry_flush_limit: usize,
     /// Max bytes per write batch
     pub(crate) max_bytes_per_write_batch: Option<usize>,
     /// Options for scanning sst
@@ -192,6 +194,7 @@ impl Instance {
            } else {
                None
            },
+            max_retry_flush_limit: 0,
        };
 
        let flusher = self.make_flusher();
@@ -275,6 +278,11 @@ impl Instance {
            write_sst_max_buffer_size: self.write_sst_max_buffer_size,
        }
    }
+
+    #[inline]
+    fn max_retry_flush_limit(&self) -> usize {
+        self.max_retry_flush_limit
+    }
 }
 
 /// Instance reference

analytic_engine/src/instance/open.rs (+2)

@@ -125,6 +125,7 @@ impl Instance {
             space_write_buffer_size: ctx.config.space_write_buffer_size,
             replay_batch_size: ctx.config.replay_batch_size,
             write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_byte() as usize,
+            max_retry_flush_limit: ctx.config.max_retry_flush_limit,
             max_bytes_per_write_batch: ctx
                 .config
                 .max_bytes_per_write_batch
@@ -326,6 +327,7 @@ impl Instance {
         let opts = TableFlushOptions {
             res_sender: None,
             compact_after_flush: None,
+            max_retry_flush_limit: self.max_retry_flush_limit,
         };
         let flusher = self.make_flusher();
         let flush_scheduler = serial_exec.flush_scheduler();

analytic_engine/src/instance/serial_executor.rs (+47 −6)

@@ -1,21 +1,25 @@
 // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
 
 use std::{
-    sync::{Arc, Mutex},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
     time::Instant,
 };
 
 use common_util::{runtime::Runtime, time::InstantExt};
 use futures::Future;
-use log::error;
+use log::{error, warn};
 use table_engine::table::TableId;
 use tokio::sync::{
     oneshot,
     watch::{self, Receiver, Sender},
 };
 
+use super::flush_compaction::{BackgroundFlushFailed, TableFlushOptions};
 use crate::{
-    instance::flush_compaction::{BackgroundFlushFailed, Other, Result},
+    instance::flush_compaction::{Other, Result},
     table::metrics::Metrics,
 };
 
@@ -34,6 +38,26 @@ type ScheduleSyncRef = Arc<ScheduleSync>;
 struct ScheduleSync {
     state: Mutex<FlushState>,
     notifier: Sender<()>,
+    continuous_flush_failure_count: AtomicUsize,
+}
+
+impl ScheduleSync {
+    #[inline]
+    pub fn should_retry_flush(&self, max_retry_limit: usize) -> bool {
+        self.continuous_flush_failure_count.load(Ordering::Relaxed) < max_retry_limit
+    }
+
+    #[inline]
+    pub fn reset_flush_failure_count(&self) {
+        self.continuous_flush_failure_count
+            .store(0, Ordering::Relaxed);
+    }
+
+    #[inline]
+    pub fn inc_flush_failure_count(&self) {
+        self.continuous_flush_failure_count
+            .fetch_add(1, Ordering::Relaxed);
+    }
 }
 
 pub struct TableFlushScheduler {
@@ -47,6 +71,7 @@ impl Default for TableFlushScheduler {
         let schedule_sync = ScheduleSync {
             state: Mutex::new(FlushState::Ready),
             notifier: tx,
+            continuous_flush_failure_count: AtomicUsize::new(0),
         };
         Self {
             schedule_sync: Arc::new(schedule_sync),
@@ -105,7 +130,7 @@ impl TableFlushScheduler {
         flush_job: F,
         on_flush_success: T,
         block_on_write_thread: bool,
-        res_sender: Option<oneshot::Sender<Result<()>>>,
+        opts: TableFlushOptions,
         runtime: &Runtime,
         metrics: &Metrics,
     ) -> Result<()>
@@ -131,7 +156,21 @@ impl TableFlushScheduler {
                 }
                 FlushState::Flushing => (),
                 FlushState::Failed { err_msg } => {
-                    return BackgroundFlushFailed { msg: err_msg }.fail();
+                    if self
+                        .schedule_sync
+                        .should_retry_flush(opts.max_retry_flush_limit)
+                    {
+                        warn!("Re-flush memory tables after background flush failed:{err_msg}");
+                        // Mark the worker as flushing.
+                        *flush_state = FlushState::Flushing;
+                        break;
+                    } else {
+                        return BackgroundFlushFailed {
+                            msg: err_msg,
+                            retry_count: opts.max_retry_flush_limit,
+                        }
+                        .fail();
+                    }
                 }
             }
 
@@ -164,7 +203,7 @@ impl TableFlushScheduler {
             if flush_res.is_ok() {
                 on_flush_success.await;
             }
-            send_flush_result(res_sender, flush_res);
+            send_flush_result(opts.res_sender, flush_res);
         };
 
         if block_on_write_thread {
@@ -182,9 +221,11 @@ fn on_flush_finished(schedule_sync: ScheduleSyncRef, res: &Result<()>) {
     let mut flush_state = schedule_sync.state.lock().unwrap();
     match res {
         Ok(()) => {
+            schedule_sync.reset_flush_failure_count();
             *flush_state = FlushState::Ready;
         }
         Err(e) => {
+            schedule_sync.inc_flush_failure_count();
             let err_msg = e.to_string();
             *flush_state = FlushState::Failed { err_msg };
         }
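
The retry bookkeeping above is small enough to sketch in isolation. The following stand-alone sketch uses a hypothetical RetryCounter and run_flush driver in place of the real ScheduleSync/FlushState machinery, and it retries in a loop for demonstration, whereas the real scheduler re-flushes on the next flush request that observes FlushState::Failed. The semantics are the same: each failure bumps a relaxed atomic counter, a success resets it, and a re-flush happens only while the count stays below max_retry_flush_limit.

use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for ScheduleSync's continuous_flush_failure_count.
struct RetryCounter(AtomicUsize);

impl RetryCounter {
    /// Mirrors ScheduleSync::should_retry_flush.
    fn should_retry(&self, limit: usize) -> bool {
        self.0.load(Ordering::Relaxed) < limit
    }

    /// Mirrors ScheduleSync::inc_flush_failure_count (called on failure).
    fn on_failure(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }

    /// Mirrors ScheduleSync::reset_flush_failure_count (called on success).
    fn on_success(&self) {
        self.0.store(0, Ordering::Relaxed);
    }
}

/// Hypothetical driver: re-run `flush_once` until it succeeds or the
/// consecutive-failure count reaches `max_retry_flush_limit`.
fn run_flush(
    counter: &RetryCounter,
    max_retry_flush_limit: usize,
    mut flush_once: impl FnMut() -> Result<(), String>,
) -> Result<(), String> {
    loop {
        match flush_once() {
            Ok(()) => {
                counter.on_success();
                return Ok(());
            }
            Err(msg) => {
                counter.on_failure();
                if counter.should_retry(max_retry_flush_limit) {
                    // Corresponds to the FlushState::Failed branch above.
                    eprintln!("Re-flush memory tables after background flush failed:{msg}");
                    continue;
                }
                return Err(msg);
            }
        }
    }
}

fn main() {
    let counter = RetryCounter(AtomicUsize::new(0));
    let mut attempts = 0;
    // Fails twice, then succeeds; with a limit of 3 the flush is re-run
    // until it goes through, and the success resets the counter to zero.
    let res = run_flush(&counter, 3, || {
        attempts += 1;
        if attempts < 3 {
            Err("failed to write sst".to_string())
        } else {
            Ok(())
        }
    });
    assert!(res.is_ok());
    assert_eq!(counter.0.load(Ordering::Relaxed), 0);
}

Relaxed ordering suffices for the counter because it is only advisory; the flush state transitions themselves are serialized by the state mutex.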

analytic_engine/src/instance/write.rs (+5 −1)

@@ -590,7 +590,11 @@ impl<'a> Writer<'a> {
     /// acquired in advance. And in order to avoid deadlock, we should not wait
     /// for the lock.
     async fn handle_memtable_flush(&mut self, table_data: &TableDataRef) -> Result<()> {
-        let opts = TableFlushOptions::default();
+        let opts = TableFlushOptions {
+            res_sender: None,
+            compact_after_flush: None,
+            max_retry_flush_limit: self.instance.max_retry_flush_limit(),
+        };
         let flusher = self.instance.make_flusher();
         if table_data.id == self.table_data.id {
             let flush_scheduler = self.serial_exec.flush_scheduler();

analytic_engine/src/lib.rs (+3)

@@ -83,6 +83,8 @@ pub struct Config {
     pub sst_background_read_parallelism: usize,
     /// Max buffer size for writing sst
     pub write_sst_max_buffer_size: ReadableSize,
+    /// Max retry limit after flush failed
+    pub max_retry_flush_limit: usize,
     /// Max bytes per write batch.
     ///
     /// If this is set, the atomicity of write request will be broken.
@@ -122,6 +124,7 @@ impl Default for Config {
             sst_background_read_parallelism: 8,
             scan_max_record_batches_in_flight: 1024,
             write_sst_max_buffer_size: ReadableSize::mb(10),
+            max_retry_flush_limit: 0,
             max_bytes_per_write_batch: None,
             wal: WalStorageConfig::RocksDB(Box::default()),
             remote_engine_client: remote_engine_client::config::Config::default(),
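
Embedders pick up the new knob through Config. A hedged sketch of overriding it programmatically while keeping the other defaults (this assumes Config supports struct-update syntax over its public fields; the value 3 is illustrative):

use analytic_engine::Config;

fn build_config() -> Config {
    Config {
        // Allow up to three consecutive re-flush attempts before
        // BackgroundFlushFailed is returned to writers (default: 0,
        // i.e. no retry, matching the previous behavior).
        max_retry_flush_limit: 3,
        ..Config::default()
    }
}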
