From efa1bd50d84d061b112c9c303d32afe1fbffbe09 Mon Sep 17 00:00:00 2001 From: "chuanyun.lcy" Date: Wed, 24 Jul 2024 19:02:36 +0800 Subject: [PATCH] elastic quota ignore terminating pod Signed-off-by: chuanyun.lcy --- pkg/features/features.go | 3 + pkg/features/scheduler_features.go | 1 + .../elasticquota/core/group_quota_manager.go | 98 +++++++++++++++---- .../core/group_quota_manager_test.go | 84 ++++++++++++++++ 4 files changed, 165 insertions(+), 21 deletions(-) diff --git a/pkg/features/features.go b/pkg/features/features.go index ad03a2d22..d1d721840 100644 --- a/pkg/features/features.go +++ b/pkg/features/features.go @@ -57,6 +57,9 @@ const ( // ElasticQuotaIgnorePodOverhead ignore pod.spec.overhead when accounting pod requests ElasticQuotaIgnorePodOverhead featuregate.Feature = "ElasticQuotaIgnorePodOverhead" + // ElasticQuotaIgnoreTerminatingPod ignore the terminating pod. + ElasticQuotaIgnoreTerminatingPod featuregate.Feature = "ElasticQuotaIgnoreTerminatingPod" + // ElasticQuotaGuaranteeUsage enable guarantee the quota usage // In some specific scenarios, resources that have been allocated to users are considered // to belong to the users and will not be preempted back. 
diff --git a/pkg/features/scheduler_features.go b/pkg/features/scheduler_features.go index 10c05cae3..3084bc98f 100644 --- a/pkg/features/scheduler_features.go +++ b/pkg/features/scheduler_features.go @@ -73,6 +73,7 @@ var defaultSchedulerFeatureGates = map[featuregate.Feature]featuregate.FeatureSp ResizePod: {Default: false, PreRelease: featuregate.Alpha}, MultiQuotaTree: {Default: false, PreRelease: featuregate.Alpha}, ElasticQuotaIgnorePodOverhead: {Default: false, PreRelease: featuregate.Alpha}, + ElasticQuotaIgnoreTerminatingPod: {Default: false, PreRelease: featuregate.Alpha}, ElasticQuotaGuaranteeUsage: {Default: false, PreRelease: featuregate.Alpha}, DisableDefaultQuota: {Default: false, PreRelease: featuregate.Alpha}, SupportParentQuotaSubmitPod: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/pkg/scheduler/plugins/elasticquota/core/group_quota_manager.go b/pkg/scheduler/plugins/elasticquota/core/group_quota_manager.go index 0a7c69839..626ab7b03 100644 --- a/pkg/scheduler/plugins/elasticquota/core/group_quota_manager.go +++ b/pkg/scheduler/plugins/elasticquota/core/group_quota_manager.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "sync" + "time" v1 "k8s.io/api/core/v1" quotav1 "k8s.io/apiserver/pkg/quota/v1" @@ -741,6 +742,10 @@ func (gqm *GroupQuotaManager) GetQuotaSummaries(includePods bool) map[string]*Qu } func (gqm *GroupQuotaManager) OnPodAdd(quotaName string, pod *v1.Pod) { + if shouldBeIgnored(pod) { + return + } + gqm.hierarchyUpdateLock.RLock() defer gqm.hierarchyUpdateLock.RUnlock() @@ -763,32 +768,52 @@ func (gqm *GroupQuotaManager) OnPodUpdate(newQuotaName, oldQuotaName string, new defer gqm.hierarchyUpdateLock.RUnlock() if oldQuotaName == newQuotaName { - isAssigned := gqm.getPodIsAssignedNoLock(newQuotaName, newPod) - if isAssigned { - // reserve phase will assign the pod. Just update it. - // upgrade will change the resource. 
- gqm.updatePodUsedNoLock(newQuotaName, oldPod, newPod) + quotaInfo := gqm.getQuotaInfoByNameNoLock(newQuotaName) + if quotaInfo == nil { + return + } + + if !shouldBeIgnored(newPod) { + isAssigned := gqm.getPodIsAssignedNoLock(newQuotaName, newPod) + if isAssigned { + // reserve phase will assign the pod. Just update it. + // upgrade will change the resource. + gqm.updatePodUsedNoLock(newQuotaName, oldPod, newPod) + } else { + if newPod.Spec.NodeName != "" && !util.IsPodTerminated(newPod) { + // assign it + gqm.updatePodIsAssignedNoLock(newQuotaName, newPod, true) + gqm.updatePodUsedNoLock(newQuotaName, nil, newPod) + } + } + gqm.updatePodRequestNoLock(newQuotaName, oldPod, newPod) } else { - if newPod.Spec.NodeName != "" && !util.IsPodTerminated(newPod) { - // assign it - gqm.updatePodIsAssignedNoLock(newQuotaName, newPod, true) - gqm.updatePodUsedNoLock(newQuotaName, nil, newPod) + if quotaInfo.IsPodExist(oldPod) { + // remove the old resource. + gqm.updatePodRequestNoLock(oldQuotaName, oldPod, nil) + gqm.updatePodUsedNoLock(oldQuotaName, oldPod, nil) + gqm.updatePodCacheNoLock(oldQuotaName, oldPod, false) } } - gqm.updatePodRequestNoLock(newQuotaName, oldPod, newPod) } else { - isAssigned := gqm.getPodIsAssignedNoLock(oldQuotaName, oldPod) - if isAssigned { - gqm.updatePodUsedNoLock(oldQuotaName, oldPod, nil) + oldQuotaInfo := gqm.getQuotaInfoByNameNoLock(oldQuotaName) + if oldQuotaInfo != nil && oldQuotaInfo.IsPodExist(oldPod) { + isAssigned := gqm.getPodIsAssignedNoLock(oldQuotaName, oldPod) + if isAssigned { + gqm.updatePodUsedNoLock(oldQuotaName, oldPod, nil) + } + gqm.updatePodRequestNoLock(oldQuotaName, oldPod, nil) + gqm.updatePodCacheNoLock(oldQuotaName, oldPod, false) } - gqm.updatePodRequestNoLock(oldQuotaName, oldPod, nil) - gqm.updatePodCacheNoLock(oldQuotaName, oldPod, false) - - gqm.updatePodCacheNoLock(newQuotaName, newPod, true) - gqm.updatePodRequestNoLock(newQuotaName, nil, newPod) - if newPod.Spec.NodeName != "" && 
!util.IsPodTerminated(newPod) { - gqm.updatePodIsAssignedNoLock(newQuotaName, newPod, true) - gqm.updatePodUsedNoLock(newQuotaName, nil, newPod) + + newQuotaInfo := gqm.getQuotaInfoByNameNoLock(newQuotaName) + if newQuotaInfo != nil && !newQuotaInfo.IsPodExist(newPod) && !shouldBeIgnored(newPod) { + gqm.updatePodCacheNoLock(newQuotaName, newPod, true) + gqm.updatePodRequestNoLock(newQuotaName, nil, newPod) + if newPod.Spec.NodeName != "" && !util.IsPodTerminated(newPod) { + gqm.updatePodIsAssignedNoLock(newQuotaName, newPod, true) + gqm.updatePodUsedNoLock(newQuotaName, nil, newPod) + } } } } @@ -797,6 +822,11 @@ func (gqm *GroupQuotaManager) OnPodDelete(quotaName string, pod *v1.Pod) { gqm.hierarchyUpdateLock.RLock() defer gqm.hierarchyUpdateLock.RUnlock() + quotaInfo := gqm.getQuotaInfoByNameNoLock(quotaName) + if quotaInfo == nil || !quotaInfo.IsPodExist(pod) { + return + } + gqm.updatePodRequestNoLock(quotaName, pod, nil) gqm.updatePodUsedNoLock(quotaName, pod, nil) gqm.updatePodCacheNoLock(quotaName, pod, false) @@ -806,6 +836,11 @@ func (gqm *GroupQuotaManager) ReservePod(quotaName string, p *v1.Pod) { gqm.hierarchyUpdateLock.RLock() defer gqm.hierarchyUpdateLock.RUnlock() + quotaInfo := gqm.getQuotaInfoByNameNoLock(quotaName) + if quotaInfo == nil || !quotaInfo.IsPodExist(p) { + return + } + gqm.updatePodIsAssignedNoLock(quotaName, p, true) gqm.updatePodUsedNoLock(quotaName, nil, p) } @@ -814,6 +849,11 @@ func (gqm *GroupQuotaManager) UnreservePod(quotaName string, p *v1.Pod) { gqm.hierarchyUpdateLock.RLock() defer gqm.hierarchyUpdateLock.RUnlock() + quotaInfo := gqm.getQuotaInfoByNameNoLock(quotaName) + if quotaInfo == nil || !quotaInfo.IsPodExist(p) { + return + } + gqm.updatePodUsedNoLock(quotaName, p, nil) gqm.updatePodIsAssignedNoLock(quotaName, p, false) } @@ -1054,3 +1094,19 @@ func (gqm *GroupQuotaManager) doUpdateOneGroupSharedWeightNoLock(quotaName strin gqm.updateOneGroupSharedWeightNoLock(quotaInfo) } + +func shouldBeIgnored(pod *v1.Pod) bool { + 
if pod.DeletionTimestamp == nil { + return false + } + + if !utilfeature.DefaultFeatureGate.Enabled(features.ElasticQuotaIgnoreTerminatingPod) { + return false + } + + if pod.DeletionGracePeriodSeconds == nil { + return time.Now().After(pod.DeletionTimestamp.Time) + } else { + return time.Now().After(pod.DeletionTimestamp.Time.Add(time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second)) + } +} diff --git a/pkg/scheduler/plugins/elasticquota/core/group_quota_manager_test.go b/pkg/scheduler/plugins/elasticquota/core/group_quota_manager_test.go index 337d67933..bca933d6a 100644 --- a/pkg/scheduler/plugins/elasticquota/core/group_quota_manager_test.go +++ b/pkg/scheduler/plugins/elasticquota/core/group_quota_manager_test.go @@ -1509,6 +1509,90 @@ func TestGroupQuotaManager_OnPodUpdateAfterReserve(t *testing.T) { assert.Equal(t, createResourceList(10, 10), gqm.GetQuotaInfoByName("1").GetUsed()) } +func TestGroupQuotaManager_OnTerminatingPodUpdateAndDelete(t *testing.T) { + defer utilfeature.SetFeatureGateDuringTest(t, k8sfeature.DefaultFeatureGate, features.ElasticQuotaIgnoreTerminatingPod, true)() + gqm := NewGroupQuotaManagerForTest() + gqm.scaleMinQuotaEnabled = true + + gqm.UpdateClusterTotalResource(createResourceList(50, 50)) + + qi1 := createQuota("1", extension.RootQuotaName, 40, 40, 10, 10) + gqm.UpdateQuota(qi1, false) + + // unscheduled pod + pod1 := schetesting.MakePod().Name("1").Obj() + pod1.Spec.Containers = []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: createResourceList(10, 10), + }, + }, + } + gqm.OnPodAdd(qi1.Name, pod1) + assert.Equal(t, createResourceList(10, 10), gqm.GetQuotaInfoByName("1").GetRequest()) + assert.Equal(t, v1.ResourceList{}, gqm.GetQuotaInfoByName("1").GetUsed()) + + // schedule the pod. 
+ pod2 := pod1.DeepCopy() + pod2.Spec.NodeName = "node1" + gqm.OnPodUpdate("1", "1", pod2, pod1) + assert.Equal(t, createResourceList(10, 10), gqm.GetQuotaInfoByName("1").GetRequest()) + assert.Equal(t, createResourceList(10, 10), gqm.GetQuotaInfoByName("1").GetUsed()) + + // deleting the pod. + pod3 := pod2.DeepCopy() + deleteTimestamp := metav1.Time{Time: time.Now().Add(-10 * time.Second)} + deleteGracePeriodsSeconds := int64(30) + pod3.DeletionTimestamp = &deleteTimestamp + pod3.DeletionGracePeriodSeconds = &deleteGracePeriodsSeconds + gqm.OnPodUpdate("1", "1", pod3, pod2) + assert.Equal(t, createResourceList(10, 10), gqm.GetQuotaInfoByName("1").GetRequest()) + assert.Equal(t, createResourceList(10, 10), gqm.GetQuotaInfoByName("1").GetUsed()) + + // deleting the pod again. + pod4 := pod3.DeepCopy() + deleteTimestamp = metav1.Time{Time: time.Now().Add(-40 * time.Second)} + pod4.DeletionTimestamp = &deleteTimestamp + pod4.DeletionGracePeriodSeconds = &deleteGracePeriodsSeconds + gqm.OnPodUpdate("1", "1", pod4, pod3) + assert.Equal(t, createResourceList(0, 0), gqm.GetQuotaInfoByName("1").GetRequest()) + assert.Equal(t, createResourceList(0, 0), gqm.GetQuotaInfoByName("1").GetUsed()) + + // delete the pod. 
+ gqm.OnPodDelete("1", pod4) + assert.Equal(t, createResourceList(0, 0), gqm.GetQuotaInfoByName("1").GetRequest()) + assert.Equal(t, createResourceList(0, 0), gqm.GetQuotaInfoByName("1").GetUsed()) +} + +func TestGroupQuotaManager_OnTerminatingPodAdd(t *testing.T) { + defer utilfeature.SetFeatureGateDuringTest(t, k8sfeature.DefaultFeatureGate, features.ElasticQuotaIgnoreTerminatingPod, true)() + gqm := NewGroupQuotaManagerForTest() + gqm.scaleMinQuotaEnabled = true + + gqm.UpdateClusterTotalResource(createResourceList(50, 50)) + + qi1 := createQuota("1", extension.RootQuotaName, 40, 40, 10, 10) + gqm.UpdateQuota(qi1, false) + + // add deleted pod + pod1 := schetesting.MakePod().Name("1").Obj() + pod1.Spec.Containers = []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: createResourceList(10, 10), + }, + }, + } + deleteTimestamp := metav1.Time{Time: time.Now().Add(-40 * time.Second)} + deleteGracePeriodsSeconds := int64(30) + pod1.Spec.NodeName = "node1" + pod1.DeletionTimestamp = &deleteTimestamp + pod1.DeletionGracePeriodSeconds = &deleteGracePeriodsSeconds + gqm.OnPodAdd(qi1.Name, pod1) + assert.Equal(t, v1.ResourceList{}, gqm.GetQuotaInfoByName("1").GetRequest()) + assert.Equal(t, v1.ResourceList{}, gqm.GetQuotaInfoByName("1").GetUsed()) +} + func TestNewGroupQuotaManager(t *testing.T) { gqm := NewGroupQuotaManager("", createResourceList(100, 100), createResourceList(300, 300)) assert.Equal(t, createResourceList(100, 100), gqm.GetQuotaInfoByName(extension.SystemQuotaName).GetMax())