diff --git a/go.mod b/go.mod index 5de3ec0f2..22456b5a5 100644 --- a/go.mod +++ b/go.mod @@ -42,8 +42,8 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/square/go-jose.v2 v2.6.0 k8s.io/api v0.0.0-20230810042731-2f6eec10c476 - k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd - k8s.io/client-go v0.0.0-20230816000755-08b51e978593 + k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61 + k8s.io/client-go v0.0.0-20230816000758-856e847bb7cb k8s.io/component-base v0.0.0-20230807211050-31137ad9f7f2 k8s.io/klog/v2 v2.100.1 k8s.io/kms v0.0.0-20230807211544-e54c40adc2b2 @@ -127,8 +127,8 @@ require ( replace ( k8s.io/api => k8s.io/api v0.0.0-20230810042731-2f6eec10c476 - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd - k8s.io/client-go => k8s.io/client-go v0.0.0-20230816000755-08b51e978593 + k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61 + k8s.io/client-go => k8s.io/client-go v0.0.0-20230816000758-856e847bb7cb k8s.io/component-base => k8s.io/component-base v0.0.0-20230807211050-31137ad9f7f2 k8s.io/kms => k8s.io/kms v0.0.0-20230807211544-e54c40adc2b2 ) diff --git a/go.sum b/go.sum index d4ac72e63..f559af762 100644 --- a/go.sum +++ b/go.sum @@ -672,10 +672,10 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= k8s.io/api v0.0.0-20230810042731-2f6eec10c476 h1:1LpPoqkYurARQ/TQ0U3DRAclyCkM7hMCMCUVymCR3jM= k8s.io/api v0.0.0-20230810042731-2f6eec10c476/go.mod h1:RFi7MZgMNqcWc0azfutkpPR/OdHWZjnTAwdFHTKjAUQ= -k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd h1:x//MctFnLnU7WIEUEorfJtI5RkFptZADuNKPFxZBdbg= -k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd/go.mod h1:X0xh/chESs2hP9koe+SdIAcXWcQ+RM5hy0ZynB+yEvw= -k8s.io/client-go v0.0.0-20230816000755-08b51e978593 h1:n5l4FGgCtxJdblTd2W3FAUyPvNa0/W8Qg3rOMaOLPEg= -k8s.io/client-go v0.0.0-20230816000755-08b51e978593/go.mod h1:xt/XQN6z9voSDnQ/uCIJ3a5n5sk2lhhnwzWGeuDhsvE= +k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61 h1:PQAKasgkOolbHn5mI2MIMaytDF453zUaMrnG6nrqwy4= +k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61/go.mod h1:X0xh/chESs2hP9koe+SdIAcXWcQ+RM5hy0ZynB+yEvw= +k8s.io/client-go v0.0.0-20230816000758-856e847bb7cb h1:QLY5cHaZwawHP6394w6miAQAaJ2fwlA/TfRHXmG4U3M= +k8s.io/client-go v0.0.0-20230816000758-856e847bb7cb/go.mod h1:xt/XQN6z9voSDnQ/uCIJ3a5n5sk2lhhnwzWGeuDhsvE= k8s.io/component-base v0.0.0-20230807211050-31137ad9f7f2 h1:bgkLpsQhIRm8Rd6h9V/n50sN63k6sEzX+Q8nCpZCCX4= k8s.io/component-base v0.0.0-20230807211050-31137ad9f7f2/go.mod h1:wjy+fowSTnR9NfN23CZuwDq+yF+viZTN5nbGbXcOYBM= k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index aa54a9ccf..d955be765 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -448,6 +448,7 @@ func (req *request) wait() (bool, bool) { qs.totRequestsCancelled++ metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled") metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) + metrics.AddSeatsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -req.MaxSeats()) req.NoteQueued(false) qs.reqsGaugePair.RequestsWaiting.Add(-1) qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting)) @@ -652,6 +653,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f disqueueSeats += req.MaxSeats() req.NoteQueued(false) metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) + metrics.AddSeatsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -req.MaxSeats()) } // we need to check if the next request has timed out. return true @@ -702,6 +704,7 @@ func (qs *queueSet) enqueueToBoundLocked(request *request) { qs.totRequestsWaiting++ qs.totSeatsWaiting += request.MaxSeats() metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1) + metrics.AddSeatsInQueues(request.ctx, qs.qCfg.Name, request.fsName, request.MaxSeats()) request.NoteQueued(true) qs.reqsGaugePair.RequestsWaiting.Add(1) qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting)) @@ -760,6 +763,7 @@ func (qs *queueSet) dispatchLocked() bool { qs.totRequestsWaiting-- qs.totSeatsWaiting -= request.MaxSeats() metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) + metrics.AddSeatsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -request.MaxSeats()) request.NoteQueued(false) qs.reqsGaugePair.RequestsWaiting.Add(-1) defer qs.boundNextDispatchLocked(queue) diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 89ed063d5..1d3d5e5b2 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -209,13 +209,13 @@ func (us uniformScenario) exercise(t *testing.T) { type uniformScenarioState struct { t *testing.T uniformScenario - startTime time.Time - doSplit bool - execSeatsIntegrators []fq.Integrator - seatDemandIntegratorCheck fq.Integrator - failedCount uint64 - expectedInqueue, expectedExecuting, expectedConcurrencyInUse string - executions, rejects []int32 + startTime time.Time + doSplit bool + execSeatsIntegrators []fq.Integrator + seatDemandIntegratorCheck fq.Integrator + failedCount uint64 + expectedInqueueReqs, expectedInqueueSeats, expectedExecuting, expectedConcurrencyInUse string + executions, rejects []int32 } func (uss *uniformScenarioState) exercise() { @@ -226,7 +226,8 @@ func (uss *uniformScenarioState) exercise() { for i, uc := range uss.clients { uss.execSeatsIntegrators[i] = fq.NewNamedIntegrator(uss.clk, fmt.Sprintf("%s client %d execSeats", uss.name, i)) fsName := fmt.Sprintf("client%d", i) - uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n") + uss.expectedInqueueReqs = uss.expectedInqueueReqs + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n") + uss.expectedInqueueSeats = uss.expectedInqueueSeats + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_seats{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n") for j := 0; j < uc.nThreads; j++ { ust := uniformScenarioThread{ uss: uss, @@ -412,13 +413,24 @@ func (uss *uniformScenarioState) finalReview() { e := ` # HELP apiserver_flowcontrol_current_inqueue_requests [BETA] Number of requests currently pending in queues of the API Priority and Fairness subsystem # TYPE apiserver_flowcontrol_current_inqueue_requests gauge -` + uss.expectedInqueue +` + uss.expectedInqueueReqs err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_requests") if err != nil { uss.t.Error(err) } else { uss.t.Log("Success with" + e) } + + e = ` + # HELP apiserver_flowcontrol_current_inqueue_seats [ALPHA] Number of seats currently pending in queues of the API Priority and Fairness subsystem + # TYPE apiserver_flowcontrol_current_inqueue_seats gauge +` + uss.expectedInqueueSeats + err = metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_seats") + if err != nil { + uss.t.Error(err) + } else { + uss.t.Log("Success with" + e) + } } expectedRejects := "" for i := range uss.clients { diff --git a/pkg/util/flowcontrol/metrics/metrics.go b/pkg/util/flowcontrol/metrics/metrics.go index 54af4415c..9fe7b15a0 100644 --- a/pkg/util/flowcontrol/metrics/metrics.go +++ b/pkg/util/flowcontrol/metrics/metrics.go @@ -210,6 +210,16 @@ var ( }, []string{priorityLevel, flowSchema}, ) + apiserverCurrentInqueueSeats = compbasemetrics.NewGaugeVec( + &compbasemetrics.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "current_inqueue_seats", + Help: "Number of seats currently pending in queues of the API Priority and Fairness subsystem", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{priorityLevel, flowSchema}, + ) apiserverRequestQueueLength = compbasemetrics.NewHistogramVec( &compbasemetrics.HistogramOpts{ Namespace: namespace, @@ -455,6 +465,7 @@ var ( apiserverNextSBounds, apiserverNextDiscountedSBounds, apiserverCurrentInqueueRequests, + apiserverCurrentInqueueSeats, apiserverRequestQueueLength, apiserverRequestConcurrencyLimit, apiserverRequestConcurrencyInUse, @@ -518,6 +529,11 @@ func AddRequestsInQueues(ctx context.Context, priorityLevel, flowSchema string, apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) } +// AddSeatsInQueues adds the given delta to the gauge of the # of seats in the queues of the specified flowSchema and priorityLevel +func AddSeatsInQueues(ctx context.Context, priorityLevel, flowSchema string, delta int) { + apiserverCurrentInqueueSeats.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) +} + // AddRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel func AddRequestsExecuting(ctx context.Context, priorityLevel, flowSchema string, delta int) { apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))