diff --git a/internal/datacoord/compaction_l0_view.go b/internal/datacoord/compaction_l0_view.go index 7651869b5e293..71326586d1aa5 100644 --- a/internal/datacoord/compaction_l0_view.go +++ b/internal/datacoord/compaction_l0_view.go @@ -74,28 +74,8 @@ func (v *LevelZeroSegmentsView) Trigger() (CompactionView, string) { return view.dmlPos.GetTimestamp() < v.earliestGrowingSegmentPos.GetTimestamp() }) - var ( - minDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMinSize.GetAsFloat() - maxDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMaxSize.GetAsFloat() - minDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.GetAsInt() - maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt() - ) - - targetViews, targetSize := v.filterViewsBySizeRange(validSegments, minDeltaSize, maxDeltaSize) - if targetViews != nil { - reason := fmt.Sprintf("level zero segments size reaches compaction limit, curDeltaSize=%.2f, limitSizeRange=[%.2f, %.2f]", - targetSize, minDeltaSize, maxDeltaSize) - return &LevelZeroSegmentsView{ - label: v.label, - segments: targetViews, - earliestGrowingSegmentPos: v.earliestGrowingSegmentPos, - }, reason - } - - targetViews, targetCount := v.filterViewsByCountRange(validSegments, minDeltaCount, maxDeltaCount) - if targetViews != nil { - reason := fmt.Sprintf("level zero segments count reaches compaction limit, curDeltaCount=%d, limitCountRange=[%d, %d]", - targetCount, minDeltaCount, maxDeltaCount) + targetViews, reason := v.minCountSizeTrigger(validSegments) + if len(targetViews) > 0 { return &LevelZeroSegmentsView{ label: v.label, segments: targetViews, @@ -106,44 +86,68 @@ func (v *LevelZeroSegmentsView) Trigger() (CompactionView, string) { return nil, "" } -// filterViewByCountRange picks segment views that total sizes in range [minCount, maxCount] -func (v *LevelZeroSegmentsView) filterViewsByCountRange(segments []*SegmentView, minCount, maxCount int) ([]*SegmentView, int) { - curDeltaCount := 0 - idx := 0 - for _, view := range segments { - targetCount := view.DeltalogCount + curDeltaCount - if idx != 0 && targetCount > maxCount { - break - } +// minCountSizeTrigger tries to trigger LevelZeroCompaction when segmentViews reaches minimum trigger conditions: +// 1. count >= minDeltaCount, OR +// 2. size >= minDeltaSize +func (v *LevelZeroSegmentsView) minCountSizeTrigger(segments []*SegmentView) (picked []*SegmentView, reason string) { + var ( + minDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMinSize.GetAsFloat() + maxDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMaxSize.GetAsFloat() + minDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.GetAsInt() + maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt() + ) - idx += 1 - curDeltaCount = targetCount + curSize := float64(0) + + // count >= minDeltaCount + if lo.SumBy(segments, func(view *SegmentView) int { return view.DeltalogCount }) >= minDeltaCount { + picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount) + reason = fmt.Sprintf("level zero segments count reaches minForceTriggerCountLimit=%d, curDeltaSize=%.2f, curDeltaCount=%d", minDeltaCount, curSize, len(segments)) + return } - if curDeltaCount < minCount { - return nil, 0 + // size >= minDeltaSize + if lo.SumBy(segments, func(view *SegmentView) float64 { return view.DeltaSize }) >= minDeltaSize { + picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount) + reason = fmt.Sprintf("level zero segments size reaches minForceTriggerSizeLimit=%.2f, curDeltaSize=%.2f, curDeltaCount=%d", minDeltaSize, curSize, len(segments)) + return } - return segments[:idx], curDeltaCount + return } -// filterViewBySizeRange picks segment views that total count in range [minSize, maxSize] -func (v *LevelZeroSegmentsView) filterViewsBySizeRange(segments []*SegmentView, minSize, maxSize float64) ([]*SegmentView, float64) { - var curDeltaSize float64 +// forceTrigger tries to trigger LevelZeroCompaction even when segmentsViews don't meet the minimum condition, +// the picked plan is still satisfied with the maximum condition +func (v *LevelZeroSegmentsView) forceTrigger(segments []*SegmentView) (picked []*SegmentView, reason string) { + var ( + maxDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMaxSize.GetAsFloat() + maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt() + ) + + curSize := float64(0) + picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount) + reason = fmt.Sprintf("level zero views force to trigger, curDeltaSize=%.2f, curDeltaCount=%d", curSize, len(segments)) + return +} + +// pickByMaxCountSize picks segments that count <= maxCount or size <= maxSize +func pickByMaxCountSize(segments []*SegmentView, maxSize float64, maxCount int) ([]*SegmentView, float64) { + var ( + curDeltaCount = 0 + curDeltaSize = float64(0) + ) idx := 0 for _, view := range segments { + targetCount := view.DeltalogCount + curDeltaCount targetSize := view.DeltaSize + curDeltaSize - if idx != 0 && targetSize > maxSize { + + if (curDeltaCount != 0 && curDeltaSize != float64(0)) && (targetSize > maxSize || targetCount > maxCount) { break } - idx += 1 + curDeltaCount = targetCount curDeltaSize = targetSize + idx += 1 } - - if curDeltaSize < minSize { - return nil, 0 - } - return segments[:idx], curDeltaSize } diff --git a/internal/datacoord/compaction_l0_view_test.go b/internal/datacoord/compaction_l0_view_test.go index d39f46c3cbe6e..863dbbe5678be 100644 --- a/internal/datacoord/compaction_l0_view_test.go +++ b/internal/datacoord/compaction_l0_view_test.go @@ -171,3 +171,86 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() { }) } } + +func (s *LevelZeroSegmentsViewSuite) TestMinCountSizeTrigger() { + label := s.v.GetGroupLabel() + tests := []struct { + description string + segIDs []int64 + segCounts []int + segSize []float64 + + expectedIDs []int64 + }{ + {"donot trigger", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{1, 1, 1}, nil}, + {"trigger by count=15", []int64{100, 101, 102}, []int{5, 5, 5}, []float64{1, 1, 1}, []int64{100, 101, 102}}, + {"trigger by count=10", []int64{100, 101, 102}, []int{5, 3, 2}, []float64{1, 1, 1}, []int64{100, 101, 102}}, + {"trigger by count=50", []int64{100, 101, 102}, []int{32, 10, 8}, []float64{1, 1, 1}, []int64{100}}, + {"trigger by size=24MB", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{8 * 1024 * 1024, 8 * 1024 * 1024, 8 * 1024 * 1024}, []int64{100, 101, 102}}, + {"trigger by size=8MB", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{3 * 1024 * 1024, 3 * 1024 * 1024, 2 * 1024 * 1024}, []int64{100, 101, 102}}, + {"trigger by size=128MB", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{100 * 1024 * 1024, 20 * 1024 * 1024, 8 * 1024 * 1024}, []int64{100}}, + } + + for _, test := range tests { + s.Run(test.description, func() { + views := []*SegmentView{} + for idx, ID := range test.segIDs { + seg := genTestL0SegmentView(ID, label, 10000) + seg.DeltaSize = test.segSize[idx] + seg.DeltalogCount = test.segCounts[idx] + + views = append(views, seg) + } + + picked, reason := s.v.minCountSizeTrigger(views) + s.ElementsMatch(lo.Map(picked, func(view *SegmentView, _ int) int64 { + return view.ID + }), test.expectedIDs) + + if len(picked) > 0 { + s.NotEmpty(reason) + } + + log.Info("test minCountSizeTrigger", zap.Any("trigger reason", reason)) + }) + } +} + +func (s *LevelZeroSegmentsViewSuite) TestForceTrigger() { + label := s.v.GetGroupLabel() + tests := []struct { + description string + segIDs []int64 + segCounts []int + segSize []float64 + + expectedIDs []int64 + }{ + {"force trigger", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{1, 1, 1}, []int64{100, 101, 102}}, + {"trigger by count=15", []int64{100, 101, 102}, []int{5, 5, 5}, []float64{1, 1, 1}, []int64{100, 101, 102}}, + {"trigger by count=10", []int64{100, 101, 102}, []int{5, 3, 2}, []float64{1, 1, 1}, []int64{100, 101, 102}}, + {"trigger by count=50", []int64{100, 101, 102}, []int{32, 10, 8}, []float64{1, 1, 1}, []int64{100}}, + {"trigger by size=24MB", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{8 * 1024 * 1024, 8 * 1024 * 1024, 8 * 1024 * 1024}, []int64{100, 101, 102}}, + {"trigger by size=8MB", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{3 * 1024 * 1024, 3 * 1024 * 1024, 2 * 1024 * 1024}, []int64{100, 101, 102}}, + {"trigger by size=128MB", []int64{100, 101, 102}, []int{1, 1, 1}, []float64{100 * 1024 * 1024, 20 * 1024 * 1024, 8 * 1024 * 1024}, []int64{100}}, + } + + for _, test := range tests { + s.Run(test.description, func() { + views := []*SegmentView{} + for idx, ID := range test.segIDs { + seg := genTestL0SegmentView(ID, label, 10000) + seg.DeltaSize = test.segSize[idx] + seg.DeltalogCount = test.segCounts[idx] + + views = append(views, seg) + } + + picked, reason := s.v.forceTrigger(views) + s.ElementsMatch(lo.Map(picked, func(view *SegmentView, _ int) int64 { + return view.ID + }), test.expectedIDs) + log.Info("test forceTrigger", zap.Any("trigger reason", reason)) + }) + } +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 24310b4f35e11..f752966719d98 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2731,8 +2731,8 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.LevelZeroCompactionTriggerDeltalogMaxNum = ParamItem{ Key: "dataCoord.compaction.levelzero.forceTrigger.deltalogMaxNum", Version: "2.4.0", - Doc: "The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 20", - DefaultValue: "20", + Doc: "The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30", + DefaultValue: "30", } p.LevelZeroCompactionTriggerDeltalogMaxNum.Init(base.mgr)