Skip to content

Commit 6628639

Browse files
jiacai2050 authored and MichaelLeeHZ committed
fix: ensure files can only be picked once (apache#995)
## Rationale In the current design, SST files may be picked multiple times. ## Detailed Changes - Mark files as being compacted when they are picked as compaction candidates, and reset the flag to false when the CompactionTask is dropped. ## Test Plan Tested manually.
1 parent f7f169d commit 6628639

File tree

5 files changed

+112
-83
lines changed

5 files changed

+112
-83
lines changed

analytic_engine/src/compaction/mod.rs

+72-36
Original file line number | Diff line number | Diff line change
@@ -318,13 +318,26 @@ pub struct ExpiredFiles {
318318

319319
#[derive(Default, Clone)]
320320
pub struct CompactionTask {
321-
pub compaction_inputs: Vec<CompactionInputFiles>,
322-
pub expired: Vec<ExpiredFiles>,
321+
inputs: Vec<CompactionInputFiles>,
322+
expired: Vec<ExpiredFiles>,
323+
}
324+
325+
impl Drop for CompactionTask {
326+
fn drop(&mut self) {
327+
// When a CompactionTask is dropped, it means
328+
// 1. the task finished successfully, or
329+
// 2. the task is cancelled for some reason, like memory limit
330+
//
331+
// In case 2, we need to mark files as not being compacted in order for them to be
332+
// scheduled again. In case 1, the files will be moved out of level controller,
333+
// so the value of the flag no longer matters and it is safe to set it to false here.
334+
self.mark_files_being_compacted(false);
335+
}
323336
}
324337

325338
impl CompactionTask {
326-
pub fn mark_files_being_compacted(&self, being_compacted: bool) {
327-
for input in &self.compaction_inputs {
339+
fn mark_files_being_compacted(&self, being_compacted: bool) {
340+
for input in &self.inputs {
328341
for file in &input.files {
329342
file.set_being_compacted(being_compacted);
330343
}
@@ -337,29 +350,76 @@ impl CompactionTask {
337350
}
338351

339352
// Estimate the size of the total input files.
353+
#[inline]
340354
pub fn estimated_total_input_file_size(&self) -> usize {
341355
let total_input_size: u64 = self
342-
.compaction_inputs
356+
.inputs
343357
.iter()
344358
.map(|v| v.files.iter().map(|f| f.size()).sum::<u64>())
345359
.sum();
346360

347361
total_input_size as usize
348362
}
349363

364+
#[inline]
350365
pub fn num_compact_files(&self) -> usize {
351-
self.compaction_inputs.iter().map(|v| v.files.len()).sum()
366+
self.inputs.iter().map(|v| v.files.len()).sum()
352367
}
353368

354-
pub fn num_expired_files(&self) -> usize {
355-
self.expired.iter().map(|v| v.files.len()).sum()
369+
#[inline]
370+
pub fn is_empty(&self) -> bool {
371+
self.is_input_empty() && self.expired.is_empty()
372+
}
373+
374+
#[inline]
375+
pub fn is_input_empty(&self) -> bool {
376+
self.inputs.is_empty()
377+
}
378+
379+
#[inline]
380+
pub fn expired(&self) -> &[ExpiredFiles] {
381+
&self.expired
382+
}
383+
384+
#[inline]
385+
pub fn inputs(&self) -> &[CompactionInputFiles] {
386+
&self.inputs
387+
}
388+
}
389+
390+
pub struct CompactionTaskBuilder {
391+
expired: Vec<ExpiredFiles>,
392+
inputs: Vec<CompactionInputFiles>,
393+
}
394+
395+
impl CompactionTaskBuilder {
396+
pub fn with_expired(expired: Vec<ExpiredFiles>) -> Self {
397+
Self {
398+
expired,
399+
inputs: Vec::new(),
400+
}
401+
}
402+
403+
pub fn add_inputs(&mut self, files: CompactionInputFiles) {
404+
self.inputs.push(files);
405+
}
406+
407+
pub fn build(self) -> CompactionTask {
408+
let task = CompactionTask {
409+
expired: self.expired,
410+
inputs: self.inputs,
411+
};
412+
413+
task.mark_files_being_compacted(true);
414+
415+
task
356416
}
357417
}
358418

359419
impl fmt::Debug for CompactionTask {
360420
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361421
f.debug_struct("CompactionTask")
362-
.field("inputs", &self.compaction_inputs)
422+
.field("inputs", &self.inputs)
363423
.field(
364424
"expired",
365425
&self
@@ -380,36 +440,12 @@ impl fmt::Debug for CompactionTask {
380440
}
381441
}
382442

383-
pub struct PickerManager {
384-
default_picker: CompactionPickerRef,
385-
time_window_picker: CompactionPickerRef,
386-
size_tiered_picker: CompactionPickerRef,
387-
}
388-
389-
impl Default for PickerManager {
390-
fn default() -> Self {
391-
let size_tiered_picker = Arc::new(CommonCompactionPicker::new(
392-
CompactionStrategy::SizeTiered(SizeTieredCompactionOptions::default()),
393-
));
394-
let time_window_picker = Arc::new(CommonCompactionPicker::new(
395-
CompactionStrategy::TimeWindow(TimeWindowCompactionOptions::default()),
396-
));
397-
398-
Self {
399-
default_picker: time_window_picker.clone(),
400-
size_tiered_picker,
401-
time_window_picker,
402-
}
403-
}
404-
}
443+
#[derive(Default)]
444+
pub struct PickerManager;
405445

406446
impl PickerManager {
407447
pub fn get_picker(&self, strategy: CompactionStrategy) -> CompactionPickerRef {
408-
match strategy {
409-
CompactionStrategy::Default => self.default_picker.clone(),
410-
CompactionStrategy::SizeTiered(_) => self.size_tiered_picker.clone(),
411-
CompactionStrategy::TimeWindow(_) => self.time_window_picker.clone(),
412-
}
448+
Arc::new(CommonCompactionPicker::new(strategy))
413449
}
414450
}
415451

analytic_engine/src/compaction/picker.rs

+31-33
Original file line number | Diff line number | Diff line change
@@ -15,8 +15,8 @@ use snafu::Snafu;
1515

1616
use crate::{
1717
compaction::{
18-
CompactionInputFiles, CompactionStrategy, CompactionTask, SizeTieredCompactionOptions,
19-
TimeWindowCompactionOptions,
18+
CompactionInputFiles, CompactionStrategy, CompactionTask, CompactionTaskBuilder,
19+
SizeTieredCompactionOptions, TimeWindowCompactionOptions,
2020
},
2121
sst::{
2222
file::{FileHandle, Level},
@@ -60,7 +60,7 @@ pub trait CompactionPicker {
6060
fn pick_compaction(
6161
&self,
6262
ctx: PickerContext,
63-
levels_controller: &LevelsController,
63+
levels_controller: &mut LevelsController,
6464
) -> Result<CompactionTask>;
6565
}
6666

@@ -86,10 +86,10 @@ pub struct CommonCompactionPicker {
8686
impl CommonCompactionPicker {
8787
pub fn new(strategy: CompactionStrategy) -> Self {
8888
let level_picker: LevelPickerRef = match strategy {
89-
CompactionStrategy::SizeTiered(_) | CompactionStrategy::Default => {
90-
Arc::new(SizeTieredPicker::default())
89+
CompactionStrategy::SizeTiered(_) => Arc::new(SizeTieredPicker::default()),
90+
CompactionStrategy::TimeWindow(_) | CompactionStrategy::Default => {
91+
Arc::new(TimeWindowPicker::default())
9192
}
92-
CompactionStrategy::TimeWindow(_) => Arc::new(TimeWindowPicker::default()),
9393
};
9494
Self { level_picker }
9595
}
@@ -123,13 +123,11 @@ impl CompactionPicker for CommonCompactionPicker {
123123
fn pick_compaction(
124124
&self,
125125
ctx: PickerContext,
126-
levels_controller: &LevelsController,
126+
levels_controller: &mut LevelsController,
127127
) -> Result<CompactionTask> {
128128
let expire_time = ctx.ttl.map(Timestamp::expire_time);
129-
let mut compaction_task = CompactionTask {
130-
expired: levels_controller.expired_ssts(expire_time),
131-
..Default::default()
132-
};
129+
let mut builder =
130+
CompactionTaskBuilder::with_expired(levels_controller.expired_ssts(expire_time));
133131

134132
if let Some(input_files) =
135133
self.pick_compact_candidates(&ctx, levels_controller, expire_time)
@@ -139,10 +137,10 @@ impl CompactionPicker for CommonCompactionPicker {
139137
ctx.strategy, input_files
140138
);
141139

142-
compaction_task.compaction_inputs = vec![input_files];
140+
builder.add_inputs(input_files);
143141
}
144142

145-
Ok(compaction_task)
143+
Ok(builder.build())
146144
}
147145
}
148146

@@ -734,39 +732,39 @@ mod tests {
734732
};
735733
let now = Timestamp::now();
736734
{
737-
let lc = build_old_bucket_case(now.as_i64());
738-
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
739-
assert_eq!(task.compaction_inputs[0].files.len(), 2);
740-
assert_eq!(task.compaction_inputs[0].files[0].id(), 0);
741-
assert_eq!(task.compaction_inputs[0].files[1].id(), 1);
735+
let mut lc = build_old_bucket_case(now.as_i64());
736+
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
737+
assert_eq!(task.inputs[0].files.len(), 2);
738+
assert_eq!(task.inputs[0].files[0].id(), 0);
739+
assert_eq!(task.inputs[0].files[1].id(), 1);
742740
assert_eq!(task.expired[0].files.len(), 1);
743741
assert_eq!(task.expired[0].files[0].id(), 3);
744742
}
745743

746744
{
747-
let lc = build_newest_bucket_case(now.as_i64());
748-
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
749-
assert_eq!(task.compaction_inputs[0].files.len(), 4);
750-
assert_eq!(task.compaction_inputs[0].files[0].id(), 2);
751-
assert_eq!(task.compaction_inputs[0].files[1].id(), 3);
752-
assert_eq!(task.compaction_inputs[0].files[2].id(), 4);
753-
assert_eq!(task.compaction_inputs[0].files[3].id(), 5);
745+
let mut lc = build_newest_bucket_case(now.as_i64());
746+
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
747+
assert_eq!(task.inputs[0].files.len(), 4);
748+
assert_eq!(task.inputs[0].files[0].id(), 2);
749+
assert_eq!(task.inputs[0].files[1].id(), 3);
750+
assert_eq!(task.inputs[0].files[2].id(), 4);
751+
assert_eq!(task.inputs[0].files[3].id(), 5);
754752
}
755753

756754
{
757-
let lc = build_newest_bucket_no_match_case(now.as_i64());
758-
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
759-
assert_eq!(task.compaction_inputs.len(), 0);
755+
let mut lc = build_newest_bucket_no_match_case(now.as_i64());
756+
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
757+
assert_eq!(task.inputs.len(), 0);
760758
}
761759

762760
// If ttl is None, then no file is expired.
763761
ctx.ttl = None;
764762
{
765-
let lc = build_old_bucket_case(now.as_i64());
766-
let task = twp.pick_compaction(ctx, &lc).unwrap();
767-
assert_eq!(task.compaction_inputs[0].files.len(), 2);
768-
assert_eq!(task.compaction_inputs[0].files[0].id(), 0);
769-
assert_eq!(task.compaction_inputs[0].files[1].id(), 1);
763+
let mut lc = build_old_bucket_case(now.as_i64());
764+
let task = twp.pick_compaction(ctx, &mut lc).unwrap();
765+
assert_eq!(task.inputs[0].files.len(), 2);
766+
assert_eq!(task.inputs[0].files[0].id(), 0);
767+
assert_eq!(task.inputs[0].files[1].id(), 1);
770768
assert!(task.expired[0].files.is_empty());
771769
}
772770
}

analytic_engine/src/compaction/scheduler.rs

+2-8
Original file line number | Diff line number | Diff line change
@@ -237,7 +237,7 @@ impl OngoingTaskLimit {
237237

238238
if dropped > 0 {
239239
warn!(
240-
"Too many compaction pending tasks, limit: {}, dropped {} older tasks.",
240+
"Too many compaction pending tasks, limit:{}, dropped:{}.",
241241
self.max_pending_compaction_tasks, dropped,
242242
);
243243
}
@@ -462,10 +462,7 @@ impl ScheduleWorker {
462462
waiter_notifier: WaiterNotifier,
463463
token: MemoryUsageToken,
464464
) {
465-
// Mark files being in compaction.
466-
compaction_task.mark_files_being_compacted(true);
467-
468-
let keep_scheduling_compaction = !compaction_task.compaction_inputs.is_empty();
465+
let keep_scheduling_compaction = !compaction_task.is_input_empty();
469466

470467
let runtime = self.runtime.clone();
471468
let space_store = self.space_store.clone();
@@ -503,9 +500,6 @@ impl ScheduleWorker {
503500
.await;
504501

505502
if let Err(e) = &res {
506-
// Compaction is failed, we need to unset the compaction mark.
507-
compaction_task.mark_files_being_compacted(false);
508-
509503
error!(
510504
"Failed to compact table, table_name:{}, table_id:{}, request_id:{}, err:{}",
511505
table_data.name, table_data.id, request_id, e

analytic_engine/src/instance/flush_compaction.rs

+5-4
Original file line number | Diff line number | Diff line change
@@ -669,22 +669,23 @@ impl SpaceStore {
669669
"Begin compact table, table_name:{}, id:{}, task:{:?}",
670670
table_data.name, table_data.id, task
671671
);
672+
let inputs = task.inputs();
672673
let mut edit_meta = VersionEditMeta {
673674
space_id: table_data.space_id,
674675
table_id: table_data.id,
675676
flushed_sequence: 0,
676677
// Use the number of compaction inputs as the estimated number of files to add.
677-
files_to_add: Vec::with_capacity(task.compaction_inputs.len()),
678+
files_to_add: Vec::with_capacity(inputs.len()),
678679
files_to_delete: vec![],
679680
mems_to_remove: vec![],
680681
};
681682

682-
if task.num_expired_files() == 0 && task.num_compact_files() == 0 {
683+
if task.is_empty() {
683684
// Nothing to compact.
684685
return Ok(());
685686
}
686687

687-
for files in &task.expired {
688+
for files in task.expired() {
688689
self.delete_expired_files(table_data, request_id, files, &mut edit_meta);
689690
}
690691

@@ -696,7 +697,7 @@ impl SpaceStore {
696697
task.num_compact_files(),
697698
);
698699

699-
for input in &task.compaction_inputs {
700+
for input in inputs {
700701
self.compact_input_files(
701702
request_id,
702703
table_data,

analytic_engine/src/table/version.rs

+2-2
Original file line number | Diff line number | Diff line change
@@ -727,9 +727,9 @@ impl TableVersion {
727727
picker_ctx: PickerContext,
728728
picker: &CompactionPickerRef,
729729
) -> picker::Result<CompactionTask> {
730-
let inner = self.inner.read().unwrap();
730+
let mut inner = self.inner.write().unwrap();
731731

732-
picker.pick_compaction(picker_ctx, &inner.levels_controller)
732+
picker.pick_compaction(picker_ctx, &mut inner.levels_controller)
733733
}
734734

735735
pub fn has_expired_sst(&self, expire_time: Option<Timestamp>) -> bool {

0 commit comments

Comments
 (0)