Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: reflush memory tables after flush failed #891

Merged
merged 8 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,16 @@ pub enum Error {
WriteSst { path: String, source: GenericError },

#[snafu(display(
"Background flush failed, cannot write more data, err:{}.\nBacktrace:\n{}",
"Background flush failed, cannot write more data, retry_count:{}, err:{}.\nBacktrace:\n{}",
retry_count,
msg,
backtrace
))]
BackgroundFlushFailed { msg: String, backtrace: Backtrace },
BackgroundFlushFailed {
msg: String,
retry_count: usize,
backtrace: Backtrace,
},

#[snafu(display("Failed to build merge iterator, table:{}, err:{}", table, source))]
BuildMergeIterator {
Expand Down Expand Up @@ -144,6 +149,10 @@ pub struct TableFlushOptions {
///
/// If it is [None], no compaction will be scheduled.
pub compact_after_flush: Option<CompactionSchedulerRef>,
/// Max retry limit after a flush failure
///
/// Default is 0
pub max_retry_flush_limit: usize,
}

impl fmt::Debug for TableFlushOptions {
Expand Down Expand Up @@ -286,7 +295,7 @@ impl Flusher {

// TODO: The immediate compaction after flush is not a good idea because it may
// block on the write procedure.
if let Some(compaction_scheduler) = opts.compact_after_flush {
if let Some(compaction_scheduler) = opts.compact_after_flush.clone() {
// Schedule compaction if flush completed successfully.
let compact_req = TableCompactionRequest::no_waiter(table_data.clone());
let on_flush_success = async move {
Expand All @@ -301,7 +310,7 @@ impl Flusher {
flush_job,
on_flush_success,
block_on,
opts.res_sender,
opts,
&self.runtime,
&table_data.metrics,
)
Expand All @@ -312,7 +321,7 @@ impl Flusher {
flush_job,
async {},
block_on,
opts.res_sender,
opts,
&self.runtime,
&table_data.metrics,
)
Expand Down Expand Up @@ -430,6 +439,7 @@ impl FlushTask {
meta_edit: MetaEdit::Update(meta_update),
}
};
// Update manifest and remove immutable memtables
self.space_store
.manifest
.apply_edit(edit_req)
Expand Down
8 changes: 8 additions & 0 deletions analytic_engine/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ pub struct Instance {
pub(crate) replay_batch_size: usize,
/// Write sst max buffer size
pub(crate) write_sst_max_buffer_size: usize,
/// Max retry limit to flush memtables
pub(crate) max_retry_flush_limit: usize,
/// Max bytes per write batch
pub(crate) max_bytes_per_write_batch: Option<usize>,
/// Options for scanning sst
Expand Down Expand Up @@ -191,6 +193,7 @@ impl Instance {
} else {
None
},
max_retry_flush_limit: 0,
};

let flusher = self.make_flusher();
Expand Down Expand Up @@ -274,6 +277,11 @@ impl Instance {
write_sst_max_buffer_size: self.write_sst_max_buffer_size,
}
}

#[inline]
fn max_retry_flush_limit(&self) -> usize {
self.max_retry_flush_limit
}
}

/// Instance reference
Expand Down
2 changes: 2 additions & 0 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ impl Instance {
space_write_buffer_size: ctx.config.space_write_buffer_size,
replay_batch_size: ctx.config.replay_batch_size,
write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_byte() as usize,
max_retry_flush_limit: ctx.config.max_retry_flush_limit,
max_bytes_per_write_batch: ctx
.config
.max_bytes_per_write_batch
Expand Down Expand Up @@ -325,6 +326,7 @@ impl Instance {
let opts = TableFlushOptions {
res_sender: None,
compact_after_flush: None,
max_retry_flush_limit: self.max_retry_flush_limit,
};
let flusher = self.make_flusher();
let flush_scheduler = serial_exec.flush_scheduler();
Expand Down
53 changes: 47 additions & 6 deletions analytic_engine/src/instance/serial_executor.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

use std::{
sync::{Arc, Mutex},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::Instant,
};

use common_util::{runtime::Runtime, time::InstantExt};
use futures::Future;
use log::error;
use log::{error, warn};
use table_engine::table::TableId;
use tokio::sync::{
oneshot,
watch::{self, Receiver, Sender},
};

use super::flush_compaction::{BackgroundFlushFailed, TableFlushOptions};
use crate::{
instance::flush_compaction::{BackgroundFlushFailed, Other, Result},
instance::flush_compaction::{Other, Result},
table::metrics::Metrics,
};

Expand All @@ -34,6 +38,26 @@ type ScheduleSyncRef = Arc<ScheduleSync>;
// Synchronization state shared between a table's flush scheduler and its
// background flush task: a mutex-guarded state machine plus a watch channel
// used to wake waiters when the state changes.
struct ScheduleSync {
// Current flush state (Ready / Flushing / Failed { err_msg }).
state: Mutex<FlushState>,
// Watch sender used to notify waiters of `state` transitions.
notifier: Sender<()>,
// Number of consecutive failed flushes; incremented on each failure and
// reset to 0 after a successful flush.
continuous_flush_failure_count: AtomicUsize,
}

impl ScheduleSync {
    /// Whether another flush attempt is still allowed, i.e. the number of
    /// consecutive flush failures so far is below `max_retry_limit`.
    #[inline]
    pub fn should_retry_flush(&self, max_retry_limit: usize) -> bool {
        let failures = self.continuous_flush_failure_count.load(Ordering::Relaxed);
        failures < max_retry_limit
    }

    /// Clears the consecutive-failure counter; invoked after a flush succeeds.
    #[inline]
    pub fn reset_flush_failure_count(&self) {
        self.continuous_flush_failure_count.store(0, Ordering::Relaxed);
    }

    /// Bumps the consecutive-failure counter by one; invoked when a flush fails.
    #[inline]
    pub fn inc_flush_failure_count(&self) {
        self.continuous_flush_failure_count.fetch_add(1, Ordering::Relaxed);
    }
}

pub struct TableFlushScheduler {
Expand All @@ -47,6 +71,7 @@ impl Default for TableFlushScheduler {
let schedule_sync = ScheduleSync {
state: Mutex::new(FlushState::Ready),
notifier: tx,
continuous_flush_failure_count: AtomicUsize::new(0),
};
Self {
schedule_sync: Arc::new(schedule_sync),
Expand Down Expand Up @@ -105,7 +130,7 @@ impl TableFlushScheduler {
flush_job: F,
on_flush_success: T,
block_on_write_thread: bool,
res_sender: Option<oneshot::Sender<Result<()>>>,
opts: TableFlushOptions,
runtime: &Runtime,
metrics: &Metrics,
) -> Result<()>
Expand All @@ -131,7 +156,21 @@ impl TableFlushScheduler {
}
FlushState::Flushing => (),
FlushState::Failed { err_msg } => {
return BackgroundFlushFailed { msg: err_msg }.fail();
if self
.schedule_sync
.should_retry_flush(opts.max_retry_flush_limit)
{
warn!("Re-flush memory tables after background flush failed:{err_msg}");
// Mark the worker is flushing.
*flush_state = FlushState::Flushing;
break;
} else {
return BackgroundFlushFailed {
msg: err_msg,
retry_count: opts.max_retry_flush_limit,
}
.fail();
}
}
}

Expand Down Expand Up @@ -164,7 +203,7 @@ impl TableFlushScheduler {
if flush_res.is_ok() {
on_flush_success.await;
}
send_flush_result(res_sender, flush_res);
send_flush_result(opts.res_sender, flush_res);
};

if block_on_write_thread {
Expand All @@ -182,9 +221,11 @@ fn on_flush_finished(schedule_sync: ScheduleSyncRef, res: &Result<()>) {
let mut flush_state = schedule_sync.state.lock().unwrap();
match res {
Ok(()) => {
schedule_sync.reset_flush_failure_count();
*flush_state = FlushState::Ready;
}
Err(e) => {
schedule_sync.inc_flush_failure_count();
let err_msg = e.to_string();
*flush_state = FlushState::Failed { err_msg };
}
Expand Down
6 changes: 5 additions & 1 deletion analytic_engine/src/instance/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,11 @@ impl<'a> Writer<'a> {
/// acquired in advance. And in order to avoid deadlock, we should not wait
/// for the lock.
async fn handle_memtable_flush(&mut self, table_data: &TableDataRef) -> Result<()> {
let opts = TableFlushOptions::default();
let opts = TableFlushOptions {
res_sender: None,
compact_after_flush: None,
max_retry_flush_limit: self.instance.max_retry_flush_limit(),
};
let flusher = self.instance.make_flusher();
if table_data.id == self.table_data.id {
let flush_scheduler = self.serial_exec.flush_scheduler();
Expand Down
3 changes: 3 additions & 0 deletions analytic_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub struct Config {
pub sst_background_read_parallelism: usize,
/// Max buffer size for writing sst
pub write_sst_max_buffer_size: ReadableSize,
/// Max retry limit after a flush failure
pub max_retry_flush_limit: usize,
/// Max bytes per write batch.
///
/// If this is set, the atomicity of write request will be broken.
Expand Down Expand Up @@ -119,6 +121,7 @@ impl Default for Config {
sst_background_read_parallelism: 8,
scan_max_record_batches_in_flight: 1024,
write_sst_max_buffer_size: ReadableSize::mb(10),
max_retry_flush_limit: 0,
max_bytes_per_write_batch: None,
wal: WalStorageConfig::RocksDB(Box::default()),
remote_engine_client: remote_engine_client::config::Config::default(),
Expand Down