From fb920cc8657e236663c722716d5b71eb7e0d15da Mon Sep 17 00:00:00 2001
From: github-actions
Date: Wed, 5 Jul 2023 03:15:09 +0000
Subject: [PATCH] Update Kubernetes to latest master

---
 .../pkg/util/flowcontrol/apf_controller.go    | 178 ++++++++----------
 .../util/flowcontrol/apf_controller_debug.go  |  34 ----
 .../util/flowcontrol/fairqueuing/interface.go |  12 +-
 .../fairqueuing/queueset/queueset.go          |   9 +-
 4 files changed, 99 insertions(+), 134 deletions(-)

diff --git a/vendor/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/vendor/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
index 2048a6ef6..3c36bb604 100644
--- a/vendor/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
+++ b/vendor/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
@@ -197,16 +197,15 @@ type priorityLevelState struct {
 	pl *flowcontrol.PriorityLevelConfiguration
 
 	// qsCompleter holds the QueueSetCompleter derived from `config`
-	// and `queues` if config is not exempt, nil otherwise.
+	// and `queues`.
 	qsCompleter fq.QueueSetCompleter
 
-	// The QueueSet for this priority level. This is nil if and only
-	// if the priority level is exempt.
+	// The QueueSet for this priority level.
+	// Never nil.
 	queues fq.QueueSet
 
 	// quiescing==true indicates that this priority level should be
-	// removed when its queues have all drained. May be true only if
-	// queues is non-nil.
+	// removed when its queues have all drained.
 	quiescing bool
 
 	// number of goroutines between Controller::Match and calling the
@@ -384,9 +383,6 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
 	items := make([]allocProblemItem, 0, len(plStates))
 	plNames := make([]string, 0, len(plStates))
 	for plName, plState := range plStates {
-		if plState.pl.Spec.Limited == nil {
-			continue
-		}
 		obs := plState.seatDemandIntegrator.Reset()
 		plState.seatDemandStats.update(obs)
 		// Lower bound on this priority level's adjusted concurrency limit is the lesser of:
@@ -403,7 +399,7 @@
 		})
 	}
 	if len(items) == 0 && cfgCtlr.nominalCLSum > 0 {
-		klog.ErrorS(nil, "Impossible: no non-exempt priority levels", "plStates", cfgCtlr.priorityLevelStates)
+		klog.ErrorS(nil, "Impossible: no priority levels", "plStates", cfgCtlr.priorityLevelStates)
 		return
 	}
 	allocs, fairFrac, err := computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items)
@@ -412,17 +408,11 @@
 		allocs = make([]float64, len(items))
 		for idx, plName := range plNames {
 			plState := plStates[plName]
-			if plState.pl.Spec.Limited == nil {
-				continue
-			}
 			allocs[idx] = float64(plState.currentCL)
 		}
 	}
 	for idx, plName := range plNames {
 		plState := plStates[plName]
-		if plState.pl.Spec.Limited == nil {
-			continue
-		}
 		if setCompleters {
 			qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues,
 				plState.pl, cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs,
@@ -441,8 +431,15 @@
 		if relChange >= 0.05 {
 			logLevel = 2
 		}
-		klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "backstop", err != nil)
-		plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL})
+		var concurrencyDenominator int
+		if currentCL > 0 {
+			concurrencyDenominator = currentCL
+		} else {
+			concurrencyDenominator = int(math.Max(1, math.Round(float64(cfgCtlr.serverConcurrencyLimit)/10)))
+		}
+		plState.seatDemandRatioedGauge.SetDenominator(float64(concurrencyDenominator))
+		klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "concurrencyDenominator", concurrencyDenominator, "backstop", err != nil)
+		plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL, ConcurrencyDenominator: concurrencyDenominator})
 	}
 	metrics.SetFairFrac(float64(fairFrac))
 }
@@ -690,9 +687,8 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
 			klog.V(3).Infof("Priority level %q was undesired and has become desired again", pl.Name)
 			state.quiescing = false
 		}
-		if state.pl.Spec.Limited != nil {
-			meal.shareSum += float64(state.pl.Spec.Limited.NominalConcurrencyShares)
-		}
+		nominalConcurrencyShares, _, _ := plSpecCommons(state.pl)
+		meal.shareSum += float64(nominalConcurrencyShares)
 		meal.haveExemptPL = meal.haveExemptPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt
 		meal.haveCatchAllPL = meal.haveCatchAllPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameCatchAll
 	}
@@ -765,15 +761,15 @@ func (meal *cfgMeal) processOldPLsLocked() {
 			continue
 		}
 		if plName == flowcontrol.PriorityLevelConfigurationNameExempt && !meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll && !meal.haveCatchAllPL {
-			// BTW, we know the Spec has not changed because the
-			// mandatory objects have immutable Specs
+			// BTW, we know the Spec has not changed what it says about queuing because the
+			// mandatory objects have immutable Specs as far as queuing is concerned.
 			klog.V(3).Infof("Retaining mandatory priority level %q despite lack of API object", plName)
 		} else {
-			if plState.queues == nil || plState.numPending == 0 && plState.queues.IsIdle() {
-				// Either there are no queues or they are done
+			if plState.numPending == 0 && plState.queues.IsIdle() {
+				// The QueueSet is done
 				// draining and no use is coming from another
 				// goroutine
-				klog.V(3).Infof("Removing undesired priority level %q (nilQueues=%v), Type=%v", plName, plState.queues == nil, plState.pl.Spec.Type)
+				klog.V(3).Infof("Removing undesired priority level %q, Type=%v", plName, plState.pl.Spec.Type)
 				continue
 			}
 			if !plState.quiescing {
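The `concurrencyDenominator` fallback in the updateBorrowingLocked hunk above keeps the ratioed metrics of an exempt level from dividing by zero: when `currentCL` is zero, the denominator becomes roughly a tenth of the server-wide limit, floored at 1. A minimal standalone sketch of that arithmetic (the function and its parameters are illustrative, not part of the patch):

package main

import (
	"fmt"
	"math"
)

// concurrencyDenominator mirrors the fallback added in updateBorrowingLocked:
// a positive currentCL is used as-is; otherwise the denominator is about 10%
// of the server concurrency limit, floored at 1 so ratios stay finite.
func concurrencyDenominator(currentCL, serverConcurrencyLimit int) int {
	if currentCL > 0 {
		return currentCL
	}
	return int(math.Max(1, math.Round(float64(serverConcurrencyLimit)/10)))
}

func main() {
	fmt.Println(concurrencyDenominator(25, 600)) // 25: a limited level divides by its own limit
	fmt.Println(concurrencyDenominator(0, 600))  // 60: an exempt level divides by ~10% of the server limit
	fmt.Println(concurrencyDenominator(0, 4))    // 1: the floor keeps the denominator positive
}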
@@ -789,15 +785,14 @@ func (meal *cfgMeal) processOldPLsLocked() {
 			// This can not happen because queueSetCompleterForPL already approved this config
 			panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
 		}
-		if plState.pl.Spec.Limited != nil {
-			// We deliberately include the lingering priority levels
-			// here so that their queues get some concurrency and they
-			// continue to drain. During this interim a lingering
-			// priority level continues to get a concurrency
-			// allocation determined by all the share values in the
-			// regular way.
-			meal.shareSum += float64(plState.pl.Spec.Limited.NominalConcurrencyShares)
-		}
+		// We deliberately include the lingering priority levels
+		// here so that their queues get some concurrency and they
+		// continue to drain. During this interim a lingering
+		// priority level continues to get a concurrency
+		// allocation determined by all the share values in the
+		// regular way.
+		nominalConcurrencyShares, _, _ := plSpecCommons(plState.pl)
+		meal.shareSum += float64(nominalConcurrencyShares)
 		meal.haveExemptPL = meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameExempt
 		meal.haveCatchAllPL = meal.haveCatchAllPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll
 		meal.newPLStates[plName] = plState
@@ -809,41 +804,35 @@
 // QueueSets.
 func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 	for plName, plState := range meal.newPLStates {
-		if plState.pl.Spec.Limited == nil {
-			klog.V(5).Infof("Using exempt priority level %q: quiescing=%v", plName, plState.quiescing)
-			continue
-		}
-
-		limited := plState.pl.Spec.Limited
+		nominalConcurrencyShares, lendablePercent, borrowingLimitPercent := plSpecCommons(plState.pl)
 		// The use of math.Ceil here means that the results might sum
 		// to a little more than serverConcurrencyLimit but the
 		// difference will be negligible.
-		concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(limited.NominalConcurrencyShares) / meal.shareSum))
+		concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(nominalConcurrencyShares) / meal.shareSum))
 		var lendableCL, borrowingCL int
-		if limited.LendablePercent != nil {
-			lendableCL = int(math.Round(float64(concurrencyLimit) * float64(*limited.LendablePercent) / 100))
+		if lendablePercent != nil {
+			lendableCL = int(math.Round(float64(concurrencyLimit) * float64(*lendablePercent) / 100))
 		}
-		if limited.BorrowingLimitPercent != nil {
-			borrowingCL = int(math.Round(float64(concurrencyLimit) * float64(*limited.BorrowingLimitPercent) / 100))
+		if borrowingLimitPercent != nil {
+			borrowingCL = int(math.Round(float64(concurrencyLimit) * float64(*borrowingLimitPercent) / 100))
 		} else {
 			borrowingCL = meal.cfgCtlr.serverConcurrencyLimit
 		}
+
 		metrics.SetPriorityLevelConfiguration(plName, concurrencyLimit, concurrencyLimit-lendableCL, concurrencyLimit+borrowingCL)
-		plState.seatDemandRatioedGauge.SetDenominator(float64(concurrencyLimit))
 		cfgChanged := plState.nominalCL != concurrencyLimit || plState.minCL != concurrencyLimit-lendableCL || plState.maxCL != concurrencyLimit+borrowingCL
 		plState.nominalCL = concurrencyLimit
 		plState.minCL = concurrencyLimit - lendableCL
 		plState.maxCL = concurrencyLimit + borrowingCL
 		meal.maxExecutingRequests += concurrencyLimit
-		var waitLimit int
-		if qCfg := limited.LimitResponse.Queuing; qCfg != nil {
-			waitLimit = int(qCfg.Queues * qCfg.QueueLengthLimit)
+		if limited := plState.pl.Spec.Limited; limited != nil {
+			if qCfg := limited.LimitResponse.Queuing; qCfg != nil {
+				meal.maxWaitingRequests += int(qCfg.Queues * qCfg.QueueLengthLimit)
+			}
 		}
-		meal.maxWaitingRequests += waitLimit
-
 		if plState.queues == nil {
 			initialCL := concurrencyLimit - lendableCL/2
-			klog.V(2).Infof("Introducing queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, initialCL, plState.quiescing, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
+			klog.V(2).Infof("Introducing queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, initialCL, plState.quiescing, nominalConcurrencyShares, meal.shareSum)
 			plState.seatDemandStats = seatDemandStats{}
 			plState.currentCL = initialCL
 		} else {
@@ -851,7 +840,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 			if cfgChanged {
 				logLevel = 2
 			}
-			klog.V(logLevel).Infof("Retaining queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, plState.currentCL, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
+			klog.V(logLevel).Infof("Retaining queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, plState.currentCL, plState.quiescing, plState.numPending, nominalConcurrencyShares, meal.shareSum)
 		}
 	}
 	meal.cfgCtlr.nominalCLSum = meal.maxExecutingRequests
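For readers tracing the arithmetic in the finishQueueSetReconfigsLocked hunks above, here is a self-contained sketch of how one level's nominal, minimum, and maximum concurrency limits fall out of its share values; the helper and struct names (`computeBounds`, `clBounds`) are invented for illustration, not taken from the patch:

package main

import (
	"fmt"
	"math"
)

// clBounds holds the three limits the controller records per priority level.
type clBounds struct {
	nominalCL, minCL, maxCL int
}

// computeBounds follows the patch's formulas: nominalCL is the level's
// share-weighted slice of the server-wide limit (math.Ceil may make the
// slices sum to slightly more than serverCL, which the comment above calls
// negligible); the lendable/borrowing percentages then bound how far the
// dynamic limit may move below or above the nominal value.
func computeBounds(serverCL int, shares, shareSum float64, lendablePercent, borrowingLimitPercent *int32) clBounds {
	nominal := int(math.Ceil(float64(serverCL) * shares / shareSum))
	var lendable, borrowing int
	if lendablePercent != nil {
		lendable = int(math.Round(float64(nominal) * float64(*lendablePercent) / 100))
	}
	if borrowingLimitPercent != nil {
		borrowing = int(math.Round(float64(nominal) * float64(*borrowingLimitPercent) / 100))
	} else {
		borrowing = serverCL // no explicit cap: may borrow up to the whole server limit
	}
	return clBounds{nominalCL: nominal, minCL: nominal - lendable, maxCL: nominal + borrowing}
}

func main() {
	lend, borrow := int32(50), int32(25)
	// 30 of 300 total shares against a server limit of 600 seats:
	fmt.Printf("%+v\n", computeBounds(600, 30, 300, &lend, &borrow)) // {nominalCL:60 minCL:30 maxCL:75}
}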
@@ -859,32 +848,32 @@
 }
 
 // queueSetCompleterForPL returns an appropriate QueueSetCompleter for the
-// given priority level configuration. Returns nil if that config
-// does not call for limiting. Returns nil and an error if the given
+// given priority level configuration. Returns nil and an error if the given
 // object is malformed in a way that is a problem for this package.
 func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) {
-	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
-		return nil, errors.New("broken union structure at the top")
+	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementLimited) != (pl.Spec.Limited != nil) {
+		return nil, errors.New("broken union structure at the top, for Limited")
 	}
 	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt) {
 		// This package does not attempt to cope with a priority level dynamically switching between exempt and not.
 		return nil, errors.New("non-alignment between name and type")
 	}
-	if pl.Spec.Limited == nil {
-		return nil, nil
-	}
-	if (pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeReject) != (pl.Spec.Limited.LimitResponse.Queuing == nil) {
-		return nil, errors.New("broken union structure for limit response")
-	}
-	qcAPI := pl.Spec.Limited.LimitResponse.Queuing
 	qcQS := fq.QueuingConfig{Name: pl.Name}
-	if qcAPI != nil {
-		qcQS = fq.QueuingConfig{Name: pl.Name,
-			DesiredNumQueues: int(qcAPI.Queues),
-			QueueLengthLimit: int(qcAPI.QueueLengthLimit),
-			HandSize:         int(qcAPI.HandSize),
-			RequestWaitLimit: requestWaitLimit,
+	if pl.Spec.Limited != nil {
+		if (pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeReject) != (pl.Spec.Limited.LimitResponse.Queuing == nil) {
+			return nil, errors.New("broken union structure for limit response")
 		}
+		qcAPI := pl.Spec.Limited.LimitResponse.Queuing
+		if qcAPI != nil {
+			qcQS = fq.QueuingConfig{Name: pl.Name,
+				DesiredNumQueues: int(qcAPI.Queues),
+				QueueLengthLimit: int(qcAPI.QueueLengthLimit),
+				HandSize:         int(qcAPI.HandSize),
+				RequestWaitLimit: requestWaitLimit,
+			}
+		}
+	} else {
+		qcQS = fq.QueuingConfig{Name: pl.Name, DesiredNumQueues: -1}
 	}
 	var qsc fq.QueueSetCompleter
 	var err error
@@ -894,7 +883,7 @@
 		qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs, seatDemandGauge)
 	}
 	if err != nil {
-		err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
+		err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcQS, err)
 	}
 	return qsc, err
 }
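The `else` branch above is the heart of the rewrite: an exempt level is no longer represented by a nil QueueSet plus a special immediate-dispatch type (the `immediateRequest` type deleted in the next hunk), but by an ordinary QueueSet whose `DesiredNumQueues` is -1. A simplified sketch of that encoding, with types reduced to the bare minimum rather than the patch's real API:

package main

import "fmt"

// queuingConfig stands in for fq.QueuingConfig with just the field that matters here.
type queuingConfig struct {
	name             string
	desiredNumQueues int
}

// configFor shows the three shapes queueSetCompleterForPL can now produce:
// exempt levels get -1, limited levels with queuing get the configured count,
// and limited levels that reject instead of queuing keep the zero value.
func configFor(name string, limited bool, queues int) queuingConfig {
	if !limited {
		return queuingConfig{name: name, desiredNumQueues: -1} // exempt: dispatch immediately
	}
	return queuingConfig{name: name, desiredNumQueues: queues}
}

func main() {
	fmt.Println(configFor("exempt", false, 0))        // {exempt -1}
	fmt.Println(configFor("workload-low", true, 128)) // {workload-low 128}
	fmt.Println(configFor("reject-only", true, 0))    // {reject-only 0}
}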
@@ -962,13 +951,6 @@
 	}
 }
 
-type immediateRequest struct{}
-
-func (immediateRequest) Finish(execute func()) bool {
-	execute()
-	return false
-}
-
 // startRequest classifies and, if appropriate, enqueues the request.
 // Returns a nil Request if and only if the request is to be rejected.
 // The returned bool indicates whether the request is exempt from
@@ -1007,32 +989,31 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
 	}
 	plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name
 	plState := cfgCtlr.priorityLevelStates[plName]
-	if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
-		noteFn(selectedFlowSchema, plState.pl, "")
-		klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
-		return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
-	}
 	var numQueues int32
-	if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
-		numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
-	}
-	var flowDistinguisher string
 	var hashValue uint64
-	if numQueues > 1 {
-		flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
-		hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
+	var flowDistinguisher string
+	if plState.pl.Spec.Type != flowcontrol.PriorityLevelEnablementExempt {
+		if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
+			numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
+		}
+		if numQueues > 1 {
+			flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
+			hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
+		}
 	}
 	noteFn(selectedFlowSchema, plState.pl, flowDistinguisher)
 	workEstimate := workEstimator()
-	startWaitingTime = cfgCtlr.clock.Now()
+	if plState.pl.Spec.Type != flowcontrol.PriorityLevelEnablementExempt {
+		startWaitingTime = cfgCtlr.clock.Now()
+	}
 	klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
 	req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
 	if idle {
 		cfgCtlr.maybeReapReadLocked(plName, plState)
 	}
-	return selectedFlowSchema, plState.pl, false, req, startWaitingTime
+	return selectedFlowSchema, plState.pl, plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt, req, startWaitingTime
 }
 
 // maybeReap will remove the last internal traces of the named
@@ -1046,10 +1027,6 @@ func (cfgCtlr *configController) maybeReap(plName string) {
 		klog.V(7).Infof("plName=%s, plState==nil", plName)
 		return
 	}
-	if plState.queues == nil {
-		klog.V(7).Infof("plName=%s, plState.queues==nil", plName)
-		return
-	}
 	useless := plState.quiescing && plState.numPending == 0 && plState.queues.IsIdle()
 	klog.V(7).Infof("plState.quiescing=%v, plState.numPending=%d, useless=%v", plState.quiescing, plState.numPending, useless)
 	if !useless {
@@ -1107,3 +1084,12 @@ func relDiff(x, y float64) float64 {
 	}
 	return diff / den
 }
+
+// plSpecCommons returns the (NominalConcurrencyShares, LendablePercent, BorrowingLimitPercent) of the given priority level config
+func plSpecCommons(pl *flowcontrol.PriorityLevelConfiguration) (int32, *int32, *int32) {
+	if limiter := pl.Spec.Limited; limiter != nil {
+		return limiter.NominalConcurrencyShares, limiter.LendablePercent, limiter.BorrowingLimitPercent
+	}
+	var zero int32
+	return 0, &zero, &zero
+}
diff --git a/vendor/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go b/vendor/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go
index 0b9bc02f9..302391eb9 100644
--- a/vendor/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go
+++ b/vendor/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go
@@ -75,22 +75,6 @@ func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *ht
 			continue
 		}
 
-		if plState.queues == nil {
-			tabPrint(tabWriter, row(
-				plState.pl.Name, // 1
-				"",              // 2
-				"",              // 3
-				"",              // 4
-				"",              // 5
-				"",              // 6
-				"",              // 7
-				"",              // 8
-				"",              // 9
-				"",              // 10
-			))
-			endLine(tabWriter)
-			continue
-		}
 		queueSetDigest := plState.queues.Dump(false)
 		activeQueueNum := 0
 		for _, q := range queueSetDigest.Queues {
@@ -134,21 +118,6 @@
 	tabPrint(tabWriter, rowForHeaders(columnHeaders))
 	endLine(tabWriter)
 	for _, plState := range cfgCtlr.priorityLevelStates {
-		if plState.queues == nil {
-			tabPrint(tabWriter, row(
-				plState.pl.Name, // 1
-				"",              // 2
-				"",              // 3
-				"",              // 4
-				"",              // 5
-				"",              // 6
-				"",              // 7
-				"",              // 8
-				"",              // 9
-			))
-			endLine(tabWriter)
-			continue
-		}
 		queueSetDigest := plState.queues.Dump(false)
 		for i, q := range queueSetDigest.Queues {
 			tabPrint(tabWriter, row(
@@ -201,9 +170,6 @@ func (cfgCtlr *configController) dumpRequests(w http.ResponseWriter, r *http.Req
 	}
 	endLine(tabWriter)
 	for _, plState := range cfgCtlr.priorityLevelStates {
-		if plState.queues == nil {
-			continue
-		}
 		queueSetDigest := plState.queues.Dump(includeRequestDetails)
 		for iq, q := range queueSetDigest.Queues {
 			for ir, r := range q.Requests {
diff --git a/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go
index 5522bb455..013fd41e0 100644
--- a/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go
+++ b/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go
@@ -34,7 +34,10 @@ type QueueSetFactory interface {
 	// BeginConstruction does the first phase of creating a QueueSet.
 	// The RatioedGaugePair observes number of requests,
 	// execution covering just the regular phase.
+	// The denominator for the waiting phase is
+	// max(1, QueuingConfig.QueueLengthLimit) X max(1, QueuingConfig.DesiredNumQueues).
 	// The RatioedGauge observes number of seats occupied through all phases of execution.
+	// The denominator for all the ratioed concurrency gauges is supplied later in the DispatchingConfig.
 	// The Gauge observes the seat demand (executing + queued seats).
 	BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, metrics.Gauge) (QueueSetCompleter, error)
 }
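The waiting-phase denominator documented on BeginConstruction above is a simple product of two floored factors. A small free-standing sketch (the function name is invented) of that max(1, …) × max(1, …) rule:

package main

import "fmt"

// waitingDenominator reproduces the formula in the BeginConstruction comment:
// max(1, QueueLengthLimit) multiplied by max(1, DesiredNumQueues), so the
// RequestsWaiting ratio stays well-defined even when queuing is disabled
// (zero queues) or the level is exempt (negative queues).
func waitingDenominator(queueLengthLimit, desiredNumQueues int) int {
	qll := queueLengthLimit
	if qll < 1 {
		qll = 1
	}
	dnq := desiredNumQueues
	if dnq < 1 {
		dnq = 1
	}
	return qll * dnq
}

func main() {
	fmt.Println(waitingDenominator(50, 64)) // 3200: a queuing level can hold 50*64 waiters
	fmt.Println(waitingDenominator(0, -1))  // 1: exempt level, denominator floored at one
}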
@@ -113,8 +116,11 @@ type QueuingConfig struct {
 	Name string
 
 	// DesiredNumQueues is the number of queues that the API says
-	// should exist now. This may be zero, in which case
+	// should exist now. This may be non-positive, in which case
 	// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
+	// A value of zero means to respect the ConcurrencyLimit of the DispatchingConfig.
+	// A negative value means to always dispatch immediately upon arrival
+	// (i.e., the requests are "exempt" from limitation).
 	DesiredNumQueues int
 
 	// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
@@ -133,4 +139,8 @@ type QueuingConfig struct {
 type DispatchingConfig struct {
 	// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
 	ConcurrencyLimit int
+
+	// ConcurrencyDenominator is used in relative metrics of concurrency.
+	// It equals ConcurrencyLimit except when that is zero.
+	ConcurrencyDenominator int
 }
diff --git a/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
index 11c15ccb7..b01671586 100644
--- a/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
+++ b/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
@@ -197,7 +197,7 @@ func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePa
 // calls for one, and returns a non-nil error if the given config is
 // invalid.
 func checkConfig(qCfg fq.QueuingConfig) (*shufflesharding.Dealer, error) {
-	if qCfg.DesiredNumQueues == 0 {
+	if qCfg.DesiredNumQueues <= 0 {
 		return nil, nil
 	}
 	dealer, err := shufflesharding.NewDealer(qCfg.DesiredNumQueues, qCfg.HandSize)
@@ -280,8 +280,8 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
 		qll *= qCfg.DesiredNumQueues
 	}
 	qs.reqsGaugePair.RequestsWaiting.SetDenominator(float64(qll))
-	qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit))
-	qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyLimit))
+	qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyDenominator))
+	qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyDenominator))
 	qs.dispatchAsMuchAsPossibleLocked()
 }
 
@@ -796,6 +796,9 @@ func (qs *queueSet) dispatchLocked() bool {
 // otherwise it returns false.
 func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
 	switch {
+	case qs.qCfg.DesiredNumQueues < 0:
+		// This is code for exemption from limitation
+		return true
 	case seats > qs.dCfg.ConcurrencyLimit:
 		// we have picked the queue with the minimum virtual finish time, but
 		// the number of seats this request asks for exceeds the concurrency limit.
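The new first case above short-circuits every capacity check for exempt levels. A condensed sketch of the accommodation test, written as a plain function instead of the queueSet method; the branch after the exemption case is a simplification for illustration, not a verbatim copy of the method's remaining cases (which the hunk above truncates):

package main

import "fmt"

// canAccommodateSeats condenses canAccommodateSeatsLocked: a negative
// DesiredNumQueues means the level is exempt, so capacity never blocks it;
// otherwise the request must fit within the remaining concurrency seats.
func canAccommodateSeats(desiredNumQueues, seats, seatsInUse, concurrencyLimit int) bool {
	if desiredNumQueues < 0 {
		return true // exemption from limitation
	}
	return seatsInUse+seats <= concurrencyLimit
}

func main() {
	fmt.Println(canAccommodateSeats(-1, 500, 999, 10)) // true: exempt ignores the limit
	fmt.Println(canAccommodateSeats(64, 5, 8, 10))     // false: 8+5 exceeds 10 seats
	fmt.Println(canAccommodateSeats(64, 2, 8, 10))     // true: fits within the limit
}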