apiserver: exclude APF queue wait time from SLO latency metrics (#116420)

* apiserver: add latency tracker for priority & fairness queue wait time

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>

* apiserver: exclude priority & fairness wait times from SLO/SLI latency metrics

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>

* apiserver: update TestLatencyTrackersFrom to check latency from PriorityAndFairnessTracker

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>

* flowcontrol: add helper function observeQueueWaitTime to consolidate metric and latency tracker calls

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>

* flowcontrol: replace time.Now() / time.Since() with clock.Now() / clock.Since() for better testability

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>

* flowcontrol: add unit test TestQueueWaitTimeLatencyTracker to validate queue wait times recorded by latency tracker

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>

---------

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>

Kubernetes-commit: ee18f602523e11a80823a659bed8f70f98a12914
Author: Andrew Sy Kim 2023-03-14 05:15:20 -04:00
Committed by: Kubernetes Publisher
parent d3d464578a
commit 53a2449e3c
8 changed files with 210 additions and 19 deletions

go.mod

@@ -42,9 +42,9 @@ require (
 	google.golang.org/protobuf v1.28.1
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0
 	gopkg.in/square/go-jose.v2 v2.6.0
-	k8s.io/api v0.0.0-20230315055824-5ab285d38c19
+	k8s.io/api v0.0.0-20230314091508-112a65bae227
 	k8s.io/apimachinery v0.0.0-20230314010357-128166500c57
-	k8s.io/client-go v0.0.0-20230315061805-362ffe4fcd37
+	k8s.io/client-go v0.0.0-20230315061813-3cafc13f5d42
 	k8s.io/component-base v0.0.0-20230313212246-ce16dede9c0e
 	k8s.io/klog/v2 v2.90.1
 	k8s.io/kms v0.0.0-20230313212457-12714b59d299
@@ -124,9 +124,9 @@ require (
 )
 
 replace (
-	k8s.io/api => k8s.io/api v0.0.0-20230315055824-5ab285d38c19
+	k8s.io/api => k8s.io/api v0.0.0-20230314091508-112a65bae227
 	k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230314010357-128166500c57
-	k8s.io/client-go => k8s.io/client-go v0.0.0-20230315061805-362ffe4fcd37
+	k8s.io/client-go => k8s.io/client-go v0.0.0-20230315061813-3cafc13f5d42
 	k8s.io/component-base => k8s.io/component-base v0.0.0-20230313212246-ce16dede9c0e
 	k8s.io/kms => k8s.io/kms v0.0.0-20230313212457-12714b59d299
 )

go.sum

@@ -874,12 +874,12 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
 honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
 honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
-k8s.io/api v0.0.0-20230315055824-5ab285d38c19 h1:+y54q1Icmf7JV4NpvSmWdb7qUqfJut+VMYn6ieej5a8=
-k8s.io/api v0.0.0-20230315055824-5ab285d38c19/go.mod h1:YsFNxBfPYQZAIBg0XvL94rxDDVZvQQQUlo4KbwEEqNU=
+k8s.io/api v0.0.0-20230314091508-112a65bae227 h1:Ak4YrHI4101ZMfx2hkXnp//d5r0nKXA8RkNaHCBJ7MA=
+k8s.io/api v0.0.0-20230314091508-112a65bae227/go.mod h1:YsFNxBfPYQZAIBg0XvL94rxDDVZvQQQUlo4KbwEEqNU=
 k8s.io/apimachinery v0.0.0-20230314010357-128166500c57 h1:Vr1geeI+at1NNCWyTN70NtPSNcveZ+fAcbZivzwHknM=
 k8s.io/apimachinery v0.0.0-20230314010357-128166500c57/go.mod h1:1AlvkfXatlv5Kq9dCZg3Ksdu/DyrZ31Q0CncRqQ8Q9I=
-k8s.io/client-go v0.0.0-20230315061805-362ffe4fcd37 h1:P92qDx0WAKPH7Bnaxlj2m/gQR3OehZ6oF0vfUTxVc90=
-k8s.io/client-go v0.0.0-20230315061805-362ffe4fcd37/go.mod h1:S0HXdZfM0Ydi8yxoINLXkc3OT+Zx3YntXTvMq5xdNdo=
+k8s.io/client-go v0.0.0-20230315061813-3cafc13f5d42 h1:b7qY4Bq8gHunC2mYT/RAtMWDgMmkMb16oQuktJkHpHI=
+k8s.io/client-go v0.0.0-20230315061813-3cafc13f5d42/go.mod h1:oWqDJxiXYcSV7S8woQ4H5epopeK85SJyOu5lJsMndcU=
 k8s.io/component-base v0.0.0-20230313212246-ce16dede9c0e h1:/mftAl/78q8dPZU8YkshNzt1XpbEJTGsxYZP/WGI4og=
 k8s.io/component-base v0.0.0-20230313212246-ce16dede9c0e/go.mod h1:MQKfc/tXtS50042g1VxMb2W2E8PCt97xO8RsLTw3AeI=
 k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=


@@ -106,7 +106,7 @@ var (
 		&compbasemetrics.HistogramOpts{
 			Subsystem: APIServerComponent,
 			Name:      "request_slo_duration_seconds",
-			Help:      "Response latency distribution (not counting webhook duration) in seconds for each verb, group, version, resource, subresource, scope and component.",
+			Help:      "Response latency distribution (not counting webhook duration and priority & fairness queue wait times) in seconds for each verb, group, version, resource, subresource, scope and component.",
 			// This metric is supplementary to the requestLatencies metric.
 			// It measures request duration excluding webhooks as they are mostly
 			// dependant on user configuration.
@@ -121,7 +121,7 @@ var (
 		&compbasemetrics.HistogramOpts{
 			Subsystem: APIServerComponent,
 			Name:      "request_sli_duration_seconds",
-			Help:      "Response latency distribution (not counting webhook duration) in seconds for each verb, group, version, resource, subresource, scope and component.",
+			Help:      "Response latency distribution (not counting webhook duration and priority & fairness queue wait times) in seconds for each verb, group, version, resource, subresource, scope and component.",
 			// This metric is supplementary to the requestLatencies metric.
 			// It measures request duration excluding webhooks as they are mostly
 			// dependant on user configuration.
@@ -544,7 +544,7 @@ func MonitorRequest(req *http.Request, verb, group, version, resource, subresour
 	fieldValidationRequestLatencies.WithContext(req.Context()).WithLabelValues(fieldValidation)
 	if wd, ok := request.LatencyTrackersFrom(req.Context()); ok {
-		sliLatency := elapsedSeconds - (wd.MutatingWebhookTracker.GetLatency() + wd.ValidatingWebhookTracker.GetLatency()).Seconds()
+		sliLatency := elapsedSeconds - (wd.MutatingWebhookTracker.GetLatency() + wd.ValidatingWebhookTracker.GetLatency() + wd.APFQueueWaitTracker.GetLatency()).Seconds()
 		requestSloLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(sliLatency)
 		requestSliLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(sliLatency)
 	}
 
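For intuition, the adjusted observation can be sketched as a small, self-contained Go program; the numbers and variable names below are illustrative only, not taken from the apiserver:

package main

import (
	"fmt"
	"time"
)

// Illustrative numbers only: a 900ms request that spent 200ms in webhooks
// and 300ms queued by priority & fairness yields a 400ms SLI observation.
func main() {
	elapsed := 900 * time.Millisecond
	webhooks := 200 * time.Millisecond     // mutating + validating webhook time
	apfQueueWait := 300 * time.Millisecond // APF queue wait, newly excluded

	sli := (elapsed - webhooks - apfQueueWait).Seconds()
	fmt.Printf("request_sli_duration_seconds observation: %.3fs\n", sli)
}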


@@ -116,6 +116,10 @@ type LatencyTrackers struct {
 	// Validate webhooks are done in parallel, so max function is used.
 	ValidatingWebhookTracker DurationTracker
 
+	// APFQueueWaitTracker tracks the latency incurred by queue wait times
+	// from priority & fairness.
+	APFQueueWaitTracker DurationTracker
+
 	// StorageTracker tracks the latency incurred inside the storage layer,
 	// it accounts for the time it takes to send data to the underlying
 	// storage layer (etcd) and get the complete response back.
@@ -168,6 +172,7 @@ func WithLatencyTrackersAndCustomClock(parent context.Context, c clock.Clock) co
 	return WithValue(parent, latencyTrackersKey, &LatencyTrackers{
 		MutatingWebhookTracker:   newSumLatencyTracker(c),
 		ValidatingWebhookTracker: newMaxLatencyTracker(c),
+		APFQueueWaitTracker:      newMaxLatencyTracker(c),
 		StorageTracker:           newSumLatencyTracker(c),
 		TransformTracker:         newSumLatencyTracker(c),
 		SerializationTracker:     newSumLatencyTracker(c),
@@ -230,6 +235,14 @@ func TrackResponseWriteLatency(ctx context.Context, d time.Duration) {
 	}
 }
 
+// TrackAPFQueueWaitLatency is used to track latency incurred
+// by priority and fairness queues.
+func TrackAPFQueueWaitLatency(ctx context.Context, d time.Duration) {
+	if tracker, ok := LatencyTrackersFrom(ctx); ok {
+		tracker.APFQueueWaitTracker.TrackDuration(d)
+	}
+}
+
 // AuditAnnotationsFromLatencyTrackers will inspect each latency tracker
 // associated with the request context and return a set of audit
 // annotations that can be added to the API audit entry.
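The new plumbing is small: a context carries the trackers, the APF filter records the wait, and MonitorRequest reads it back. A minimal usage sketch, assuming the k8s.io/apiserver module is on the import path; in production the caller is the APF filter's observeQueueWaitTime helper shown further below:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apiserver/pkg/endpoints/request"
)

func main() {
	// Attach latency trackers to a context, as the apiserver does per request.
	ctx := request.WithLatencyTrackers(context.Background())

	// Record a queue wait; in the apiserver this is called from the APF filter.
	request.TrackAPFQueueWaitLatency(ctx, 250*time.Millisecond)

	// Read the tracked value back, as MonitorRequest does.
	if lt, ok := request.LatencyTrackersFrom(ctx); ok {
		fmt.Println(lt.APFQueueWaitTracker.GetLatency()) // 250ms
	}
}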


@@ -35,7 +35,7 @@ func TestLatencyTrackersFrom(t *testing.T) {
 		SumDurations: 1600,
 		MaxDuration:  400,
 	}
-	t.Run("TestWebhookDurationFrom", func(t *testing.T) {
+	t.Run("TestLatencyTrackersFrom", func(t *testing.T) {
 		parent := context.TODO()
 		_, ok := LatencyTrackersFrom(parent)
 		if ok {
@@ -48,13 +48,14 @@ func TestLatencyTrackersFrom(t *testing.T) {
 		if !ok {
 			t.Error("expected LatencyTrackersFrom to be initialized")
 		}
-		if wd.MutatingWebhookTracker.GetLatency() != 0 || wd.ValidatingWebhookTracker.GetLatency() != 0 {
+		if wd.MutatingWebhookTracker.GetLatency() != 0 || wd.ValidatingWebhookTracker.GetLatency() != 0 || wd.APFQueueWaitTracker.GetLatency() != 0 {
 			t.Error("expected values to be initialized to 0")
 		}
 
 		for _, d := range tc.Durations {
 			wd.MutatingWebhookTracker.Track(func() { clk.Step(d) })
 			wd.ValidatingWebhookTracker.Track(func() { clk.Step(d) })
+			wd.APFQueueWaitTracker.Track(func() { clk.Step(d) })
 		}
 
 		wd, ok = LatencyTrackersFrom(ctx)
@@ -69,5 +70,9 @@ func TestLatencyTrackersFrom(t *testing.T) {
 		if wd.ValidatingWebhookTracker.GetLatency() != tc.MaxDuration {
 			t.Errorf("expected validate duration: %q, but got: %q", tc.MaxDuration, wd.ValidatingWebhookTracker.GetLatency())
 		}
+
+		if wd.APFQueueWaitTracker.GetLatency() != tc.MaxDuration {
+			t.Errorf("expected priority & fairness duration: %q, but got: %q", tc.MaxDuration, wd.APFQueueWaitTracker.GetLatency())
+		}
 	})
 }
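Note that APFQueueWaitTracker is built with newMaxLatencyTracker, so repeated Track calls record the maximum rather than the sum; that is why the test above expects MaxDuration rather than SumDurations. A short sketch of that behavior under a fake clock from k8s.io/utils/clock/testing:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apiserver/pkg/endpoints/request"
	testingclock "k8s.io/utils/clock/testing"
)

func main() {
	clk := testingclock.NewFakeClock(time.Now())
	ctx := request.WithLatencyTrackersAndCustomClock(context.Background(), clk)
	lt, _ := request.LatencyTrackersFrom(ctx)

	// Each Track call measures how far the fake clock is stepped inside it.
	lt.APFQueueWaitTracker.Track(func() { clk.Step(300 * time.Millisecond) })
	lt.APFQueueWaitTracker.Track(func() { clk.Step(100 * time.Millisecond) })

	// Max semantics: prints 300ms, not 400ms.
	fmt.Println(lt.APFQueueWaitTracker.GetLatency())
}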


@@ -1026,7 +1026,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
 	noteFn(selectedFlowSchema, plState.pl, flowDistinguisher)
 	workEstimate := workEstimator()
 
-	startWaitingTime = time.Now()
+	startWaitingTime = cfgCtlr.clock.Now()
 	klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
 	req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
 	if idle {


@@ -21,6 +21,7 @@ import (
 	"strconv"
 	"time"
 
+	endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
 	"k8s.io/apiserver/pkg/server/httplog"
 	"k8s.io/apiserver/pkg/server/mux"
 	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
@@ -161,7 +162,7 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
 	queued := startWaitingTime != time.Time{}
 	if req == nil {
 		if queued {
-			metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
+			observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime))
 		}
 		klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, reject", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt)
 		return
@@ -178,21 +179,26 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
 	}()
 	idle = req.Finish(func() {
 		if queued {
-			metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
+			observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime))
 		}
 		metrics.AddDispatch(ctx, pl.Name, fs.Name)
 		fqs.OnRequestDispatched(req)
 		executed = true
-		startExecutionTime := time.Now()
+		startExecutionTime := cfgCtlr.clock.Now()
 		defer func() {
-			executionTime := time.Since(startExecutionTime)
+			executionTime := cfgCtlr.clock.Since(startExecutionTime)
 			httplog.AddKeyValue(ctx, "apf_execution_time", executionTime)
 			metrics.ObserveExecutionDuration(ctx, pl.Name, fs.Name, executionTime)
 		}()
 		execFn()
 	})
 	if queued && !executed {
-		metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
+		observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime))
 	}
 	panicking = false
 }
+
+func observeQueueWaitTime(ctx context.Context, priorityLevelName, flowSchemaName, execute string, waitTime time.Duration) {
+	metrics.ObserveWaitingDuration(ctx, priorityLevelName, flowSchemaName, execute, waitTime)
+	endpointsrequest.TrackAPFQueueWaitLatency(ctx, waitTime)
+}
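The observeQueueWaitTime helper updates the flowcontrol wait-duration metric and the new latency tracker from a single call site, so the two cannot drift apart. And since a given request waits in at most one APF queue, max semantics are enough for the tracker. The following is a hedged, illustrative stand-in for what a max-style DurationTracker records; the real newMaxLatencyTracker lives alongside the other trackers in the apiserver and this sketch is not its implementation:

package main

import (
	"fmt"
	"sync"
	"time"
)

// maxTracker is an illustrative stand-in: it keeps only the largest
// duration it has been handed, which is the behavior the APF queue
// wait tracker relies on.
type maxTracker struct {
	mu  sync.Mutex
	max time.Duration
}

func (t *maxTracker) TrackDuration(d time.Duration) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if d > t.max {
		t.max = d
	}
}

func (t *maxTracker) GetLatency() time.Duration {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.max
}

func main() {
	t := &maxTracker{}
	t.TrackDuration(300 * time.Millisecond)
	t.TrackDuration(100 * time.Millisecond)
	fmt.Println(t.GetLatency()) // 300ms: a request waits in at most one APF queue
}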


@@ -0,0 +1,167 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flowcontrol

import (
	"context"
	"testing"
	"time"

	flowcontrol "k8s.io/api/flowcontrol/v1beta3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apiserver/pkg/endpoints/request"
	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
	fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock"
	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
	"k8s.io/client-go/informers"
	clientsetfake "k8s.io/client-go/kubernetes/fake"
)

// TestQueueWaitTimeLatencyTracker tests the queue wait times recorded by the P&F latency tracker
// when calling Handle.
func TestQueueWaitTimeLatencyTracker(t *testing.T) {
	metrics.Register()

	var fsObj *flowcontrol.FlowSchema
	var plcObj *flowcontrol.PriorityLevelConfiguration
	cfgObjs := []runtime.Object{}

	plName := "test-pl"
	username := "test-user"
	fsName := "test-fs"
	lendable := int32(0)
	borrowingLimit := int32(0)
	fsObj = &flowcontrol.FlowSchema{
		ObjectMeta: metav1.ObjectMeta{
			Name: fsName,
		},
		Spec: flowcontrol.FlowSchemaSpec{
			MatchingPrecedence: 100,
			PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
				Name: plName,
			},
			DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
				Type: flowcontrol.FlowDistinguisherMethodByUserType,
			},
			Rules: []flowcontrol.PolicyRulesWithSubjects{{
				Subjects: []flowcontrol.Subject{{
					Kind: flowcontrol.SubjectKindUser,
					User: &flowcontrol.UserSubject{Name: username},
				}},
				NonResourceRules: []flowcontrol.NonResourcePolicyRule{{
					Verbs:           []string{"*"},
					NonResourceURLs: []string{"*"},
				}},
			}},
		},
	}
	plcObj = &flowcontrol.PriorityLevelConfiguration{
		ObjectMeta: metav1.ObjectMeta{
			Name: plName,
		},
		Spec: flowcontrol.PriorityLevelConfigurationSpec{
			Type: flowcontrol.PriorityLevelEnablementLimited,
			Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
				NominalConcurrencyShares: 100,
				LendablePercent:          &lendable,
				BorrowingLimitPercent:    &borrowingLimit,
				LimitResponse: flowcontrol.LimitResponse{
					Type: flowcontrol.LimitResponseTypeQueue,
					Queuing: &flowcontrol.QueuingConfiguration{
						Queues:           10,
						HandSize:         2,
						QueueLengthLimit: 10,
					},
				},
			},
		},
	}
	cfgObjs = append(cfgObjs, fsObj, plcObj)

	clientset := clientsetfake.NewSimpleClientset(cfgObjs...)
	informerFactory := informers.NewSharedInformerFactory(clientset, time.Second)
	flowcontrolClient := clientset.FlowcontrolV1beta3()
	startTime := time.Now()
	clk, _ := eventclock.NewFake(startTime, 0, nil)
	controller := newTestableController(TestableConfig{
		Name:                   "Controller",
		Clock:                  clk,
		AsFieldManager:         ConfigConsumerAsFieldManager,
		FoundToDangling:        func(found bool) bool { return !found },
		InformerFactory:        informerFactory,
		FlowcontrolClient:      flowcontrolClient,
		ServerConcurrencyLimit: 24,
		RequestWaitLimit:       time.Minute,
		ReqsGaugeVec:           metrics.PriorityLevelConcurrencyGaugeVec,
		ExecSeatsGaugeVec:      metrics.PriorityLevelExecutionSeatsGaugeVec,
		QueueSetFactory:        fqs.NewQueueSetFactory(clk),
	})

	informerFactory.Start(nil)
	status := informerFactory.WaitForCacheSync(nil)
	if names := unsynced(status); len(names) > 0 {
		t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names)
	}

	go func() {
		controller.Run(nil)
	}()

	// ensure that the controller has run its first loop.
	err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
		return controller.hasPriorityLevelState(plcObj.Name), nil
	})
	if err != nil {
		t.Errorf("expected the controller to reconcile the priority level configuration object: %s, error: %s", plcObj.Name, err)
	}

	reqInfo := &request.RequestInfo{
		IsResourceRequest: false,
		Path:              "/foobar",
		Verb:              "GET",
	}
	noteFn := func(fs *flowcontrol.FlowSchema, plc *flowcontrol.PriorityLevelConfiguration, fd string) {}
	workEstr := func() fcrequest.WorkEstimate { return fcrequest.WorkEstimate{InitialSeats: 1} }

	flowUser := testUser{name: "test-user"}
	rd := RequestDigest{
		RequestInfo: reqInfo,
		User:        flowUser,
	}

	// Add 1 second to the fake clock during QueueNoteFn
	newTime := startTime.Add(time.Second)
	qnf := fq.QueueNoteFn(func(bool) { clk.FakePassiveClock.SetTime(newTime) })
	ctx := request.WithLatencyTrackers(context.Background())
	controller.Handle(ctx, rd, noteFn, workEstr, qnf, func() {})

	latencyTracker, ok := request.LatencyTrackersFrom(ctx)
	if !ok {
		t.Fatalf("error getting latency tracker: %v", err)
	}

	expectedLatency := time.Second // newTime - startTime
	latency := latencyTracker.APFQueueWaitTracker.GetLatency()
	if latency != expectedLatency {
		t.Errorf("unexpected latency, got %s, expected %s", latency, expectedLatency)
	}
}
}