Skip to content

Commit

Permalink
store compaction group info in HummockVersion
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Feb 9, 2023
1 parent c8535a6 commit 4174134
Show file tree
Hide file tree
Showing 17 changed files with 1,235 additions and 1,485 deletions.
27 changes: 16 additions & 11 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ message GroupConstruct {
// If `parent_group_id` is not 0, it means `parent_group_id` splits into `parent_group_id` and this group, so this group is not empty initially.
uint64 parent_group_id = 2;
repeated uint32 table_ids = 3;
uint64 group_id = 4;
// TODO try to remove `table_id_to_options`
map<uint32, TableOption> table_id_to_options = 5;
}

message GroupMetaChange {
repeated uint32 table_ids_add = 1;
repeated uint32 table_ids_remove = 2;
map<uint32, TableOption> table_id_to_options_add = 3;
map<uint32, TableOption> table_id_to_options_remove = 4;
}

message GroupDestroy {}
Expand All @@ -67,6 +77,7 @@ message GroupDelta {
IntraLevelDelta intra_level = 1;
GroupConstruct group_construct = 2;
GroupDestroy group_destroy = 3;
GroupMetaChange group_meta_change = 4;
}
}

Expand All @@ -79,6 +90,11 @@ message HummockVersion {
message Levels {
repeated Level levels = 1;
OverlappingLevel l0 = 2;
uint64 group_id = 3;
uint64 parent_group_id = 4;
repeated uint32 member_table_ids = 5;
// TODO avoid store it here
map<uint32, TableOption> table_id_to_options = 6;
}
uint64 id = 1;
// Levels of each compaction group
Expand Down Expand Up @@ -264,10 +280,7 @@ message CompactStatus {

message CompactionGroup {
uint64 id = 1;
uint64 parent_id = 2;
repeated uint32 member_table_ids = 3;
CompactionConfig compaction_config = 4;
map<uint32, TableOption> table_id_to_options = 5;
}

message CompactTaskAssignment {
Expand Down Expand Up @@ -377,13 +390,6 @@ message ReportVacuumTaskResponse {
common.Status status = 1;
}

message GetCompactionGroupsRequest {}

message GetCompactionGroupsResponse {
common.Status status = 1;
repeated CompactionGroup compaction_groups = 2;
}

message TriggerManualCompactionRequest {
uint64 compaction_group_id = 1;
KeyRange key_range = 2;
Expand Down Expand Up @@ -534,7 +540,6 @@ service HummockManagerService {
rpc GetNewSstIds(GetNewSstIdsRequest) returns (GetNewSstIdsResponse);
rpc SubscribeCompactTasks(SubscribeCompactTasksRequest) returns (stream SubscribeCompactTasksResponse);
rpc ReportVacuumTask(ReportVacuumTaskRequest) returns (ReportVacuumTaskResponse);
rpc GetCompactionGroups(GetCompactionGroupsRequest) returns (GetCompactionGroupsResponse);
rpc TriggerManualCompaction(TriggerManualCompactionRequest) returns (TriggerManualCompactionResponse);
rpc ReportFullScanTask(ReportFullScanTaskRequest) returns (ReportFullScanTaskResponse);
rpc TriggerFullGC(TriggerFullGCRequest) returns (TriggerFullGCResponse);
Expand Down
13 changes: 5 additions & 8 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,14 @@ where

// unregister compaction group for dirty table fragments.
let _ = self.hummock_manager
.unregister_table_ids(
&to_drop_streaming_ids
.iter()
.map(|t| t.table_id)
.collect_vec(),
.unregister_table_fragments_vec(
&to_drop_table_fragments
)
.await.inspect_err(|e|
tracing::warn!(
"Failed to unregister compaction group for {:#?}.\nThey will be cleaned up on node restart.\n{:#?}",
to_drop_streaming_ids,
e)
"Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}",
to_drop_table_fragments,
e)
);

// clean up source connector dirty changes.
Expand Down
32 changes: 0 additions & 32 deletions src/meta/src/hummock/compaction_group/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,56 +26,35 @@ use crate::model::{MetadataModel, MetadataModelResult};
#[derive(Debug, Clone, PartialEq)]
pub struct CompactionGroup {
pub(crate) group_id: CompactionGroupId,
pub(crate) parent_group_id: CompactionGroupId,
pub(crate) member_table_ids: HashSet<StateTableId>,
pub(crate) compaction_config: CompactionConfig,
pub(crate) table_id_to_options: HashMap<StateTableId, TableOption>,
}

impl CompactionGroup {
pub fn new(group_id: CompactionGroupId, compaction_config: CompactionConfig) -> Self {
Self {
group_id,
member_table_ids: Default::default(),
compaction_config,
table_id_to_options: HashMap::default(),
parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
}
}

pub fn group_id(&self) -> CompactionGroupId {
self.group_id
}

pub fn member_table_ids(&self) -> &HashSet<StateTableId> {
&self.member_table_ids
}

pub fn compaction_config(&self) -> CompactionConfig {
self.compaction_config.clone()
}

pub fn table_id_to_options(&self) -> &HashMap<u32, TableOption> {
&self.table_id_to_options
}
}

impl From<&risingwave_pb::hummock::CompactionGroup> for CompactionGroup {
fn from(compaction_group: &risingwave_pb::hummock::CompactionGroup) -> Self {
Self {
group_id: compaction_group.id,
parent_group_id: compaction_group.parent_id,
member_table_ids: compaction_group.member_table_ids.iter().cloned().collect(),
compaction_config: compaction_group
.compaction_config
.as_ref()
.cloned()
.unwrap(),
table_id_to_options: compaction_group
.table_id_to_options
.iter()
.map(|id_to_table_option| (*id_to_table_option.0, id_to_table_option.1.into()))
.collect::<HashMap<_, _>>(),
}
}
}
Expand All @@ -84,18 +63,7 @@ impl From<&CompactionGroup> for risingwave_pb::hummock::CompactionGroup {
fn from(compaction_group: &CompactionGroup) -> Self {
Self {
id: compaction_group.group_id,
parent_id: compaction_group.parent_group_id,
member_table_ids: compaction_group
.member_table_ids
.iter()
.cloned()
.collect_vec(),
compaction_config: Some(compaction_group.compaction_config.clone()),
table_id_to_options: compaction_group
.table_id_to_options
.iter()
.map(|id_to_table_option| (*id_to_table_option.0, id_to_table_option.1.into()))
.collect::<HashMap<_, _>>(),
}
}
}
Expand Down
10 changes: 0 additions & 10 deletions src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,6 @@ where
.count() as u64
}

pub async fn get_compaction_config(
&self,
compaction_group_id: CompactionGroupId,
) -> CompactionConfig {
self.compaction_group(compaction_group_id)
.await
.expect("compaction group exists")
.compaction_config()
}

#[named]
pub async fn list_all_tasks_ids(&self) -> Vec<HummockCompactionTaskId> {
let compaction = read_lock!(self, compaction).await;
Expand Down
Loading

0 comments on commit 4174134

Please sign in to comment.