More assertive borrowing by exempt

Happy middle ground with what the KEP says?

Signed-off-by: Mike Spreitzer <mspreitz@us.ibm.com>

Kubernetes-commit: 56fc11f3bef9f6af16aa30731050168e732754a2
This commit is contained in:
Mike Spreitzer 2024-05-08 02:30:27 -04:00 committed by Kubernetes Publisher
parent 36d8f544a9
commit 010634c01b
1 changed file with 62 additions and 28 deletions

View File

@ -204,7 +204,7 @@ type priorityLevelState struct {
// reached through this pointer is mutable. // reached through this pointer is mutable.
pl *flowcontrol.PriorityLevelConfiguration pl *flowcontrol.PriorityLevelConfiguration
// qsCompleter holds the QueueSetCompleter derived from `config` // qsCompleter holds the QueueSetCompleter derived from `pl`
// and `queues`. // and `queues`.
qsCompleter fq.QueueSetCompleter qsCompleter fq.QueueSetCompleter
@ -255,12 +255,12 @@ type priorityLevelState struct {
type seatDemandStats struct { type seatDemandStats struct {
avg float64 avg float64
stdDev float64 stdDev float64
highWatermark float64 highWatermark int
smoothed float64 smoothed float64
} }
func (stats *seatDemandStats) update(obs fq.IntegratorResults) { func (stats *seatDemandStats) update(obs fq.IntegratorResults) {
stats.highWatermark = obs.Max stats.highWatermark = int(math.Round(obs.Max))
if obs.Duration <= 0 { if obs.Duration <= 0 {
return return
} }
@ -398,38 +398,63 @@ func (cfgCtlr *configController) updateBorrowing() {
func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plStates map[string]*priorityLevelState) { func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plStates map[string]*priorityLevelState) {
items := make([]allocProblemItem, 0, len(plStates)) items := make([]allocProblemItem, 0, len(plStates))
plNames := make([]string, 0, len(plStates)) nonExemptPLNames := make([]string, 0, len(plStates))
idxOfNonExempt := map[string]int{}
cclOfExempt := map[string]int{}
var minCLSum, minCurrentCLSum int
remainingServerCL := cfgCtlr.nominalCLSum
for plName, plState := range plStates { for plName, plState := range plStates {
obs := plState.seatDemandIntegrator.Reset() obs := plState.seatDemandIntegrator.Reset()
plState.seatDemandStats.update(obs) plState.seatDemandStats.update(obs)
// Lower bound on this priority level's adjusted concurrency limit is the lesser of: var minCurrentCL int
// - its seat demand high watermark over the last adjustment period, and if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
// - its configured concurrency limit. minCurrentCL = max(plState.minCL, plState.seatDemandStats.highWatermark)
// BUT: we do not want this to be lower than the lower bound from configuration. cclOfExempt[plName] = minCurrentCL
// See KEP-1040 for a more detailed explanation. remainingServerCL -= minCurrentCL
minCurrentCL := math.Max(float64(plState.minCL), math.Min(float64(plState.nominalCL), plState.seatDemandStats.highWatermark)) } else {
plNames = append(plNames, plName) // Lower bound on this priority level's adjusted concurrency limit is the lesser of:
items = append(items, allocProblemItem{ // - its seat demand high watermark over the last adjustment period, and
lowerBound: minCurrentCL, // - its configured concurrency limit.
upperBound: float64(plState.maxCL), // BUT: we do not want this to be lower than the lower bound from configuration.
target: math.Max(minCurrentCL, plState.seatDemandStats.smoothed), // See KEP-1040 for a more detailed explanation.
}) minCurrentCL = max(plState.minCL, min(plState.nominalCL, plState.seatDemandStats.highWatermark))
idxOfNonExempt[plName] = len(items)
nonExemptPLNames = append(nonExemptPLNames, plName)
items = append(items, allocProblemItem{
lowerBound: float64(minCurrentCL),
upperBound: float64(plState.maxCL),
target: math.Max(float64(minCurrentCL), plState.seatDemandStats.smoothed),
})
}
minCLSum += plState.minCL
minCurrentCLSum += minCurrentCL
} }
if len(items) == 0 && cfgCtlr.nominalCLSum > 0 { if len(items) == 0 && cfgCtlr.nominalCLSum > 0 {
klog.ErrorS(nil, "Impossible: no priority levels", "plStates", cfgCtlr.priorityLevelStates) klog.ErrorS(nil, "Impossible: no priority levels", "plStates", cfgCtlr.priorityLevelStates)
return return
} }
allocs, fairFrac, err := computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items) var allocs []float64
if err != nil { var shareFrac, fairFrac float64
klog.ErrorS(err, "Unable to derive new concurrency limits", "plNames", plNames, "items", items) var err error
allocs = make([]float64, len(items)) if remainingServerCL <= minCLSum {
for idx, plName := range plNames { metrics.SetFairFrac(0)
plState := plStates[plName] } else if remainingServerCL <= minCurrentCLSum {
allocs[idx] = float64(plState.currentCL) shareFrac = float64(remainingServerCL-minCLSum) / float64(minCurrentCLSum-minCLSum)
metrics.SetFairFrac(0)
} else {
allocs, fairFrac, err = computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items)
if err != nil {
klog.ErrorS(err, "Unable to derive new concurrency limits", "plNames", nonExemptPLNames, "items", items)
allocs = make([]float64, len(items))
for idx, plName := range nonExemptPLNames {
plState := plStates[plName]
allocs[idx] = float64(plState.currentCL)
}
} }
metrics.SetFairFrac(float64(fairFrac))
} }
for idx, plName := range plNames { for plName, plState := range plStates {
plState := plStates[plName] idx, isNonExempt := idxOfNonExempt[plName]
if setCompleters { if setCompleters {
qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues, qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues,
plState.pl, plState.reqsGaugePair, plState.execSeatsObs, plState.pl, plState.reqsGaugePair, plState.execSeatsObs,
@ -440,10 +465,20 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
} }
plState.qsCompleter = qsCompleter plState.qsCompleter = qsCompleter
} }
currentCL := int(math.Round(float64(allocs[idx]))) var currentCL int
if !isNonExempt {
currentCL = cclOfExempt[plName]
} else if remainingServerCL <= minCLSum {
currentCL = plState.minCL
} else if remainingServerCL <= minCurrentCLSum {
minCurrentCL := max(plState.minCL, min(plState.nominalCL, plState.seatDemandStats.highWatermark))
currentCL = plState.minCL + int(math.Round(float64(minCurrentCL-plState.minCL)*shareFrac))
} else {
currentCL = int(math.Round(float64(allocs[idx])))
}
relChange := relDiff(float64(currentCL), float64(plState.currentCL)) relChange := relDiff(float64(currentCL), float64(plState.currentCL))
plState.currentCL = currentCL plState.currentCL = currentCL
metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, plState.seatDemandStats.highWatermark, plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed, float64(items[idx].target), currentCL) metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, float64(plState.seatDemandStats.highWatermark), plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed, float64(items[idx].target), currentCL)
logLevel := klog.Level(4) logLevel := klog.Level(4)
if relChange >= 0.05 { if relChange >= 0.05 {
logLevel = 2 logLevel = 2
@ -458,7 +493,6 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "concurrencyDenominator", concurrencyDenominator, "backstop", err != nil) klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "concurrencyDenominator", concurrencyDenominator, "backstop", err != nil)
plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL, ConcurrencyDenominator: concurrencyDenominator}) plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL, ConcurrencyDenominator: concurrencyDenominator})
} }
metrics.SetFairFrac(float64(fairFrac))
} }
// runWorker is the logic of the one and only worker goroutine. We // runWorker is the logic of the one and only worker goroutine. We