Merge pull request #110164 from MikeSpreitzer/supply-denominators

Supply denominators

Kubernetes-commit: d48c0677712f76f1b1832d7f95625ff5994eeae4
Kubernetes Publisher 2022-07-25 13:32:34 -07:00
commit 077c6def5f
7 changed files with 105 additions and 34 deletions

View File

@@ -41,10 +41,6 @@ const (
// the metrics tracks maximal value over period making this
// longer will increase the metric value.
inflightUsageMetricUpdatePeriod = time.Second
-// How often to run maintenance on observations to ensure
-// that they do not fall too far behind.
-observationMaintenancePeriod = 10 * time.Second
)
var (
@@ -90,9 +86,7 @@ func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
// watermark tracks requests being executed (not waiting in a queue)
var watermark = &requestWatermark{
-phase: metrics.ExecutingPhase,
-readOnlyObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueExecuting, metrics.ReadOnlyKind}),
-mutatingObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueExecuting, metrics.MutatingKind}),
+phase: metrics.ExecutingPhase,
}
// startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark.
@@ -108,14 +102,25 @@ func startWatermarkMaintenance(watermark *requestWatermark, stopCh <-chan struct
metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
}, inflightUsageMetricUpdatePeriod, stopCh)
}
-// Periodically observe the watermarks. This is done to ensure that they do not fall too far behind. When they do
-// fall too far behind, then there is a long delay in responding to the next request received while the observer
-// catches back up.
-go wait.Until(func() {
-watermark.readOnlyObserver.Add(0)
-watermark.mutatingObserver.Add(0)
-}, observationMaintenancePeriod, stopCh)
+var initMaxInFlightOnce sync.Once
+func initMaxInFlight(nonMutatingLimit, mutatingLimit int) {
+initMaxInFlightOnce.Do(func() {
+// Fetching these gauges is delayed until after their underlying metric has been registered
+// so that this latches onto the efficient implementation.
+watermark.readOnlyObserver = fcmetrics.GetExecutingReadonlyConcurrency()
+watermark.mutatingObserver = fcmetrics.GetExecutingMutatingConcurrency()
+if nonMutatingLimit != 0 {
+watermark.readOnlyObserver.SetDenominator(float64(nonMutatingLimit))
+klog.V(2).InfoS("Set denominator for readonly requests", "limit", nonMutatingLimit)
+}
+if mutatingLimit != 0 {
+watermark.mutatingObserver.SetDenominator(float64(mutatingLimit))
+klog.V(2).InfoS("Set denominator for mutating requests", "limit", mutatingLimit)
+}
+})
+}
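For orientation, the effect of SetDenominator above is that the gauge reports requests as a fraction of the configured limit rather than as an absolute count. A small, self-contained Go sketch of that ratio semantics (the numbers and variable names below are illustrative, not taken from this commit; the real observers are time-weighted TimingRatioHistograms):

package main

import "fmt"

func main() {
	// Hypothetical values: the server runs with --max-requests-inflight=400
	// and currently has 100 read-only requests executing.
	nonMutatingLimit := 400.0 // supplied as the denominator
	executing := 100.0        // the numerator the gauge tracks

	// With a denominator supplied, a fully loaded server reads as 1.0
	// instead of an arbitrary absolute count.
	fmt.Println(executing / nonMutatingLimit) // 0.25
}
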
// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
@@ -132,12 +137,17 @@ func WithMaxInFlightLimit(
var mutatingChan chan bool
if nonMutatingLimit != 0 {
nonMutatingChan = make(chan bool, nonMutatingLimit)
-watermark.readOnlyObserver.SetDenominator(float64(nonMutatingLimit))
+klog.V(2).InfoS("Initialized nonMutatingChan", "len", nonMutatingLimit)
+} else {
+klog.V(2).InfoS("Running with nil nonMutatingChan")
}
if mutatingLimit != 0 {
mutatingChan = make(chan bool, mutatingLimit)
-watermark.mutatingObserver.SetDenominator(float64(mutatingLimit))
+klog.V(2).InfoS("Initialized mutatingChan", "len", mutatingLimit)
+} else {
+klog.V(2).InfoS("Running with nil mutatingChan")
}
+initMaxInFlight(nonMutatingLimit, mutatingLimit)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

View File

@@ -32,7 +32,7 @@ import (
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
)
-func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
+func createMaxInflightServer(t *testing.T, callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
fcmetrics.Register()
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
@@ -49,7 +49,9 @@ func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *b
if waitForCalls {
callsWg.Done()
}
t.Logf("About to blockWg.Wait(), requestURI=%v, remoteAddr=%v", r.RequestURI, r.RemoteAddr)
blockWg.Wait()
t.Logf("Returned from blockWg.Wait(), requestURI=%v, remoteAddr=%v", r.RequestURI, r.RemoteAddr)
}),
nonMutating,
mutating,
@@ -103,7 +105,7 @@ func TestMaxInFlightNonMutating(t *testing.T) {
waitForCalls := true
waitForCallsMutex := sync.Mutex{}
-server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1)
+server := createMaxInflightServer(t, calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
@@ -187,7 +189,7 @@ func TestMaxInFlightMutating(t *testing.T) {
waitForCalls := true
waitForCallsMutex := sync.Mutex{}
-server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
+server := createMaxInflightServer(t, calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
@@ -283,7 +285,7 @@ func TestMaxInFlightSkipsMasters(t *testing.T) {
waitForCalls := true
waitForCallsMutex := sync.Mutex{}
-server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
+server := createMaxInflightServer(t, calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())

View File

@@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"runtime"
"sync"
"sync/atomic"
"time"
@@ -46,9 +47,7 @@ type PriorityAndFairnessClassification struct {
// waitingMark tracks requests waiting rather than being executed
var waitingMark = &requestWatermark{
-phase: epmetrics.WaitingPhase,
-readOnlyObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueWaiting, epmetrics.ReadOnlyKind}),
-mutatingObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueWaiting, epmetrics.MutatingKind}),
+phase: epmetrics.WaitingPhase,
}
var atomicMutatingExecuting, atomicReadOnlyExecuting int32
@@ -66,6 +65,8 @@ func truncateLogField(s string) string {
return s
}
+var initAPFOnce sync.Once
// WithPriorityAndFairness limits the number of in-flight
// requests in a fine-grained way.
func WithPriorityAndFairness(
@@ -78,6 +79,13 @@ func WithPriorityAndFairness(
klog.Warningf("priority and fairness support not found, skipping")
return handler
}
+initAPFOnce.Do(func() {
+initMaxInFlight(0, 0)
+// Fetching these gauges is delayed until after their underlying metric has been registered
+// so that this latches onto the efficient implementation.
+waitingMark.readOnlyObserver = fcmetrics.GetWaitingReadonlyConcurrency()
+waitingMark.mutatingObserver = fcmetrics.GetWaitingMutatingConcurrency()
+})
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)

View File

@@ -143,8 +143,8 @@ type configController struct {
// This may only be accessed from the one and only worker goroutine.
mostRecentUpdates []updateAttempt
-// This must be locked while accessing flowSchemas or
-// priorityLevelStates. A lock for writing is needed
+// This must be locked while accessing the later fields.
+// A lock for writing is needed
// for writing to any of the following:
// - the flowSchemas field
// - the slice held in the flowSchemas field
@@ -387,6 +387,8 @@ type cfgMeal struct {
// provoking a call into this controller while the lock held
// waiting on that request to complete.
fsStatusUpdates []fsStatusUpdate
+maxWaitingRequests, maxExecutingRequests int
}
// A buffered set of status updates for FlowSchemas
@@ -511,7 +513,13 @@ func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontro
// The new config has been constructed
cfgCtlr.priorityLevelStates = meal.newPLStates
klog.V(5).Infof("Switched to new API Priority and Fairness configuration")
klog.V(5).InfoS("Switched to new API Priority and Fairness configuration", "maxWaitingRequests", meal.maxWaitingRequests, "maxExecutinRequests", meal.maxExecutingRequests)
metrics.GetWaitingReadonlyConcurrency().SetDenominator(float64(meal.maxWaitingRequests))
metrics.GetWaitingMutatingConcurrency().SetDenominator(float64(meal.maxWaitingRequests))
metrics.GetExecutingReadonlyConcurrency().SetDenominator(float64(meal.maxExecutingRequests))
metrics.GetExecutingMutatingConcurrency().SetDenominator(float64(meal.maxExecutingRequests))
return meal.fsStatusUpdates
}
@@ -663,6 +671,12 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
// difference will be negligible.
concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum))
metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
+meal.maxExecutingRequests += concurrencyLimit
+var waitLimit int
+if qCfg := plState.pl.Spec.Limited.LimitResponse.Queuing; qCfg != nil {
+waitLimit = int(qCfg.Queues * qCfg.QueueLengthLimit)
+}
+meal.maxWaitingRequests += waitLimit
if plState.queues == nil {
klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.AssuredConcurrencyShares, meal.shareSum)

View File

@@ -243,6 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
if qll < 1 {
qll = 1
}
+if qCfg.DesiredNumQueues > 0 {
+qll *= qCfg.DesiredNumQueues
+}
qs.reqsGaugePair.RequestsWaiting.SetDenominator(float64(qll))
qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit))
qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyLimit))
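The multiplication added here means RequestsWaiting is normalized by the queue set's total waiting capacity rather than a single queue's length limit. A minimal Go sketch of the same computation, with illustrative inputs:

package main

import "fmt"

// waitingDenominator mirrors the logic above: clamp the per-queue length
// limit to at least 1, then scale by the number of queues when the queue
// set actually queues requests.
func waitingDenominator(queueLengthLimit, desiredNumQueues int) float64 {
	qll := queueLengthLimit
	if qll < 1 {
		qll = 1
	}
	if desiredNumQueues > 0 {
		qll *= desiredNumQueues
	}
	return float64(qll)
}

func main() {
	fmt.Println(waitingDenominator(50, 64)) // 3200 for a typical queuing config
	fmt.Println(waitingDenominator(0, 0))   // 1 in the degenerate, non-queuing case
}
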

View File

@@ -23,6 +23,7 @@ import (
"sync"
"time"
epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
@@ -128,21 +129,22 @@ var (
Name: "priority_level_request_utilization",
Help: "Observations, at the end of every nanosecond, of number of requests (as a fraction of the relevant limit) waiting or in any stage of execution (but only initial stage for WATCHes)",
// For executing: the denominator will be seats, so this metric will skew low.
-// FOr waiting: the denominiator is individual queue length limit, so this metric can go over 1. Issue #110160
-Buckets: []float64{0, 0.001, 0.0025, 0.005, 0.1, 0.25, 0.5, 0.75, 1, 10, 100},
+// For waiting: total queue capacity is generally quite generous, so this metric will skew low.
+Buckets: []float64{0, 0.001, 0.003, 0.01, 0.03, 0.1, 0.25, 0.5, 0.75, 1},
StabilityLevel: compbasemetrics.ALPHA,
},
LabelNamePhase, priorityLevel,
)
-// ReadWriteConcurrencyPairVec creates gauges of number of requests broken down by phase and mutating vs readonly
-ReadWriteConcurrencyGaugeVec = NewTimingRatioHistogramVec(
+// readWriteConcurrencyGaugeVec creates ratioed gauges of requests/limit broken down by phase and mutating vs readonly
+readWriteConcurrencyGaugeVec = NewTimingRatioHistogramVec(
&compbasemetrics.TimingHistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "read_vs_write_current_requests",
Help: "Observations, at the end of every nanosecond, of the number of requests (as a fraction of the relevant limit, if max-in-flight filter is being used) waiting or in regular stage of execution",
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99, 1, 3, 10, 30, 100, 300, 1000, 3000},
// TODO: something about the utilization vs count irregularity. Issue #109846
Help: "Observations, at the end of every nanosecond, of the number of requests (as a fraction of the relevant limit) waiting or in regular stage of execution",
// This metric will skew low for the same reason as the priority level metrics
// and also because APF has a combined limit for mutating and readonly.
Buckets: []float64{0, 0.001, 0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99, 1},
StabilityLevel: compbasemetrics.ALPHA,
},
LabelNamePhase, requestKind,
@@ -337,9 +339,39 @@ var (
}.
Append(PriorityLevelExecutionSeatsGaugeVec.metrics()...).
Append(PriorityLevelConcurrencyGaugeVec.metrics()...).
-Append(ReadWriteConcurrencyGaugeVec.metrics()...)
+Append(readWriteConcurrencyGaugeVec.metrics()...)
)
+type indexOnce struct {
+labelValues []string
+once sync.Once
+gauge RatioedGauge
+}
+func (io *indexOnce) getGauge() RatioedGauge {
+io.once.Do(func() {
+io.gauge = readWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, io.labelValues)
+})
+return io.gauge
+}
+var waitingReadonly = indexOnce{labelValues: []string{LabelValueWaiting, epmetrics.ReadOnlyKind}}
+var executingReadonly = indexOnce{labelValues: []string{LabelValueExecuting, epmetrics.ReadOnlyKind}}
+var waitingMutating = indexOnce{labelValues: []string{LabelValueWaiting, epmetrics.MutatingKind}}
+var executingMutating = indexOnce{labelValues: []string{LabelValueExecuting, epmetrics.MutatingKind}}
+// GetWaitingReadonlyConcurrency returns the gauge of number of readonly requests waiting / limit on those.
+var GetWaitingReadonlyConcurrency = waitingReadonly.getGauge
+// GetExecutingReadonlyConcurrency returns the gauge of number of executing readonly requests / limit on those.
+var GetExecutingReadonlyConcurrency = executingReadonly.getGauge
+// GetWaitingMutatingConcurrency returns the gauge of number of mutating requests waiting / limit on those.
+var GetWaitingMutatingConcurrency = waitingMutating.getGauge
+// GetExecutingMutatingConcurrency returns the gauge of number of executing mutating requests / limit on those.
+var GetExecutingMutatingConcurrency = executingMutating.getGauge
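To see how these accessors are meant to be consumed, here is a hedged sketch of a caller (the package path, Register, the Get* accessors, SetDenominator, and Add all appear elsewhere in this change; the surrounding program and the concrete numbers are invented):

package main

import (
	fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
)

func main() {
	// Register first, so getGauge latches onto the efficient, registered
	// vector member rather than the fallback path.
	fcmetrics.Register()

	executingReadonly := fcmetrics.GetExecutingReadonlyConcurrency()
	executingReadonly.SetDenominator(400) // e.g. the read-only in-flight limit

	// Drive the numerator with ordinary gauge operations.
	executingReadonly.Add(1)  // a read-only request starts executing
	executingReadonly.Add(-1) // ...and finishes
}
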
// AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel
func AddRequestsInQueues(ctx context.Context, priorityLevel, flowSchema string, delta int) {
apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))

View File

@@ -192,12 +192,14 @@ func (v *TimingRatioHistogramVec) NewForLabelValuesChecked(initialNumerator, ini
func (v *TimingRatioHistogramVec) NewForLabelValuesSafe(initialNumerator, initialDenominator float64, labelValues []string) RatioedGauge {
tro, err := v.NewForLabelValuesChecked(initialNumerator, initialDenominator, labelValues)
if err == nil {
klog.V(3).InfoS("TimingRatioHistogramVec.NewForLabelValuesSafe hit the efficient case", "fqName", v.FQName(), "labelValues", labelValues)
return tro
}
if !compbasemetrics.ErrIsNotRegistered(err) {
klog.ErrorS(err, "Failed to extract TimingRatioHistogramVec member, using noop instead", "vectorname", v.FQName(), "labelValues", labelValues)
return tro
}
klog.V(3).InfoS("TimingRatioHistogramVec.NewForLabelValuesSafe hit the inefficient case", "fqName", v.FQName(), "labelValues", labelValues)
// At this point we know v.NewForLabelValuesChecked(..) returns a permanent noop,
// which we precisely want to avoid using. Instead, make our own gauge that
// fetches the element on every Set.
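The fallback alluded to here, whose body lies beyond this hunk, amounts to a wrapper that repeats the vector lookup on every operation instead of holding a resolved member. A rough, illustrative Go sketch of that shape, using invented stand-in types rather than the real component-base API:

package main

import (
	"fmt"
	"sync"
)

// vec is a stand-in for a metric vector whose members are only cheap to
// use once the vector has been registered.
type vec struct {
	mu      sync.Mutex
	members map[string]float64
}

func (v *vec) set(labels string, val float64) {
	v.mu.Lock()
	defer v.mu.Unlock()
	if v.members == nil {
		v.members = map[string]float64{}
	}
	v.members[labels] = val
}

// lookupOnEverySet is the "inefficient case": instead of latching a member
// once, it re-resolves the label set on each call.
type lookupOnEverySet struct {
	v      *vec
	labels string
}

func (g lookupOnEverySet) Set(x float64) { g.v.set(g.labels, x) }

func main() {
	v := &vec{}
	g := lookupOnEverySet{v: v, labels: "executing/readOnly"}
	g.Set(0.25)
	fmt.Println(v.members["executing/readOnly"]) // 0.25
}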