From d18122021549f74f033bfb25bf2ee69d81134fcf Mon Sep 17 00:00:00 2001
From: MachaelLee <690585471@qq.com>
Date: Thu, 11 May 2023 20:23:14 +0800
Subject: [PATCH 1/8] retry flush

---
 analytic_engine/src/instance/serial_executor.rs | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/analytic_engine/src/instance/serial_executor.rs b/analytic_engine/src/instance/serial_executor.rs
index 0715360bd4..c0620f09b5 100644
--- a/analytic_engine/src/instance/serial_executor.rs
+++ b/analytic_engine/src/instance/serial_executor.rs
@@ -7,7 +7,7 @@ use std::{
 
 use common_util::{runtime::Runtime, time::InstantExt};
 use futures::Future;
-use log::error;
+use log::{error, warn};
 use table_engine::table::TableId;
 use tokio::sync::{
     oneshot,
@@ -131,7 +131,10 @@ impl TableFlushScheduler {
                 }
                 FlushState::Flushing => (),
                 FlushState::Failed { err_msg } => {
-                    return BackgroundFlushFailed { msg: err_msg }.fail();
+                    // Mark the worker as flushing.
+                    *flush_state = FlushState::Flushing;
+                    warn!("Retry to flush memory tables after background flush failed:{err_msg}");
+                    break;
                 }
             }
 

From 3c8a7d775903e6ca8a0b187256356ce24872eec5 Mon Sep 17 00:00:00 2001
From: MachaelLee <690585471@qq.com>
Date: Thu, 11 May 2023 20:28:07 +0800
Subject: [PATCH 2/8] err msg

---
 analytic_engine/src/instance/serial_executor.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/analytic_engine/src/instance/serial_executor.rs b/analytic_engine/src/instance/serial_executor.rs
index c0620f09b5..5ab14c3e9d 100644
--- a/analytic_engine/src/instance/serial_executor.rs
+++ b/analytic_engine/src/instance/serial_executor.rs
@@ -15,7 +15,7 @@ use tokio::sync::{
 };
 
 use crate::{
-    instance::flush_compaction::{BackgroundFlushFailed, Other, Result},
+    instance::flush_compaction::{Other, Result},
     table::metrics::Metrics,
 };
 
@@ -131,9 +131,9 @@ impl TableFlushScheduler {
                 }
                 FlushState::Flushing => (),
                 FlushState::Failed { err_msg } => {
+                    warn!("Retry to flush memory tables after background flush failed:{err_msg}");
                     // Mark the worker as flushing.
                     *flush_state = FlushState::Flushing;
-                    warn!("Retry to flush memory tables after background flush failed:{err_msg}");
                     break;
                 }
             }

From 6f794ff43399901efb06eb250cc03cc05264c9e0 Mon Sep 17 00:00:00 2001
From: MachaelLee <690585471@qq.com>
Date: Thu, 11 May 2023 20:30:29 +0800
Subject: [PATCH 3/8] reflush

---
 analytic_engine/src/instance/serial_executor.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/analytic_engine/src/instance/serial_executor.rs b/analytic_engine/src/instance/serial_executor.rs
index 5ab14c3e9d..7025303ba3 100644
--- a/analytic_engine/src/instance/serial_executor.rs
+++ b/analytic_engine/src/instance/serial_executor.rs
@@ -131,7 +131,7 @@ impl TableFlushScheduler {
                 }
                 FlushState::Flushing => (),
                 FlushState::Failed { err_msg } => {
-                    warn!("Retry to flush memory tables after background flush failed:{err_msg}");
+                    warn!("Re-flush memory tables after background flush failed:{err_msg}");
                     // Mark the worker as flushing.
                     *flush_state = FlushState::Flushing;
                     break;

From 5dca06503ee5ef2b48617696d622d89fc58fbc49 Mon Sep 17 00:00:00 2001
From: MachaelLee <690585471@qq.com>
Date: Fri, 12 May 2023 11:01:25 +0800
Subject: [PATCH 4/8] add option

---
 .../src/instance/flush_compaction.rs            | 11 ++++++++---
 analytic_engine/src/instance/mod.rs             |  8 ++++++++
 analytic_engine/src/instance/open.rs            |  2 ++
 analytic_engine/src/instance/serial_executor.rs | 17 +++++++++++------
 analytic_engine/src/instance/write.rs           |  6 +++++-
 analytic_engine/src/lib.rs                      |  3 +++
 6 files changed, 37 insertions(+), 10 deletions(-)

diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs
index 78c06ddf7e..06dfe47047 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -144,6 +144,10 @@ pub struct TableFlushOptions {
     ///
     /// If it is [None], no compaction will be scheduled.
     pub compact_after_flush: Option<CompactionSchedulerRef>,
+    /// Should retry flush after flush failed
+    ///
+    /// Default is false
+    pub retry_flush: bool,
 }
 
 impl fmt::Debug for TableFlushOptions {
@@ -286,7 +290,7 @@ impl Flusher {
 
         // TODO: The immediate compaction after flush is not a good idea because it may
         // block on the write procedure.
-        if let Some(compaction_scheduler) = opts.compact_after_flush {
+        if let Some(compaction_scheduler) = opts.compact_after_flush.clone() {
             // Schedule compaction if flush completed successfully.
             let compact_req = TableCompactionRequest::no_waiter(table_data.clone());
             let on_flush_success = async move {
@@ -301,7 +305,7 @@ impl Flusher {
                 flush_job,
                 on_flush_success,
                 block_on,
-                opts.res_sender,
+                opts,
                 &self.runtime,
                 &table_data.metrics,
             )
@@ -312,7 +316,7 @@ impl Flusher {
                 flush_job,
                 async {},
                 block_on,
-                opts.res_sender,
+                opts,
                 &self.runtime,
                 &table_data.metrics,
             )
@@ -430,6 +434,7 @@ impl FlushTask {
                 meta_edit: MetaEdit::Update(meta_update),
             }
         };
+        // Update manifest and remove immutable memtables
         self.space_store
             .manifest
             .apply_edit(edit_req)
diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs
index 844319d660..b590cc2e68 100644
--- a/analytic_engine/src/instance/mod.rs
+++ b/analytic_engine/src/instance/mod.rs
@@ -151,6 +151,8 @@ pub struct Instance {
     pub(crate) replay_batch_size: usize,
     /// Write sst max buffer size
    pub(crate) write_sst_max_buffer_size: usize,
+    /// Should retry flush after flush failed
+    pub(crate) retry_flush: bool,
     /// Max bytes per write batch
     pub(crate) max_bytes_per_write_batch: Option<usize>,
     /// Options for scanning sst
@@ -191,6 +193,7 @@ impl Instance {
             } else {
                 None
             },
+            retry_flush: false,
         };
 
         let flusher = self.make_flusher();
@@ -274,6 +277,11 @@ impl Instance {
             write_sst_max_buffer_size: self.write_sst_max_buffer_size,
         }
     }
+
+    #[inline]
+    fn should_retry_flush(&self) -> bool {
+        self.retry_flush
+    }
 }
 
 /// Instance reference
diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs
index ee3e54a6ca..3dda7bee80 100644
--- a/analytic_engine/src/instance/open.rs
+++ b/analytic_engine/src/instance/open.rs
@@ -124,6 +124,7 @@ impl Instance {
             space_write_buffer_size: ctx.config.space_write_buffer_size,
             replay_batch_size: ctx.config.replay_batch_size,
             write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_byte() as usize,
+            retry_flush: ctx.config.retry_flush,
             max_bytes_per_write_batch: ctx
                 .config
                 .max_bytes_per_write_batch
@@ -325,6 +326,7 @@ impl Instance {
         let opts = TableFlushOptions {
             res_sender: None,
             compact_after_flush: None,
+            retry_flush: self.retry_flush,
         };
         let flusher = self.make_flusher();
         let flush_scheduler = serial_exec.flush_scheduler();
diff --git a/analytic_engine/src/instance/serial_executor.rs b/analytic_engine/src/instance/serial_executor.rs
index 7025303ba3..c82878f895 100644
--- a/analytic_engine/src/instance/serial_executor.rs
+++ b/analytic_engine/src/instance/serial_executor.rs
@@ -14,6 +14,7 @@ use tokio::sync::{
     watch::{self, Receiver, Sender},
 };
 
+use super::flush_compaction::{BackgroundFlushFailed, TableFlushOptions};
 use crate::{
     instance::flush_compaction::{Other, Result},
     table::metrics::Metrics,
@@ -105,7 +106,7 @@ impl TableFlushScheduler {
         flush_job: F,
         on_flush_success: T,
         block_on_write_thread: bool,
-        res_sender: Option<oneshot::Sender<Result<()>>>,
+        opts: TableFlushOptions,
         runtime: &Runtime,
         metrics: &Metrics,
     ) -> Result<()>
@@ -131,10 +132,14 @@ impl TableFlushScheduler {
                 }
                 FlushState::Flushing => (),
                 FlushState::Failed { err_msg } => {
-                    warn!("Re-flush memory tables after background flush failed:{err_msg}");
-                    // Mark the worker as flushing.
-                    *flush_state = FlushState::Flushing;
-                    break;
+                    if opts.retry_flush {
+                        warn!("Re-flush memory tables after background flush failed:{err_msg}");
+                        // Mark the worker as flushing.
+                        *flush_state = FlushState::Flushing;
+                        break;
+                    } else {
+                        return BackgroundFlushFailed { msg: err_msg }.fail();
+                    }
                 }
             }
 
@@ -167,7 +172,7 @@ impl TableFlushScheduler {
             if flush_res.is_ok() {
                 on_flush_success.await;
             }
-            send_flush_result(res_sender, flush_res);
+            send_flush_result(opts.res_sender, flush_res);
         };
 
         if block_on_write_thread {
diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs
index 52c278774e..a9b4c3bd81 100644
--- a/analytic_engine/src/instance/write.rs
+++ b/analytic_engine/src/instance/write.rs
@@ -590,7 +590,11 @@ impl<'a> Writer<'a> {
     /// acquired in advance. And in order to avoid deadlock, we should not wait
     /// for the lock.
     async fn handle_memtable_flush(&mut self, table_data: &TableDataRef) -> Result<()> {
-        let opts = TableFlushOptions::default();
+        let opts = TableFlushOptions {
+            res_sender: None,
+            compact_after_flush: None,
+            retry_flush: self.instance.should_retry_flush(),
+        };
         let flusher = self.instance.make_flusher();
         if table_data.id == self.table_data.id {
             let flush_scheduler = self.serial_exec.flush_scheduler();
diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs
index d3779df67c..04c9509ff6 100644
--- a/analytic_engine/src/lib.rs
+++ b/analytic_engine/src/lib.rs
@@ -81,6 +81,8 @@ pub struct Config {
     pub sst_background_read_parallelism: usize,
     /// Max buffer size for writing sst
     pub write_sst_max_buffer_size: ReadableSize,
+    /// Should retry flush after flush failed
+    pub retry_flush: bool,
     /// Max bytes per write batch.
     ///
     /// If this is set, the atomicity of write request will be broken.
@@ -119,6 +121,7 @@ impl Default for Config {
             sst_background_read_parallelism: 8,
             scan_max_record_batches_in_flight: 1024,
             write_sst_max_buffer_size: ReadableSize::mb(10),
+            retry_flush: true,
             max_bytes_per_write_batch: None,
             wal: WalStorageConfig::RocksDB(Box::default()),
             remote_engine_client: remote_engine_client::config::Config::default(),

From 37c49ccacbfa42747470f1ee1031d873d6ee793b Mon Sep 17 00:00:00 2001
From: MachaelLee <690585471@qq.com>
Date: Fri, 12 May 2023 11:03:06 +0800
Subject: [PATCH 5/8] default false

---
 analytic_engine/src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs
index 04c9509ff6..f18c836303 100644
--- a/analytic_engine/src/lib.rs
+++ b/analytic_engine/src/lib.rs
@@ -121,7 +121,7 @@ impl Default for Config {
             sst_background_read_parallelism: 8,
             scan_max_record_batches_in_flight: 1024,
             write_sst_max_buffer_size: ReadableSize::mb(10),
-            retry_flush: true,
+            retry_flush: false,
             max_bytes_per_write_batch: None,
             wal: WalStorageConfig::RocksDB(Box::default()),
             remote_engine_client: remote_engine_client::config::Config::default(),

From a33596923eb85b1b7de08f6d0f4f0adb1dc34d3d Mon Sep 17 00:00:00 2001
From: MachaelLee <690585471@qq.com>
Date: Fri, 12 May 2023 13:53:39 +0800
Subject: [PATCH 6/8] retry limit

---
 .../src/instance/flush_compaction.rs  | 15 +++++---
 analytic_engine/src/instance/mod.rs   | 10 ++---
 analytic_engine/src/instance/open.rs  |  4 +-
 .../src/instance/serial_executor.rs   | 37 +++++++++++++++++--
 analytic_engine/src/instance/write.rs |  2 +-
 analytic_engine/src/lib.rs            |  6 +--
 6 files changed, 55 insertions(+), 19 deletions(-)

diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs
index 06dfe47047..f503bbb3ba 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -96,11 +96,16 @@ pub enum Error {
     WriteSst { path: String, source: GenericError },
 
     #[snafu(display(
-        "Background flush failed, cannot write more data, err:{}.\nBacktrace:\n{}",
+        "Background flush failed, cannot write more data, retry_count:{}, err:{}.\nBacktrace:\n{}",
+        retry_count,
         msg,
         backtrace
     ))]
-    BackgroundFlushFailed { msg: String, backtrace: Backtrace },
+    BackgroundFlushFailed {
+        msg: String,
+        retry_count: usize,
+        backtrace: Backtrace,
+    },
 
     #[snafu(display("Failed to build merge iterator, table:{}, err:{}", table, source))]
     BuildMergeIterator {
@@ -144,10 +149,10 @@ pub struct TableFlushOptions {
     ///
     /// If it is [None], no compaction will be scheduled.
     pub compact_after_flush: Option<CompactionSchedulerRef>,
-    /// Should retry flush after flush failed
+    /// Max retry limit after flush failed
     ///
-    /// Default is false
-    pub retry_flush: bool,
+    /// Default is 0
+    pub max_retry_flush_limit: usize,
 }
 
 impl fmt::Debug for TableFlushOptions {
diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs
index b590cc2e68..cefab69192 100644
--- a/analytic_engine/src/instance/mod.rs
+++ b/analytic_engine/src/instance/mod.rs
@@ -151,8 +151,8 @@ pub struct Instance {
     pub(crate) replay_batch_size: usize,
     /// Write sst max buffer size
     pub(crate) write_sst_max_buffer_size: usize,
-    /// Should retry flush after flush failed
-    pub(crate) retry_flush: bool,
+    /// Max retry limit to flush memtables
+    pub(crate) max_retry_flush_limit: usize,
     /// Max bytes per write batch
     pub(crate) max_bytes_per_write_batch: Option<usize>,
     /// Options for scanning sst
@@ -193,7 +193,7 @@ impl Instance {
             } else {
                 None
             },
-            retry_flush: false,
+            max_retry_flush_limit: 0,
         };
 
         let flusher = self.make_flusher();
@@ -279,8 +279,8 @@ impl Instance {
     }
 
     #[inline]
-    fn should_retry_flush(&self) -> bool {
-        self.retry_flush
+    fn max_retry_flush_limit(&self) -> usize {
+        self.max_retry_flush_limit
     }
 }
 
diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs
index 3dda7bee80..bb6421f244 100644
--- a/analytic_engine/src/instance/open.rs
+++ b/analytic_engine/src/instance/open.rs
@@ -124,7 +124,7 @@ impl Instance {
             space_write_buffer_size: ctx.config.space_write_buffer_size,
             replay_batch_size: ctx.config.replay_batch_size,
             write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_byte() as usize,
-            retry_flush: ctx.config.retry_flush,
+            max_retry_flush_limit: ctx.config.max_retry_flush_limit,
             max_bytes_per_write_batch: ctx
                 .config
                 .max_bytes_per_write_batch
@@ -326,7 +326,7 @@ impl Instance {
         let opts = TableFlushOptions {
             res_sender: None,
             compact_after_flush: None,
-            retry_flush: self.retry_flush,
+            max_retry_flush_limit: self.max_retry_flush_limit,
         };
         let flusher = self.make_flusher();
         let flush_scheduler = serial_exec.flush_scheduler();
diff --git a/analytic_engine/src/instance/serial_executor.rs b/analytic_engine/src/instance/serial_executor.rs
index c82878f895..addd8e2133 100644
--- a/analytic_engine/src/instance/serial_executor.rs
+++ b/analytic_engine/src/instance/serial_executor.rs
@@ -1,7 +1,10 @@
 // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
 
 use std::{
-    sync::{Arc, Mutex},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
     time::Instant,
 };
 
@@ -35,6 +38,25 @@ type ScheduleSyncRef = Arc<ScheduleSync>;
 struct ScheduleSync {
     state: Mutex<FlushState>,
     notifier: Sender<()>,
+    retry_flush_count: AtomicUsize,
+}
+
+impl ScheduleSync {
+    #[inline]
+    pub fn should_retry_flush(&self, max_retry_limit: usize) -> bool {
+        if self.retry_flush_count.load(Ordering::Relaxed) < max_retry_limit {
+            self.retry_flush_count.fetch_add(1, Ordering::Relaxed);
+            return true;
+        }
+        false
+    }
+
+    #[inline]
+    pub fn reset_retry_flush_count(&self) {
+        if self.retry_flush_count.load(Ordering::Relaxed) > 0 {
+            self.retry_flush_count.store(0, Ordering::Relaxed);
+        }
+    }
 }
 
 pub struct TableFlushScheduler {
@@ -48,6 +70,7 @@ impl Default for TableFlushScheduler {
         let schedule_sync = ScheduleSync {
             state: Mutex::new(FlushState::Ready),
             notifier: tx,
+            retry_flush_count: AtomicUsize::new(0),
         };
         Self {
             schedule_sync: Arc::new(schedule_sync),
@@ -132,13 +155,20 @@ impl TableFlushScheduler {
                 }
                 FlushState::Flushing => (),
                 FlushState::Failed { err_msg } => {
-                    if opts.retry_flush {
+                    if self
+                        .schedule_sync
+                        .should_retry_flush(opts.max_retry_flush_limit)
+                    {
                         warn!("Re-flush memory tables after background flush failed:{err_msg}");
                         // Mark the worker as flushing.
                         *flush_state = FlushState::Flushing;
                         break;
                     } else {
-                        return BackgroundFlushFailed { msg: err_msg }.fail();
+                        return BackgroundFlushFailed {
+                            msg: err_msg,
+                            retry_count: opts.max_retry_flush_limit,
+                        }
+                        .fail();
                     }
                 }
             }
@@ -190,6 +220,7 @@ fn on_flush_finished(schedule_sync: ScheduleSyncRef, res: &Result<()>) {
     let mut flush_state = schedule_sync.state.lock().unwrap();
     match res {
         Ok(()) => {
+            schedule_sync.reset_retry_flush_count();
             *flush_state = FlushState::Ready;
         }
         Err(e) => {
diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs
index a9b4c3bd81..fada864422 100644
--- a/analytic_engine/src/instance/write.rs
+++ b/analytic_engine/src/instance/write.rs
@@ -593,7 +593,7 @@ impl<'a> Writer<'a> {
         let opts = TableFlushOptions {
             res_sender: None,
             compact_after_flush: None,
-            retry_flush: self.instance.should_retry_flush(),
+            max_retry_flush_limit: self.instance.max_retry_flush_limit(),
         };
         let flusher = self.instance.make_flusher();
         if table_data.id == self.table_data.id {
diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs
index f18c836303..d31bf9b488 100644
--- a/analytic_engine/src/lib.rs
+++ b/analytic_engine/src/lib.rs
@@ -81,8 +81,8 @@ pub struct Config {
     pub sst_background_read_parallelism: usize,
     /// Max buffer size for writing sst
     pub write_sst_max_buffer_size: ReadableSize,
-    /// Should retry flush after flush failed
-    pub retry_flush: bool,
+    /// Max retry limit after flush failed
+    pub max_retry_flush_limit: usize,
     /// Max bytes per write batch.
     ///
     /// If this is set, the atomicity of write request will be broken.
@@ -121,7 +121,7 @@ impl Default for Config {
             sst_background_read_parallelism: 8,
             scan_max_record_batches_in_flight: 1024,
             write_sst_max_buffer_size: ReadableSize::mb(10),
-            retry_flush: false,
+            max_retry_flush_limit: 0,
             max_bytes_per_write_batch: None,
             wal: WalStorageConfig::RocksDB(Box::default()),
             remote_engine_client: remote_engine_client::config::Config::default(),

From b73548eb0a7878e7399ad57984d615e57fc6b7e7 Mon Sep 17 00:00:00 2001
From: MachaelLee <690585471@qq.com>
Date: Fri, 12 May 2023 15:37:11 +0800
Subject: [PATCH 7/8] chore

---
 .../src/instance/serial_executor.rs | 24 +++++++++----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/analytic_engine/src/instance/serial_executor.rs b/analytic_engine/src/instance/serial_executor.rs
index addd8e2133..8131c344ef 100644
--- a/analytic_engine/src/instance/serial_executor.rs
+++ b/analytic_engine/src/instance/serial_executor.rs
@@ -38,24 +38,23 @@ type ScheduleSyncRef = Arc<ScheduleSync>;
 struct ScheduleSync {
     state: Mutex<FlushState>,
     notifier: Sender<()>,
-    retry_flush_count: AtomicUsize,
+    continuous_flush_failure_count: AtomicUsize,
 }
 
 impl ScheduleSync {
     #[inline]
     pub fn should_retry_flush(&self, max_retry_limit: usize) -> bool {
-        if self.retry_flush_count.load(Ordering::Relaxed) < max_retry_limit {
-            self.retry_flush_count.fetch_add(1, Ordering::Relaxed);
-            return true;
-        }
-        false
+        self.continuous_flush_failure_count.load(Ordering::Relaxed) < max_retry_limit
     }
 
     #[inline]
-    pub fn reset_retry_flush_count(&self) {
-        if self.retry_flush_count.load(Ordering::Relaxed) > 0 {
-            self.retry_flush_count.store(0, Ordering::Relaxed);
-        }
+    pub fn reset_flush_failure_count(&self) {
+        self.continuous_flush_failure_count.store(0, Ordering::Relaxed);
+    }
+
+    #[inline]
+    pub fn inc_flush_failure_count(&self) {
+        self.continuous_flush_failure_count.fetch_add(1, Ordering::Relaxed);
     }
 }
 
@@ -70,7 +69,7 @@ impl Default for TableFlushScheduler {
         let schedule_sync = ScheduleSync {
             state: Mutex::new(FlushState::Ready),
             notifier: tx,
-            retry_flush_count: AtomicUsize::new(0),
+            continuous_flush_failure_count: AtomicUsize::new(0),
         };
         Self {
             schedule_sync: Arc::new(schedule_sync),
@@ -220,10 +219,11 @@ fn on_flush_finished(schedule_sync: ScheduleSyncRef, res: &Result<()>) {
     let mut flush_state = schedule_sync.state.lock().unwrap();
     match res {
         Ok(()) => {
-            schedule_sync.reset_retry_flush_count();
+            schedule_sync.reset_flush_failure_count();
             *flush_state = FlushState::Ready;
         }
         Err(e) => {
+            schedule_sync.inc_flush_failure_count();
             let err_msg = e.to_string();
             *flush_state = FlushState::Failed { err_msg };
         }

From bf9f56f6d8cc5b7747e746b17ec5c37d50045eec Mon Sep 17 00:00:00 2001
From: MachaelLee <690585471@qq.com>
Date: Fri, 12 May 2023 15:46:24 +0800
Subject: [PATCH 8/8] fmt

---
 analytic_engine/src/instance/serial_executor.rs | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/analytic_engine/src/instance/serial_executor.rs b/analytic_engine/src/instance/serial_executor.rs
index 8131c344ef..19a4c6a628 100644
--- a/analytic_engine/src/instance/serial_executor.rs
+++ b/analytic_engine/src/instance/serial_executor.rs
@@ -44,17 +44,19 @@ struct ScheduleSync {
 impl ScheduleSync {
     #[inline]
     pub fn should_retry_flush(&self, max_retry_limit: usize) -> bool {
-        self.continuous_flush_failure_count.load(Ordering::Relaxed) < max_retry_limit
+        self.continuous_flush_failure_count.load(Ordering::Relaxed) < max_retry_limit
     }
 
     #[inline]
     pub fn reset_flush_failure_count(&self) {
-        self.continuous_flush_failure_count.store(0, Ordering::Relaxed);
+        self.continuous_flush_failure_count
+            .store(0, Ordering::Relaxed);
     }
 
     #[inline]
     pub fn inc_flush_failure_count(&self) {
-        self.continuous_flush_failure_count.fetch_add(1, Ordering::Relaxed);
+        self.continuous_flush_failure_count
+            .fetch_add(1, Ordering::Relaxed);
     }
 }
 
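Taken together, the series gates re-flushing on the number of consecutive background flush failures: the counter lives in ScheduleSync, is cleared when a flush succeeds, is bumped when one fails, and a table whose flush state is Failed is only re-flushed while the counter stays below the configurable max_retry_flush_limit (default 0, which keeps the old fail-fast behavior). The sketch below is a minimal, self-contained illustration of that gating logic only; FlushRetryGate and its methods are invented names for this example rather than the crate's API, which keeps the counter inside ScheduleSync and consults it while handling the flush state.

    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Illustrative stand-in for the failure gate the series adds to ScheduleSync.
    struct FlushRetryGate {
        continuous_flush_failure_count: AtomicUsize,
    }

    impl FlushRetryGate {
        fn new() -> Self {
            Self {
                continuous_flush_failure_count: AtomicUsize::new(0),
            }
        }

        /// A failed table may be re-flushed only while the number of consecutive
        /// failures stays below the configured limit; a limit of 0 never retries.
        fn should_retry_flush(&self, max_retry_limit: usize) -> bool {
            self.continuous_flush_failure_count.load(Ordering::Relaxed) < max_retry_limit
        }

        /// Mirrors on_flush_finished: reset the counter on success, bump it on failure.
        fn on_flush_finished(&self, ok: bool) {
            if ok {
                self.continuous_flush_failure_count.store(0, Ordering::Relaxed);
            } else {
                self.continuous_flush_failure_count.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    fn main() {
        let gate = FlushRetryGate::new();
        let max_retry_flush_limit = 2; // stand-in for Config::max_retry_flush_limit

        for attempt in 1..=4 {
            if gate.should_retry_flush(max_retry_flush_limit) {
                println!("attempt {attempt}: state is Failed, schedule a re-flush");
                gate.on_flush_finished(false); // pretend this flush failed as well
            } else {
                println!("attempt {attempt}: limit reached, surface BackgroundFlushFailed");
            }
        }
    }

Relaxed ordering is enough for a counter used this way as long as it is only read and updated while the flush-state mutex is held, which matches how the final diffs use it: should_retry_flush is called inside the match on the locked flush state, and the reset and increment happen in on_flush_finished after taking the same lock.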