Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure pick at least 2 files for compaction #915

Merged
merged 3 commits into the base branch on May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 55 additions & 8 deletions analytic_engine/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,12 +520,15 @@ impl TimeWindowPicker {
// Sort by sstable file size
let mut sorted_files = bucket.to_vec();
sorted_files.sort_unstable_by_key(FileHandle::size);

return Some(trim_to_threshold(
let candidate_files = trim_to_threshold(
sorted_files,
size_tiered_opts.max_threshold,
max_input_sstable_size,
));
);
// At least 2 sst for compaction
if candidate_files.len() > 1 {
return Some(candidate_files);
}
} else {
debug!(
"No compaction necessary for bucket size {} , key {}, now {}",
Expand Down Expand Up @@ -609,6 +612,7 @@ mod tests {
tests::build_schema,
time::{TimeRange, Timestamp},
};
use common_util::hash_map;
use tokio::sync::mpsc;

use super::*;
Expand Down Expand Up @@ -767,17 +771,17 @@ mod tests {
}
}

fn build_file_handles(sizes: Vec<u64>) -> Vec<FileHandle> {
fn build_file_handles(sizes: Vec<(u64, TimeRange)>) -> Vec<FileHandle> {
let (tx, _rx) = mpsc::unbounded_channel();

sizes
.into_iter()
.map(|size| {
.map(|(size, time_range)| {
let file_meta = FileMeta {
id: 1,
size,
time_range,
id: 1,
row_num: 0,
time_range: TimeRange::empty(),
max_seq: 0,
storage_format: StorageFormat::default(),
};
Expand All @@ -789,7 +793,12 @@ mod tests {

#[test]
fn test_size_tiered_picker() {
let bucket = Bucket::with_files(build_file_handles(vec![100, 110, 200]));
let time_range = TimeRange::empty();
let bucket = Bucket::with_files(build_file_handles(vec![
(100, time_range),
(110, time_range),
(200, time_range),
]));

let (out_bucket, _) =
SizeTieredPicker::trim_to_threshold_with_hotness(bucket.clone(), 10, 300);
Expand Down Expand Up @@ -833,4 +842,42 @@ mod tests {
assert_eq!(bucket.avg_size, 0);
assert!(bucket.files.is_empty());
}

#[test]
fn test_time_window_newest_bucket() {
    let opts = SizeTieredCompactionOptions::default();
    // Helpers producing the time ranges used by the two windows under test.
    let old_range = || TimeRange::new_unchecked_for_test(100, 200);
    let new_range = || TimeRange::new_unchecked_for_test(200, 300);

    // Case 1: the old bucket holds more than one sst, so it qualifies for
    // compaction and its files are returned sorted by size.
    {
        let old_files = build_file_handles(vec![
            (102, old_range()),
            (100, old_range()),
            (101, old_range()),
        ]);
        let new_files = build_file_handles(vec![(200, new_range()), (201, new_range())]);
        let buckets = hash_map! { 100 => old_files, 200 => new_files };

        let picked = TimeWindowPicker::newest_bucket(buckets, opts, 200).unwrap();
        let picked_sizes = picked.into_iter().map(|f| f.size()).collect::<Vec<_>>();
        assert_eq!(vec![100, 101, 102], picked_sizes);
    }

    // Case 2: a single sst in the old bucket is not enough for compaction,
    // so no bucket is picked at all.
    {
        let old_files = build_file_handles(vec![(100, old_range())]);
        let new_files = build_file_handles(vec![(200, new_range()), (201, new_range())]);
        let buckets = hash_map! { 100 => old_files, 200 => new_files };

        let picked = TimeWindowPicker::newest_bucket(buckets, opts, 200);
        assert_eq!(None, picked);
    }
}
}
7 changes: 4 additions & 3 deletions analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ impl Default for SchedulerConfig {
fn default() -> Self {
Self {
schedule_channel_len: 16,
// 30 minutes schedule interval.
schedule_interval: ReadableDuration(Duration::from_secs(60 * 5)),
schedule_interval: ReadableDuration(Duration::from_secs(30)),
max_ongoing_tasks: MAX_GOING_COMPACTION_TASKS,
// flush_interval default is 5h.
max_unflushed_duration: ReadableDuration(Duration::from_secs(60 * 60 * 5)),
Expand Down Expand Up @@ -545,7 +544,9 @@ impl ScheduleWorker {
task: &CompactionTask,
) -> Option<MemoryUsageToken> {
let input_size = task.estimated_total_input_file_size();
let estimate_memory_usage = input_size * 2;
// Currently sst build is in a streaming way, so it wouldn't consume memory more
// than its size.
let estimate_memory_usage = input_size;

let token = self.memory_limit.try_apply_token(estimate_memory_usage);

Expand Down