Skip to content

Commit fd83f53

Browse files
authoredJun 26, 2023
revert: "fix: add page index for metadata (#1000)" (#1026)
This reverts commit 41fe63a. ## Rationale #1000 leads to some commits missing. ## Detailed Changes Revert #1000 ## Test Plan
1 parent 03d9aa4 commit fd83f53

File tree

25 files changed

+613
-444
lines changed

25 files changed

+613
-444
lines changed
 

‎Cargo.lock

-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎analytic_engine/src/compaction/mod.rs

+72-36
Original file line numberDiff line numberDiff line change
@@ -318,13 +318,26 @@ pub struct ExpiredFiles {
318318

319319
#[derive(Default, Clone)]
320320
pub struct CompactionTask {
321-
pub compaction_inputs: Vec<CompactionInputFiles>,
322-
pub expired: Vec<ExpiredFiles>,
321+
inputs: Vec<CompactionInputFiles>,
322+
expired: Vec<ExpiredFiles>,
323+
}
324+
325+
impl Drop for CompactionTask {
326+
fn drop(&mut self) {
327+
// When a CompactionTask is dropped, it means
328+
// 1. the task finished successfully, or
329+
// 2. the task is cancelled for some reason, like memory limit
330+
//
331+
// In case 2, we need to mark files as not compacted in order for them to be
332+
// scheduled again. In case 1, the files will be moved out of level controller,
333+
// so it doesn't care what the flag is, so it's safe to set false here.
334+
self.mark_files_being_compacted(false);
335+
}
323336
}
324337

325338
impl CompactionTask {
326-
pub fn mark_files_being_compacted(&self, being_compacted: bool) {
327-
for input in &self.compaction_inputs {
339+
fn mark_files_being_compacted(&self, being_compacted: bool) {
340+
for input in &self.inputs {
328341
for file in &input.files {
329342
file.set_being_compacted(being_compacted);
330343
}
@@ -337,29 +350,76 @@ impl CompactionTask {
337350
}
338351

339352
// Estimate the size of the total input files.
353+
#[inline]
340354
pub fn estimated_total_input_file_size(&self) -> usize {
341355
let total_input_size: u64 = self
342-
.compaction_inputs
356+
.inputs
343357
.iter()
344358
.map(|v| v.files.iter().map(|f| f.size()).sum::<u64>())
345359
.sum();
346360

347361
total_input_size as usize
348362
}
349363

364+
#[inline]
350365
pub fn num_compact_files(&self) -> usize {
351-
self.compaction_inputs.iter().map(|v| v.files.len()).sum()
366+
self.inputs.iter().map(|v| v.files.len()).sum()
352367
}
353368

354-
pub fn num_expired_files(&self) -> usize {
355-
self.expired.iter().map(|v| v.files.len()).sum()
369+
#[inline]
370+
pub fn is_empty(&self) -> bool {
371+
self.is_input_empty() && self.expired.is_empty()
372+
}
373+
374+
#[inline]
375+
pub fn is_input_empty(&self) -> bool {
376+
self.inputs.is_empty()
377+
}
378+
379+
#[inline]
380+
pub fn expired(&self) -> &[ExpiredFiles] {
381+
&self.expired
382+
}
383+
384+
#[inline]
385+
pub fn inputs(&self) -> &[CompactionInputFiles] {
386+
&self.inputs
387+
}
388+
}
389+
390+
pub struct CompactionTaskBuilder {
391+
expired: Vec<ExpiredFiles>,
392+
inputs: Vec<CompactionInputFiles>,
393+
}
394+
395+
impl CompactionTaskBuilder {
396+
pub fn with_expired(expired: Vec<ExpiredFiles>) -> Self {
397+
Self {
398+
expired,
399+
inputs: Vec::new(),
400+
}
401+
}
402+
403+
pub fn add_inputs(&mut self, files: CompactionInputFiles) {
404+
self.inputs.push(files);
405+
}
406+
407+
pub fn build(self) -> CompactionTask {
408+
let task = CompactionTask {
409+
expired: self.expired,
410+
inputs: self.inputs,
411+
};
412+
413+
task.mark_files_being_compacted(true);
414+
415+
task
356416
}
357417
}
358418

359419
impl fmt::Debug for CompactionTask {
360420
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361421
f.debug_struct("CompactionTask")
362-
.field("inputs", &self.compaction_inputs)
422+
.field("inputs", &self.inputs)
363423
.field(
364424
"expired",
365425
&self
@@ -380,36 +440,12 @@ impl fmt::Debug for CompactionTask {
380440
}
381441
}
382442

383-
pub struct PickerManager {
384-
default_picker: CompactionPickerRef,
385-
time_window_picker: CompactionPickerRef,
386-
size_tiered_picker: CompactionPickerRef,
387-
}
388-
389-
impl Default for PickerManager {
390-
fn default() -> Self {
391-
let size_tiered_picker = Arc::new(CommonCompactionPicker::new(
392-
CompactionStrategy::SizeTiered(SizeTieredCompactionOptions::default()),
393-
));
394-
let time_window_picker = Arc::new(CommonCompactionPicker::new(
395-
CompactionStrategy::TimeWindow(TimeWindowCompactionOptions::default()),
396-
));
397-
398-
Self {
399-
default_picker: time_window_picker.clone(),
400-
size_tiered_picker,
401-
time_window_picker,
402-
}
403-
}
404-
}
443+
#[derive(Default)]
444+
pub struct PickerManager;
405445

406446
impl PickerManager {
407447
pub fn get_picker(&self, strategy: CompactionStrategy) -> CompactionPickerRef {
408-
match strategy {
409-
CompactionStrategy::Default => self.default_picker.clone(),
410-
CompactionStrategy::SizeTiered(_) => self.size_tiered_picker.clone(),
411-
CompactionStrategy::TimeWindow(_) => self.time_window_picker.clone(),
412-
}
448+
Arc::new(CommonCompactionPicker::new(strategy))
413449
}
414450
}
415451

‎analytic_engine/src/compaction/picker.rs

+32-34
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
1+
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
22

33
//! Compaction picker.
44
@@ -15,8 +15,8 @@ use snafu::Snafu;
1515

1616
use crate::{
1717
compaction::{
18-
CompactionInputFiles, CompactionStrategy, CompactionTask, SizeTieredCompactionOptions,
19-
TimeWindowCompactionOptions,
18+
CompactionInputFiles, CompactionStrategy, CompactionTask, CompactionTaskBuilder,
19+
SizeTieredCompactionOptions, TimeWindowCompactionOptions,
2020
},
2121
sst::{
2222
file::{FileHandle, Level},
@@ -60,7 +60,7 @@ pub trait CompactionPicker {
6060
fn pick_compaction(
6161
&self,
6262
ctx: PickerContext,
63-
levels_controller: &LevelsController,
63+
levels_controller: &mut LevelsController,
6464
) -> Result<CompactionTask>;
6565
}
6666

@@ -86,10 +86,10 @@ pub struct CommonCompactionPicker {
8686
impl CommonCompactionPicker {
8787
pub fn new(strategy: CompactionStrategy) -> Self {
8888
let level_picker: LevelPickerRef = match strategy {
89-
CompactionStrategy::SizeTiered(_) | CompactionStrategy::Default => {
90-
Arc::new(SizeTieredPicker::default())
89+
CompactionStrategy::SizeTiered(_) => Arc::new(SizeTieredPicker::default()),
90+
CompactionStrategy::TimeWindow(_) | CompactionStrategy::Default => {
91+
Arc::new(TimeWindowPicker::default())
9192
}
92-
CompactionStrategy::TimeWindow(_) => Arc::new(TimeWindowPicker::default()),
9393
};
9494
Self { level_picker }
9595
}
@@ -123,13 +123,11 @@ impl CompactionPicker for CommonCompactionPicker {
123123
fn pick_compaction(
124124
&self,
125125
ctx: PickerContext,
126-
levels_controller: &LevelsController,
126+
levels_controller: &mut LevelsController,
127127
) -> Result<CompactionTask> {
128128
let expire_time = ctx.ttl.map(Timestamp::expire_time);
129-
let mut compaction_task = CompactionTask {
130-
expired: levels_controller.expired_ssts(expire_time),
131-
..Default::default()
132-
};
129+
let mut builder =
130+
CompactionTaskBuilder::with_expired(levels_controller.expired_ssts(expire_time));
133131

134132
if let Some(input_files) =
135133
self.pick_compact_candidates(&ctx, levels_controller, expire_time)
@@ -139,10 +137,10 @@ impl CompactionPicker for CommonCompactionPicker {
139137
ctx.strategy, input_files
140138
);
141139

142-
compaction_task.compaction_inputs = vec![input_files];
140+
builder.add_inputs(input_files);
143141
}
144142

145-
Ok(compaction_task)
143+
Ok(builder.build())
146144
}
147145
}
148146

@@ -734,39 +732,39 @@ mod tests {
734732
};
735733
let now = Timestamp::now();
736734
{
737-
let lc = build_old_bucket_case(now.as_i64());
738-
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
739-
assert_eq!(task.compaction_inputs[0].files.len(), 2);
740-
assert_eq!(task.compaction_inputs[0].files[0].id(), 0);
741-
assert_eq!(task.compaction_inputs[0].files[1].id(), 1);
735+
let mut lc = build_old_bucket_case(now.as_i64());
736+
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
737+
assert_eq!(task.inputs[0].files.len(), 2);
738+
assert_eq!(task.inputs[0].files[0].id(), 0);
739+
assert_eq!(task.inputs[0].files[1].id(), 1);
742740
assert_eq!(task.expired[0].files.len(), 1);
743741
assert_eq!(task.expired[0].files[0].id(), 3);
744742
}
745743

746744
{
747-
let lc = build_newest_bucket_case(now.as_i64());
748-
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
749-
assert_eq!(task.compaction_inputs[0].files.len(), 4);
750-
assert_eq!(task.compaction_inputs[0].files[0].id(), 2);
751-
assert_eq!(task.compaction_inputs[0].files[1].id(), 3);
752-
assert_eq!(task.compaction_inputs[0].files[2].id(), 4);
753-
assert_eq!(task.compaction_inputs[0].files[3].id(), 5);
745+
let mut lc = build_newest_bucket_case(now.as_i64());
746+
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
747+
assert_eq!(task.inputs[0].files.len(), 4);
748+
assert_eq!(task.inputs[0].files[0].id(), 2);
749+
assert_eq!(task.inputs[0].files[1].id(), 3);
750+
assert_eq!(task.inputs[0].files[2].id(), 4);
751+
assert_eq!(task.inputs[0].files[3].id(), 5);
754752
}
755753

756754
{
757-
let lc = build_newest_bucket_no_match_case(now.as_i64());
758-
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
759-
assert_eq!(task.compaction_inputs.len(), 0);
755+
let mut lc = build_newest_bucket_no_match_case(now.as_i64());
756+
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
757+
assert_eq!(task.inputs.len(), 0);
760758
}
761759

762760
// If ttl is None, then no file is expired.
763761
ctx.ttl = None;
764762
{
765-
let lc = build_old_bucket_case(now.as_i64());
766-
let task = twp.pick_compaction(ctx, &lc).unwrap();
767-
assert_eq!(task.compaction_inputs[0].files.len(), 2);
768-
assert_eq!(task.compaction_inputs[0].files[0].id(), 0);
769-
assert_eq!(task.compaction_inputs[0].files[1].id(), 1);
763+
let mut lc = build_old_bucket_case(now.as_i64());
764+
let task = twp.pick_compaction(ctx, &mut lc).unwrap();
765+
assert_eq!(task.inputs[0].files.len(), 2);
766+
assert_eq!(task.inputs[0].files[0].id(), 0);
767+
assert_eq!(task.inputs[0].files[1].id(), 1);
770768
assert!(task.expired[0].files.is_empty());
771769
}
772770
}

‎analytic_engine/src/compaction/scheduler.rs

+12-9
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ impl OngoingTaskLimit {
237237

238238
if dropped > 0 {
239239
warn!(
240-
"Too many compaction pending tasks, limit: {}, dropped {} older tasks.",
240+
"Too many compaction pending tasks, limit:{}, dropped:{}.",
241241
self.max_pending_compaction_tasks, dropped,
242242
);
243243
}
@@ -462,10 +462,7 @@ impl ScheduleWorker {
462462
waiter_notifier: WaiterNotifier,
463463
token: MemoryUsageToken,
464464
) {
465-
// Mark files being in compaction.
466-
compaction_task.mark_files_being_compacted(true);
467-
468-
let keep_scheduling_compaction = !compaction_task.compaction_inputs.is_empty();
465+
let keep_scheduling_compaction = !compaction_task.is_input_empty();
469466

470467
let runtime = self.runtime.clone();
471468
let space_store = self.space_store.clone();
@@ -503,9 +500,6 @@ impl ScheduleWorker {
503500
.await;
504501

505502
if let Err(e) = &res {
506-
// Compaction is failed, we need to unset the compaction mark.
507-
compaction_task.mark_files_being_compacted(false);
508-
509503
error!(
510504
"Failed to compact table, table_name:{}, table_id:{}, request_id:{}, err:{}",
511505
table_data.name, table_data.id, request_id, e
@@ -656,7 +650,16 @@ impl ScheduleWorker {
656650
self.max_unflushed_duration,
657651
);
658652

659-
let mut serial_exec = table_data.serial_exec.lock().await;
653+
let mut serial_exec = if let Some(v) = table_data.acquire_serial_exec_ctx().await {
654+
v
655+
} else {
656+
warn!(
657+
"Table is closed, ignore this periodical flush, table:{}",
658+
table_data.name
659+
);
660+
continue;
661+
};
662+
660663
let flush_scheduler = serial_exec.flush_scheduler();
661664
// Instance flush the table asynchronously.
662665
if let Err(e) = flusher

‎analytic_engine/src/instance/close.rs

+10-6
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
//! Close table logic of instance
44
55
use log::{info, warn};
6-
use snafu::ResultExt;
6+
use snafu::{OptionExt, ResultExt};
77
use table_engine::engine::CloseTableRequest;
88

99
use crate::{
1010
instance::{
11-
engine::{DoManifestSnapshot, FlushTable, Result},
11+
engine::{DoManifestSnapshot, FlushTable, OperateClosedTable, Result},
1212
flush_compaction::{Flusher, TableFlushOptions},
1313
},
1414
manifest::{ManifestRef, SnapshotRequest},
@@ -37,8 +37,11 @@ impl Closer {
3737

3838
// Flush table.
3939
let opts = TableFlushOptions::default();
40-
let mut serial_exec = table_data.serial_exec.lock().await;
41-
let flush_scheduler = serial_exec.flush_scheduler();
40+
let mut serial_exec_ctx = table_data
41+
.acquire_serial_exec_ctx()
42+
.await
43+
.context(OperateClosedTable)?;
44+
let flush_scheduler = serial_exec_ctx.flush_scheduler();
4245

4346
self.flusher
4447
.do_flush(flush_scheduler, &table_data, opts)
@@ -67,9 +70,10 @@ impl Closer {
6770
let removed_table = self.space.remove_table(&request.table_name);
6871
assert!(removed_table.is_some());
6972

73+
serial_exec_ctx.invalidate();
7074
info!(
71-
"table:{}-{} has been removed from the space_id:{}",
72-
table_data.name, table_data.id, self.space.id
75+
"table:{} has been removed from the space_id:{}, table_id:{}",
76+
table_data.name, self.space.id, table_data.id,
7377
);
7478
Ok(())
7579
}

0 commit comments

Comments
 (0)