Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Add force trigger #30641

Merged
merged 1 commit into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 50 additions & 46 deletions internal/datacoord/compaction_l0_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,8 @@ func (v *LevelZeroSegmentsView) Trigger() (CompactionView, string) {
return view.dmlPos.GetTimestamp() < v.earliestGrowingSegmentPos.GetTimestamp()
})

var (
minDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMinSize.GetAsFloat()
maxDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMaxSize.GetAsFloat()
minDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.GetAsInt()
maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt()
)

targetViews, targetSize := v.filterViewsBySizeRange(validSegments, minDeltaSize, maxDeltaSize)
if targetViews != nil {
reason := fmt.Sprintf("level zero segments size reaches compaction limit, curDeltaSize=%.2f, limitSizeRange=[%.2f, %.2f]",
targetSize, minDeltaSize, maxDeltaSize)
return &LevelZeroSegmentsView{
label: v.label,
segments: targetViews,
earliestGrowingSegmentPos: v.earliestGrowingSegmentPos,
}, reason
}

targetViews, targetCount := v.filterViewsByCountRange(validSegments, minDeltaCount, maxDeltaCount)
if targetViews != nil {
reason := fmt.Sprintf("level zero segments count reaches compaction limit, curDeltaCount=%d, limitCountRange=[%d, %d]",
targetCount, minDeltaCount, maxDeltaCount)
targetViews, reason := v.minCountSizeTrigger(validSegments)
if len(targetViews) > 0 {
return &LevelZeroSegmentsView{
label: v.label,
segments: targetViews,
Expand All @@ -106,44 +86,68 @@ func (v *LevelZeroSegmentsView) Trigger() (CompactionView, string) {
return nil, ""
}

// filterViewsByCountRange picks segment views whose total deltalog count is in range [minCount, maxCount]
func (v *LevelZeroSegmentsView) filterViewsByCountRange(segments []*SegmentView, minCount, maxCount int) ([]*SegmentView, int) {
curDeltaCount := 0
idx := 0
for _, view := range segments {
targetCount := view.DeltalogCount + curDeltaCount
if idx != 0 && targetCount > maxCount {
break
}
// minCountSizeTrigger tries to trigger LevelZeroCompaction when segmentViews reaches minimum trigger conditions:
// 1. count >= minDeltaCount, OR
// 2. size >= minDeltaSize
func (v *LevelZeroSegmentsView) minCountSizeTrigger(segments []*SegmentView) (picked []*SegmentView, reason string) {
var (
minDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMinSize.GetAsFloat()
maxDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMaxSize.GetAsFloat()
minDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.GetAsInt()
maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt()
)

idx += 1
curDeltaCount = targetCount
curSize := float64(0)

// count >= minDeltaCount
if lo.SumBy(segments, func(view *SegmentView) int { return view.DeltalogCount }) >= minDeltaCount {
picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
reason = fmt.Sprintf("level zero segments count reaches minForceTriggerCountLimit=%d, curDeltaSize=%.2f, curDeltaCount=%d", minDeltaCount, curSize, len(segments))
return
}

if curDeltaCount < minCount {
return nil, 0
// size >= minDeltaSize
if lo.SumBy(segments, func(view *SegmentView) float64 { return view.DeltaSize }) >= minDeltaSize {
picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
reason = fmt.Sprintf("level zero segments size reaches minForceTriggerSizeLimit=%.2f, curDeltaSize=%.2f, curDeltaCount=%d", minDeltaSize, curSize, len(segments))
return
}

return segments[:idx], curDeltaCount
return
}

// filterViewsBySizeRange picks segment views whose total delta size is in range [minSize, maxSize]
func (v *LevelZeroSegmentsView) filterViewsBySizeRange(segments []*SegmentView, minSize, maxSize float64) ([]*SegmentView, float64) {
var curDeltaSize float64
// forceTrigger tries to trigger LevelZeroCompaction even when the segment views don't meet the minimum conditions;
// the picked plan is still bounded by the maximum size and count conditions
func (v *LevelZeroSegmentsView) forceTrigger(segments []*SegmentView) (picked []*SegmentView, reason string) {
var (
maxDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMaxSize.GetAsFloat()
maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt()
)

curSize := float64(0)
picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
reason = fmt.Sprintf("level zero views force to trigger, curDeltaSize=%.2f, curDeltaCount=%d", curSize, len(segments))
return
}

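// pickByMaxCountSize picks segments in order and stops before the one that would push the
// total count above maxCount or the total size above maxSize (the limits are only enforced
// once some count and size have already been accumulated)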
func pickByMaxCountSize(segments []*SegmentView, maxSize float64, maxCount int) ([]*SegmentView, float64) {
var (
curDeltaCount = 0
curDeltaSize = float64(0)
)
idx := 0
for _, view := range segments {
targetCount := view.DeltalogCount + curDeltaCount
targetSize := view.DeltaSize + curDeltaSize
if idx != 0 && targetSize > maxSize {

if (curDeltaCount != 0 && curDeltaSize != float64(0)) && (targetSize > maxSize || targetCount > maxCount) {
break
}

idx += 1
curDeltaCount = targetCount
curDeltaSize = targetSize
idx += 1
}

if curDeltaSize < minSize {
return nil, 0
}

return segments[:idx], curDeltaSize
}
83 changes: 83 additions & 0 deletions internal/datacoord/compaction_l0_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,86 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() {
})
}
}

// TestMinCountSizeTrigger runs minCountSizeTrigger over a table of segment views
// with varying deltalog counts and delta sizes, and verifies which segment IDs
// get picked. Whenever something is picked, a non-empty reason is required.
func (s *LevelZeroSegmentsViewSuite) TestMinCountSizeTrigger() {
	const mb = 1024 * 1024

	type testCase struct {
		description string
		segIDs      []int64
		segCounts   []int
		segSize     []float64

		expectedIDs []int64
	}

	groupLabel := s.v.GetGroupLabel()
	cases := []testCase{
		{
			description: "donot trigger",
			segIDs:      []int64{100, 101, 102},
			segCounts:   []int{1, 1, 1},
			segSize:     []float64{1, 1, 1},
			expectedIDs: nil,
		},
		{
			description: "trigger by count=15",
			segIDs:      []int64{100, 101, 102},
			segCounts:   []int{5, 5, 5},
			segSize:     []float64{1, 1, 1},
			expectedIDs: []int64{100, 101, 102},
		},
		{
			description: "trigger by count=10",
			segIDs:      []int64{100, 101, 102},
			segCounts:   []int{5, 3, 2},
			segSize:     []float64{1, 1, 1},
			expectedIDs: []int64{100, 101, 102},
		},
		{
			description: "trigger by count=50",
			segIDs:      []int64{100, 101, 102},
			segCounts:   []int{32, 10, 8},
			segSize:     []float64{1, 1, 1},
			expectedIDs: []int64{100},
		},
		{
			description: "trigger by size=24MB",
			segIDs:      []int64{100, 101, 102},
			segCounts:   []int{1, 1, 1},
			segSize:     []float64{8 * mb, 8 * mb, 8 * mb},
			expectedIDs: []int64{100, 101, 102},
		},
		{
			description: "trigger by size=8MB",
			segIDs:      []int64{100, 101, 102},
			segCounts:   []int{1, 1, 1},
			segSize:     []float64{3 * mb, 3 * mb, 2 * mb},
			expectedIDs: []int64{100, 101, 102},
		},
		{
			description: "trigger by size=128MB",
			segIDs:      []int64{100, 101, 102},
			segCounts:   []int{1, 1, 1},
			segSize:     []float64{100 * mb, 20 * mb, 8 * mb},
			expectedIDs: []int64{100},
		},
	}

	for _, tc := range cases {
		s.Run(tc.description, func() {
			// Build one L0 segment view per ID, overriding its delta stats
			// with the values from the table.
			views := make([]*SegmentView, 0, len(tc.segIDs))
			for i := range tc.segIDs {
				view := genTestL0SegmentView(tc.segIDs[i], groupLabel, 10000)
				view.DeltalogCount = tc.segCounts[i]
				view.DeltaSize = tc.segSize[i]
				views = append(views, view)
			}

			picked, reason := s.v.minCountSizeTrigger(views)

			pickedIDs := make([]int64, 0, len(picked))
			for _, view := range picked {
				pickedIDs = append(pickedIDs, view.ID)
			}
			s.ElementsMatch(pickedIDs, tc.expectedIDs)

			// A successful pick must always come with an explanation.
			if len(picked) != 0 {
				s.NotEmpty(reason)
			}

			log.Info("test minCountSizeTrigger", zap.Any("trigger reason", reason))
		})
	}
}

// TestForceTrigger runs forceTrigger over a table of segment views with varying
// deltalog counts and delta sizes, and verifies which segment IDs get picked.
// The returned reason is only logged, not asserted.
func (s *LevelZeroSegmentsViewSuite) TestForceTrigger() {
	const mb = 1024 * 1024

	type testCase struct {
		description string
		segIDs      []int64
		segCounts   []int
		segSize     []float64

		expectedIDs []int64
	}

	groupLabel := s.v.GetGroupLabel()
	cases := []testCase{
		{
			description: "force trigger",
			segIDs:      []int64{100, 101, 102},
			segCounts:   []int{1, 1, 1},
			segSize:     []float64{1, 1, 1},
			expectedIDs: []int64{100, 101, 102},
		},
		{
			description: "trigger by count=15",
			segIDs:      []int64{100, 101, 102},
			segCounts:   []int{5, 5, 5},
			segSize:     []float64{1, 1, 1},
			expectedIDs: []int64{100, 101, 102},
		},
		{
			description: "trigger by count=10",
			segIDs:      []int64{100, 101, 102},
			segCounts:   []int{5, 3, 2},
			segSize:     []float64{1, 1, 1},
			expectedIDs: []int64{100, 101, 102},
		},
		{
			description: "trigger by count=50",
			segIDs:      []int64{100, 101, 102},
			segCounts:   []int{32, 10, 8},
			segSize:     []float64{1, 1, 1},
			expectedIDs: []int64{100},
		},
		{
			description: "trigger by size=24MB",
			segIDs:      []int64{100, 101, 102},
			segCounts:   []int{1, 1, 1},
			segSize:     []float64{8 * mb, 8 * mb, 8 * mb},
			expectedIDs: []int64{100, 101, 102},
		},
		{
			description: "trigger by size=8MB",
			segIDs:      []int64{100, 101, 102},
			segCounts:   []int{1, 1, 1},
			segSize:     []float64{3 * mb, 3 * mb, 2 * mb},
			expectedIDs: []int64{100, 101, 102},
		},
		{
			description: "trigger by size=128MB",
			segIDs:      []int64{100, 101, 102},
			segCounts:   []int{1, 1, 1},
			segSize:     []float64{100 * mb, 20 * mb, 8 * mb},
			expectedIDs: []int64{100},
		},
	}

	for _, tc := range cases {
		s.Run(tc.description, func() {
			// Build one L0 segment view per ID, overriding its delta stats
			// with the values from the table.
			views := make([]*SegmentView, 0, len(tc.segIDs))
			for i := range tc.segIDs {
				view := genTestL0SegmentView(tc.segIDs[i], groupLabel, 10000)
				view.DeltalogCount = tc.segCounts[i]
				view.DeltaSize = tc.segSize[i]
				views = append(views, view)
			}

			picked, reason := s.v.forceTrigger(views)

			pickedIDs := make([]int64, 0, len(picked))
			for _, view := range picked {
				pickedIDs = append(pickedIDs, view.ID)
			}
			s.ElementsMatch(pickedIDs, tc.expectedIDs)

			log.Info("test forceTrigger", zap.Any("trigger reason", reason))
		})
	}
}
4 changes: 2 additions & 2 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -2731,8 +2731,8 @@ During compaction, the size of segment # of rows is able to exceed segment max #
p.LevelZeroCompactionTriggerDeltalogMaxNum = ParamItem{
Key: "dataCoord.compaction.levelzero.forceTrigger.deltalogMaxNum",
Version: "2.4.0",
Doc: "The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 20",
DefaultValue: "20",
Doc: "The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30",
DefaultValue: "30",
}
p.LevelZeroCompactionTriggerDeltalogMaxNum.Init(base.mgr)

Expand Down
Loading