Merge pull request #119385 from andrewsykim/current_inqueue_seats_metric

Add apiserver flowcontrol metric `current_inqueue_seats`

Kubernetes-commit: 338d68bbc2b5e69c18fed5eea11cc683e72dcbdf

commit 6b6cfe5d12
go.mod (+4 −4)

@@ -42,8 +42,8 @@ require (
 	gopkg.in/natefinch/lumberjack.v2 v2.2.1
 	gopkg.in/square/go-jose.v2 v2.6.0
 	k8s.io/api v0.0.0-20230810042731-2f6eec10c476
-	k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd
-	k8s.io/client-go v0.0.0-20230816000755-08b51e978593
+	k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61
+	k8s.io/client-go v0.0.0-20230816000758-856e847bb7cb
 	k8s.io/component-base v0.0.0-20230807211050-31137ad9f7f2
 	k8s.io/klog/v2 v2.100.1
 	k8s.io/kms v0.0.0-20230807211544-e54c40adc2b2

@@ -127,8 +127,8 @@ require (
 replace (
 	k8s.io/api => k8s.io/api v0.0.0-20230810042731-2f6eec10c476
-	k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd
-	k8s.io/client-go => k8s.io/client-go v0.0.0-20230816000755-08b51e978593
+	k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61
+	k8s.io/client-go => k8s.io/client-go v0.0.0-20230816000758-856e847bb7cb
 	k8s.io/component-base => k8s.io/component-base v0.0.0-20230807211050-31137ad9f7f2
 	k8s.io/kms => k8s.io/kms v0.0.0-20230807211544-e54c40adc2b2
 )
go.sum (+4 −4)

@@ -672,10 +672,10 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
 k8s.io/api v0.0.0-20230810042731-2f6eec10c476 h1:1LpPoqkYurARQ/TQ0U3DRAclyCkM7hMCMCUVymCR3jM=
 k8s.io/api v0.0.0-20230810042731-2f6eec10c476/go.mod h1:RFi7MZgMNqcWc0azfutkpPR/OdHWZjnTAwdFHTKjAUQ=
-k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd h1:x//MctFnLnU7WIEUEorfJtI5RkFptZADuNKPFxZBdbg=
-k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd/go.mod h1:X0xh/chESs2hP9koe+SdIAcXWcQ+RM5hy0ZynB+yEvw=
-k8s.io/client-go v0.0.0-20230816000755-08b51e978593 h1:n5l4FGgCtxJdblTd2W3FAUyPvNa0/W8Qg3rOMaOLPEg=
-k8s.io/client-go v0.0.0-20230816000755-08b51e978593/go.mod h1:xt/XQN6z9voSDnQ/uCIJ3a5n5sk2lhhnwzWGeuDhsvE=
+k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61 h1:PQAKasgkOolbHn5mI2MIMaytDF453zUaMrnG6nrqwy4=
+k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61/go.mod h1:X0xh/chESs2hP9koe+SdIAcXWcQ+RM5hy0ZynB+yEvw=
+k8s.io/client-go v0.0.0-20230816000758-856e847bb7cb h1:QLY5cHaZwawHP6394w6miAQAaJ2fwlA/TfRHXmG4U3M=
+k8s.io/client-go v0.0.0-20230816000758-856e847bb7cb/go.mod h1:xt/XQN6z9voSDnQ/uCIJ3a5n5sk2lhhnwzWGeuDhsvE=
 k8s.io/component-base v0.0.0-20230807211050-31137ad9f7f2 h1:bgkLpsQhIRm8Rd6h9V/n50sN63k6sEzX+Q8nCpZCCX4=
 k8s.io/component-base v0.0.0-20230807211050-31137ad9f7f2/go.mod h1:wjy+fowSTnR9NfN23CZuwDq+yF+viZTN5nbGbXcOYBM=
 k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg=
pkg/util/flowcontrol/fairqueuing/queueset/queueset.go

@@ -448,6 +448,7 @@ func (req *request) wait() (bool, bool) {
 		qs.totRequestsCancelled++
 		metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
 		metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
+		metrics.AddSeatsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -req.MaxSeats())
 		req.NoteQueued(false)
 		qs.reqsGaugePair.RequestsWaiting.Add(-1)
 		qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
@@ -652,6 +653,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f
 		disqueueSeats += req.MaxSeats()
 		req.NoteQueued(false)
 		metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
+		metrics.AddSeatsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -req.MaxSeats())
 	}
 	// we need to check if the next request has timed out.
 	return true
@@ -702,6 +704,7 @@ func (qs *queueSet) enqueueToBoundLocked(request *request) {
 	qs.totRequestsWaiting++
 	qs.totSeatsWaiting += request.MaxSeats()
 	metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
+	metrics.AddSeatsInQueues(request.ctx, qs.qCfg.Name, request.fsName, request.MaxSeats())
 	request.NoteQueued(true)
 	qs.reqsGaugePair.RequestsWaiting.Add(1)
 	qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
@@ -760,6 +763,7 @@ func (qs *queueSet) dispatchLocked() bool {
 	qs.totRequestsWaiting--
 	qs.totSeatsWaiting -= request.MaxSeats()
 	metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
+	metrics.AddSeatsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -request.MaxSeats())
 	request.NoteQueued(false)
 	qs.reqsGaugePair.RequestsWaiting.Add(-1)
 	defer qs.boundNextDispatchLocked(queue)
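Note: taken together, the four queueset.go hunks maintain a simple invariant. enqueueToBoundLocked is the only path that puts a request into a queue, and cancellation, timeout removal, and dispatch are the only exits, so every path that moves apiserver_flowcontrol_current_inqueue_requests by ±1 now moves apiserver_flowcontrol_current_inqueue_seats by ±MaxSeats(). The seat gauge therefore drains to zero exactly when the queues are empty. A toy model of that bookkeeping (illustrative only, not code from this commit):

package main

import "fmt"

// gauges mirrors the paired updates made by the four hunks above:
// enqueueToBoundLocked adds, while cancel/timeout/dispatch subtract.
type gauges struct{ inqueueReqs, inqueueSeats int }

func (g *gauges) enqueue(maxSeats int) { g.inqueueReqs++; g.inqueueSeats += maxSeats }
func (g *gauges) dequeue(maxSeats int) { g.inqueueReqs--; g.inqueueSeats -= maxSeats }

func main() {
	var g gauges
	g.enqueue(3) // e.g. a wide LIST estimated at 3 seats
	g.enqueue(1) // a 1-seat GET
	g.dequeue(3) // dispatch, cancel, and timeout all subtract the same width
	fmt.Println(g.inqueueReqs, g.inqueueSeats) // 1 1
}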
pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go

@@ -209,13 +209,13 @@ func (us uniformScenario) exercise(t *testing.T) {
 type uniformScenarioState struct {
 	t *testing.T
 	uniformScenario
 	startTime time.Time
 	doSplit bool
 	execSeatsIntegrators []fq.Integrator
 	seatDemandIntegratorCheck fq.Integrator
 	failedCount uint64
-	expectedInqueue, expectedExecuting, expectedConcurrencyInUse string
+	expectedInqueueReqs, expectedInqueueSeats, expectedExecuting, expectedConcurrencyInUse string
 	executions, rejects []int32
 }

 func (uss *uniformScenarioState) exercise() {
@@ -226,7 +226,8 @@ func (uss *uniformScenarioState) exercise() {
 	for i, uc := range uss.clients {
 		uss.execSeatsIntegrators[i] = fq.NewNamedIntegrator(uss.clk, fmt.Sprintf("%s client %d execSeats", uss.name, i))
 		fsName := fmt.Sprintf("client%d", i)
-		uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
+		uss.expectedInqueueReqs = uss.expectedInqueueReqs + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
+		uss.expectedInqueueSeats = uss.expectedInqueueSeats + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_seats{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
 		for j := 0; j < uc.nThreads; j++ {
 			ust := uniformScenarioThread{
 				uss: uss,
@@ -412,13 +413,24 @@ func (uss *uniformScenarioState) finalReview() {
 		e := `
 # HELP apiserver_flowcontrol_current_inqueue_requests [BETA] Number of requests currently pending in queues of the API Priority and Fairness subsystem
 # TYPE apiserver_flowcontrol_current_inqueue_requests gauge
-` + uss.expectedInqueue
+` + uss.expectedInqueueReqs
 		err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_requests")
 		if err != nil {
 			uss.t.Error(err)
 		} else {
 			uss.t.Log("Success with" + e)
 		}
+		e = `
+# HELP apiserver_flowcontrol_current_inqueue_seats [ALPHA] Number of seats currently pending in queues of the API Priority and Fairness subsystem
+# TYPE apiserver_flowcontrol_current_inqueue_seats gauge
+` + uss.expectedInqueueSeats
+		err = metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_seats")
+		if err != nil {
+			uss.t.Error(err)
+		} else {
+			uss.t.Log("Success with" + e)
+		}
 	}
 	expectedRejects := ""
 	for i := range uss.clients {
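Note: the test asserts against Prometheus text-format exposition. `metrics.GatherAndCompare` here is the flowcontrol metrics package's own wrapper; a standalone sketch of the same check, assuming the public k8s.io/component-base/metrics/testutil API (the expected body below is a made-up example, not taken from this commit):

package metricscheck

import (
	"strings"

	"k8s.io/component-base/metrics/legacyregistry"
	"k8s.io/component-base/metrics/testutil"
)

// checkInqueueSeats gathers the default registry and diffs it against the
// expected exposition, restricted to the one named metric.
func checkInqueueSeats() error {
	expected := `
# HELP apiserver_flowcontrol_current_inqueue_seats [ALPHA] Number of seats currently pending in queues of the API Priority and Fairness subsystem
# TYPE apiserver_flowcontrol_current_inqueue_seats gauge
apiserver_flowcontrol_current_inqueue_seats{flow_schema="client0",priority_level="uniform"} 0
`
	return testutil.GatherAndCompare(legacyregistry.DefaultGatherer,
		strings.NewReader(expected), "apiserver_flowcontrol_current_inqueue_seats")
}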
pkg/util/flowcontrol/metrics/metrics.go

@@ -210,6 +210,16 @@ var (
 		},
 		[]string{priorityLevel, flowSchema},
 	)
+	apiserverCurrentInqueueSeats = compbasemetrics.NewGaugeVec(
+		&compbasemetrics.GaugeOpts{
+			Namespace:      namespace,
+			Subsystem:      subsystem,
+			Name:           "current_inqueue_seats",
+			Help:           "Number of seats currently pending in queues of the API Priority and Fairness subsystem",
+			StabilityLevel: compbasemetrics.ALPHA,
+		},
+		[]string{priorityLevel, flowSchema},
+	)
 	apiserverRequestQueueLength = compbasemetrics.NewHistogramVec(
 		&compbasemetrics.HistogramOpts{
 			Namespace: namespace,
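Note: for readers unfamiliar with the component-base metrics framework, a minimal self-contained sketch of the define/register/update cycle used above. The "demo" namespace/subsystem and the label values are placeholders; in the real file, registration happens through the package's metrics list (next hunk) rather than a direct MustRegister call:

package main

import (
	compbasemetrics "k8s.io/component-base/metrics"
	"k8s.io/component-base/metrics/legacyregistry"
)

// inqueueSeats follows the same GaugeVec pattern as the hunk above.
var inqueueSeats = compbasemetrics.NewGaugeVec(
	&compbasemetrics.GaugeOpts{
		Namespace:      "demo",
		Subsystem:      "flowcontrol",
		Name:           "current_inqueue_seats",
		Help:           "Number of seats currently pending in queues",
		StabilityLevel: compbasemetrics.ALPHA,
	},
	[]string{"priority_level", "flow_schema"},
)

func main() {
	legacyregistry.MustRegister(inqueueSeats)
	// Gauges move by deltas: +MaxSeats on enqueue, -MaxSeats on dequeue.
	inqueueSeats.WithLabelValues("global-default", "service-accounts").Add(3)
	inqueueSeats.WithLabelValues("global-default", "service-accounts").Add(-3)
}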
@@ -455,6 +465,7 @@ var (
 		apiserverNextSBounds,
 		apiserverNextDiscountedSBounds,
 		apiserverCurrentInqueueRequests,
+		apiserverCurrentInqueueSeats,
 		apiserverRequestQueueLength,
 		apiserverRequestConcurrencyLimit,
 		apiserverRequestConcurrencyInUse,
@@ -518,6 +529,11 @@ func AddRequestsInQueues(ctx context.Context, priorityLevel, flowSchema string,
 	apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
 }

+// AddSeatsInQueues adds the given delta to the gauge of the # of seats in the queues of the specified flowSchema and priorityLevel
+func AddSeatsInQueues(ctx context.Context, priorityLevel, flowSchema string, delta int) {
+	apiserverCurrentInqueueSeats.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
+}
+
 // AddRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel
 func AddRequestsExecuting(ctx context.Context, priorityLevel, flowSchema string, delta int) {
 	apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
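Note: a sketch of how a caller consumes the new helper; the signature is exactly as added above, but the wrapper function, its names, and the label values are illustrative. Once scraped, a query along the lines of sum by (priority_level) (apiserver_flowcontrol_current_inqueue_seats) should surface queued seat demand per priority level.

package flowdemo

import (
	"context"

	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
)

// noteEnqueued makes the paired gauge updates, as enqueueToBoundLocked
// does in the queueset.go hunks earlier in this commit.
func noteEnqueued(ctx context.Context, plName, fsName string, maxSeats int) {
	metrics.AddRequestsInQueues(ctx, plName, fsName, 1)
	metrics.AddSeatsInQueues(ctx, plName, fsName, maxSeats)
}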