From a1bc7aa47a9699671369b54b5daa25b0a1286e6d Mon Sep 17 00:00:00 2001 From: jiacai2050 <dev@liujiacai.net> Date: Tue, 23 May 2023 14:05:14 +0800 Subject: [PATCH 1/3] fix: ensure at least pick 2 files for compaction --- analytic_engine/src/compaction/picker.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs index 314b4d17aa..e187754281 100644 --- a/analytic_engine/src/compaction/picker.rs +++ b/analytic_engine/src/compaction/picker.rs @@ -520,12 +520,15 @@ impl TimeWindowPicker { // Sort by sstable file size let mut sorted_files = bucket.to_vec(); sorted_files.sort_unstable_by_key(FileHandle::size); - - return Some(trim_to_threshold( + let candidate_files = trim_to_threshold( sorted_files, size_tiered_opts.max_threshold, max_input_sstable_size, - )); + ); + // At least 2 sst for compact + if candidate_files.len() > 1 { + return Some(candidate_files); + } } else { debug!( "No compaction necessary for bucket size {} , key {}, now {}", From 86e574ef5d88d1202bbb72680643ee3f45e59fa7 Mon Sep 17 00:00:00 2001 From: jiacai2050 <dev@liujiacai.net> Date: Tue, 23 May 2023 14:58:13 +0800 Subject: [PATCH 2/3] adjust estimate mem usage --- analytic_engine/src/compaction/scheduler.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index daf5921ac9..73a2bbe566 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -72,8 +72,7 @@ impl Default for SchedulerConfig { fn default() -> Self { Self { schedule_channel_len: 16, - // 30 minutes schedule interval. - schedule_interval: ReadableDuration(Duration::from_secs(60 * 5)), + schedule_interval: ReadableDuration(Duration::from_secs(30)), max_ongoing_tasks: MAX_GOING_COMPACTION_TASKS, // flush_interval default is 5h. max_unflushed_duration: ReadableDuration(Duration::from_secs(60 * 60 * 5)), @@ -545,7 +544,9 @@ impl ScheduleWorker { task: &CompactionTask, ) -> Option<MemoryUsageToken> { let input_size = task.estimated_total_input_file_size(); - let estimate_memory_usage = input_size * 2; + // Currently sst build is in a streaming way, so it wouldn't consume memory more + // than its size. + let estimate_memory_usage = input_size; let token = self.memory_limit.try_apply_token(estimate_memory_usage); From 27267b55faf1bce3c263b49d2fb61bcd5ce55c00 Mon Sep 17 00:00:00 2001 From: jiacai2050 <dev@liujiacai.net> Date: Tue, 23 May 2023 16:51:36 +0800 Subject: [PATCH 3/3] add unit test --- analytic_engine/src/compaction/picker.rs | 56 +++++++++++++++++++++--- 1 file changed, 50 insertions(+), 6 deletions(-) diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs index e187754281..96600199f0 100644 --- a/analytic_engine/src/compaction/picker.rs +++ b/analytic_engine/src/compaction/picker.rs @@ -525,7 +525,7 @@ impl TimeWindowPicker { size_tiered_opts.max_threshold, max_input_sstable_size, ); - // At least 2 sst for compact + // At least 2 sst for compaction if candidate_files.len() > 1 { return Some(candidate_files); } @@ -612,6 +612,7 @@ mod tests { tests::build_schema, time::{TimeRange, Timestamp}, }; + use common_util::hash_map; use tokio::sync::mpsc; use super::*; @@ -770,17 +771,17 @@ mod tests { } } - fn build_file_handles(sizes: Vec<u64>) -> Vec<FileHandle> { + fn build_file_handles(sizes: Vec<(u64, TimeRange)>) -> Vec<FileHandle> { let (tx, _rx) = mpsc::unbounded_channel(); sizes .into_iter() - .map(|size| { + .map(|(size, time_range)| { let file_meta = FileMeta { - id: 1, size, + time_range, + id: 1, row_num: 0, - time_range: TimeRange::empty(), max_seq: 0, storage_format: StorageFormat::default(), }; @@ -792,7 +793,12 @@ mod tests { #[test] fn test_size_tiered_picker() { - let bucket = Bucket::with_files(build_file_handles(vec![100, 110, 200])); + let time_range = TimeRange::empty(); + let bucket = Bucket::with_files(build_file_handles(vec![ + (100, time_range), + (110, time_range), + (200, time_range), + ])); let (out_bucket, _) = SizeTieredPicker::trim_to_threshold_with_hotness(bucket.clone(), 10, 300); @@ -836,4 +842,42 @@ mod tests { assert_eq!(bucket.avg_size, 0); assert!(bucket.files.is_empty()); } + + #[test] + fn test_time_window_newest_bucket() { + let size_tiered_opts = SizeTieredCompactionOptions::default(); + // old bucket have enough sst for compaction + { + let old_bucket = build_file_handles(vec![ + (102, TimeRange::new_unchecked_for_test(100, 200)), + (100, TimeRange::new_unchecked_for_test(100, 200)), + (101, TimeRange::new_unchecked_for_test(100, 200)), + ]); + let new_bucket = build_file_handles(vec![ + (200, TimeRange::new_unchecked_for_test(200, 300)), + (201, TimeRange::new_unchecked_for_test(200, 300)), + ]); + + let buckets = hash_map! { 100 => old_bucket, 200 => new_bucket }; + let bucket = TimeWindowPicker::newest_bucket(buckets, size_tiered_opts, 200).unwrap(); + assert_eq!( + vec![100, 101, 102], + bucket.into_iter().map(|f| f.size()).collect::<Vec<_>>() + ); + } + + // old bucket have only 1 sst, which is not enough for compaction + { + let old_bucket = + build_file_handles(vec![(100, TimeRange::new_unchecked_for_test(100, 200))]); + let new_bucket = build_file_handles(vec![ + (200, TimeRange::new_unchecked_for_test(200, 300)), + (201, TimeRange::new_unchecked_for_test(200, 300)), + ]); + + let buckets = hash_map! { 100 => old_bucket, 200 => new_bucket }; + let bucket = TimeWindowPicker::newest_bucket(buckets, size_tiered_opts, 200); + assert_eq!(None, bucket); + } + } }