From a1bc7aa47a9699671369b54b5daa25b0a1286e6d Mon Sep 17 00:00:00 2001
From: jiacai2050 <dev@liujiacai.net>
Date: Tue, 23 May 2023 14:05:14 +0800
Subject: [PATCH 1/3] fix: pick at least 2 files for compaction
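
Previously trim_to_threshold could hand back a single SST, and compacting a
single file only rewrites it without merging anything. The picker now returns
a candidate set only when it contains more than one file; otherwise the bucket
is skipped and the search moves on.

A minimal standalone sketch of the guard (simplified: trim_to_threshold is
reduced to a plain sort-and-truncate and only file sizes are modeled, so this
is not the picker's real signature):

    fn pick_candidates(mut sizes: Vec<u64>, max_threshold: usize) -> Option<Vec<u64>> {
        // Sort ascending by file size and keep at most `max_threshold` candidates.
        sizes.sort_unstable();
        sizes.truncate(max_threshold);
        // Compacting fewer than 2 SSTs brings no merge benefit, so skip the bucket.
        if sizes.len() > 1 {
            Some(sizes)
        } else {
            None
        }
    }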

---
 analytic_engine/src/compaction/picker.rs | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs
index 314b4d17aa..e187754281 100644
--- a/analytic_engine/src/compaction/picker.rs
+++ b/analytic_engine/src/compaction/picker.rs
@@ -520,12 +520,15 @@ impl TimeWindowPicker {
                     // Sort by sstable file size
                     let mut sorted_files = bucket.to_vec();
                     sorted_files.sort_unstable_by_key(FileHandle::size);
-
-                    return Some(trim_to_threshold(
+                    let candidate_files = trim_to_threshold(
                         sorted_files,
                         size_tiered_opts.max_threshold,
                         max_input_sstable_size,
-                    ));
+                    );
+                    // At least 2 sst for compact
+                    if candidate_files.len() > 1 {
+                        return Some(candidate_files);
+                    }
                 } else {
                     debug!(
                         "No compaction necessary for bucket size {} , key {}, now {}",

From 86e574ef5d88d1202bbb72680643ee3f45e59fa7 Mon Sep 17 00:00:00 2001
From: jiacai2050 <dev@liujiacai.net>
Date: Tue, 23 May 2023 14:58:13 +0800
Subject: [PATCH 2/3] adjust schedule interval and estimated memory usage
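
Two adjustments to the compaction scheduler:

- lower the default schedule interval from 5 minutes to 30 seconds;
- reserve 1x instead of 2x the estimated input size per compaction task,
  since the SST is now built in a streaming way and peak memory usage stays
  close to the input size.

A hypothetical helper mirroring the new estimate (the real code lives inline
in ScheduleWorker and reserves the amount via memory_limit.try_apply_token):

    fn estimate_compaction_memory(input_size: u64) -> u64 {
        // SST build is streaming, so peak usage tracks the input size
        // instead of doubling it.
        input_size // previously: input_size * 2
    }

For example, under a 4 GiB compaction memory limit a task whose input SSTs
total 1 GiB now reserves 1 GiB instead of 2 GiB, so roughly twice as many
such tasks can hold tokens at once (assuming the limiter caps the sum of
outstanding tokens).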

---
 analytic_engine/src/compaction/scheduler.rs | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs
index daf5921ac9..73a2bbe566 100644
--- a/analytic_engine/src/compaction/scheduler.rs
+++ b/analytic_engine/src/compaction/scheduler.rs
@@ -72,8 +72,7 @@ impl Default for SchedulerConfig {
     fn default() -> Self {
         Self {
             schedule_channel_len: 16,
-            // 30 minutes schedule interval.
-            schedule_interval: ReadableDuration(Duration::from_secs(60 * 5)),
+            schedule_interval: ReadableDuration(Duration::from_secs(30)),
             max_ongoing_tasks: MAX_GOING_COMPACTION_TASKS,
             // flush_interval default is 5h.
             max_unflushed_duration: ReadableDuration(Duration::from_secs(60 * 60 * 5)),
@@ -545,7 +544,9 @@ impl ScheduleWorker {
         task: &CompactionTask,
     ) -> Option<MemoryUsageToken> {
         let input_size = task.estimated_total_input_file_size();
-        let estimate_memory_usage = input_size * 2;
+        // Currently the sst is built in a streaming way, so compaction shouldn't
+        // consume more memory than its input size.
+        let estimate_memory_usage = input_size;
 
         let token = self.memory_limit.try_apply_token(estimate_memory_usage);
 

From 27267b55faf1bce3c263b49d2fb61bcd5ce55c00 Mon Sep 17 00:00:00 2001
From: jiacai2050 <dev@liujiacai.net>
Date: Tue, 23 May 2023 16:51:36 +0800
Subject: [PATCH 3/3] add unit test for TimeWindowPicker::newest_bucket

---
 analytic_engine/src/compaction/picker.rs | 56 +++++++++++++++++++++---
 1 file changed, 50 insertions(+), 6 deletions(-)

diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs
index e187754281..96600199f0 100644
--- a/analytic_engine/src/compaction/picker.rs
+++ b/analytic_engine/src/compaction/picker.rs
@@ -525,7 +525,7 @@ impl TimeWindowPicker {
                         size_tiered_opts.max_threshold,
                         max_input_sstable_size,
                     );
-                    // At least 2 sst for compact
+                    // At least 2 ssts are required for compaction
                     if candidate_files.len() > 1 {
                         return Some(candidate_files);
                     }
@@ -612,6 +612,7 @@ mod tests {
         tests::build_schema,
         time::{TimeRange, Timestamp},
     };
+    use common_util::hash_map;
     use tokio::sync::mpsc;
 
     use super::*;
@@ -770,17 +771,17 @@ mod tests {
         }
     }
 
-    fn build_file_handles(sizes: Vec<u64>) -> Vec<FileHandle> {
+    fn build_file_handles(sizes: Vec<(u64, TimeRange)>) -> Vec<FileHandle> {
         let (tx, _rx) = mpsc::unbounded_channel();
 
         sizes
             .into_iter()
-            .map(|size| {
+            .map(|(size, time_range)| {
                 let file_meta = FileMeta {
-                    id: 1,
                     size,
+                    time_range,
+                    id: 1,
                     row_num: 0,
-                    time_range: TimeRange::empty(),
                     max_seq: 0,
                     storage_format: StorageFormat::default(),
                 };
@@ -792,7 +793,12 @@ mod tests {
 
     #[test]
     fn test_size_tiered_picker() {
-        let bucket = Bucket::with_files(build_file_handles(vec![100, 110, 200]));
+        let time_range = TimeRange::empty();
+        let bucket = Bucket::with_files(build_file_handles(vec![
+            (100, time_range),
+            (110, time_range),
+            (200, time_range),
+        ]));
 
         let (out_bucket, _) =
             SizeTieredPicker::trim_to_threshold_with_hotness(bucket.clone(), 10, 300);
@@ -836,4 +842,42 @@ mod tests {
         assert_eq!(bucket.avg_size, 0);
         assert!(bucket.files.is_empty());
     }
+
+    #[test]
+    fn test_time_window_newest_bucket() {
+        let size_tiered_opts = SizeTieredCompactionOptions::default();
+        // old bucket has enough ssts for compaction
+        {
+            let old_bucket = build_file_handles(vec![
+                (102, TimeRange::new_unchecked_for_test(100, 200)),
+                (100, TimeRange::new_unchecked_for_test(100, 200)),
+                (101, TimeRange::new_unchecked_for_test(100, 200)),
+            ]);
+            let new_bucket = build_file_handles(vec![
+                (200, TimeRange::new_unchecked_for_test(200, 300)),
+                (201, TimeRange::new_unchecked_for_test(200, 300)),
+            ]);
+
+            let buckets = hash_map! { 100 => old_bucket, 200 => new_bucket };
+            let bucket = TimeWindowPicker::newest_bucket(buckets, size_tiered_opts, 200).unwrap();
+            assert_eq!(
+                vec![100, 101, 102],
+                bucket.into_iter().map(|f| f.size()).collect::<Vec<_>>()
+            );
+        }
+
+        // old bucket has only 1 sst, which is not enough for compaction
+        {
+            let old_bucket =
+                build_file_handles(vec![(100, TimeRange::new_unchecked_for_test(100, 200))]);
+            let new_bucket = build_file_handles(vec![
+                (200, TimeRange::new_unchecked_for_test(200, 300)),
+                (201, TimeRange::new_unchecked_for_test(200, 300)),
+            ]);
+
+            let buckets = hash_map! { 100 => old_bucket, 200 => new_bucket };
+            let bucket = TimeWindowPicker::newest_bucket(buckets, size_tiered_opts, 200);
+            assert_eq!(None, bucket);
+        }
+    }
 }