diff --git a/go.mod b/go.mod index 61b536c81..e11073405 100644 --- a/go.mod +++ b/go.mod @@ -42,9 +42,9 @@ require ( google.golang.org/protobuf v1.28.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/square/go-jose.v2 v2.6.0 - k8s.io/api v0.0.0-20230315055824-5ab285d38c19 + k8s.io/api v0.0.0-20230314091508-112a65bae227 k8s.io/apimachinery v0.0.0-20230314010357-128166500c57 - k8s.io/client-go v0.0.0-20230315061805-362ffe4fcd37 + k8s.io/client-go v0.0.0-20230315061813-3cafc13f5d42 k8s.io/component-base v0.0.0-20230313212246-ce16dede9c0e k8s.io/klog/v2 v2.90.1 k8s.io/kms v0.0.0-20230313212457-12714b59d299 @@ -124,9 +124,9 @@ require ( ) replace ( - k8s.io/api => k8s.io/api v0.0.0-20230315055824-5ab285d38c19 + k8s.io/api => k8s.io/api v0.0.0-20230314091508-112a65bae227 k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230314010357-128166500c57 - k8s.io/client-go => k8s.io/client-go v0.0.0-20230315061805-362ffe4fcd37 + k8s.io/client-go => k8s.io/client-go v0.0.0-20230315061813-3cafc13f5d42 k8s.io/component-base => k8s.io/component-base v0.0.0-20230313212246-ce16dede9c0e k8s.io/kms => k8s.io/kms v0.0.0-20230313212457-12714b59d299 ) diff --git a/go.sum b/go.sum index 2e0f53a52..a11f56692 100644 --- a/go.sum +++ b/go.sum @@ -874,12 +874,12 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -k8s.io/api v0.0.0-20230315055824-5ab285d38c19 h1:+y54q1Icmf7JV4NpvSmWdb7qUqfJut+VMYn6ieej5a8= -k8s.io/api v0.0.0-20230315055824-5ab285d38c19/go.mod h1:YsFNxBfPYQZAIBg0XvL94rxDDVZvQQQUlo4KbwEEqNU= +k8s.io/api v0.0.0-20230314091508-112a65bae227 h1:Ak4YrHI4101ZMfx2hkXnp//d5r0nKXA8RkNaHCBJ7MA= +k8s.io/api v0.0.0-20230314091508-112a65bae227/go.mod h1:YsFNxBfPYQZAIBg0XvL94rxDDVZvQQQUlo4KbwEEqNU= k8s.io/apimachinery v0.0.0-20230314010357-128166500c57 h1:Vr1geeI+at1NNCWyTN70NtPSNcveZ+fAcbZivzwHknM= k8s.io/apimachinery v0.0.0-20230314010357-128166500c57/go.mod h1:1AlvkfXatlv5Kq9dCZg3Ksdu/DyrZ31Q0CncRqQ8Q9I= -k8s.io/client-go v0.0.0-20230315061805-362ffe4fcd37 h1:P92qDx0WAKPH7Bnaxlj2m/gQR3OehZ6oF0vfUTxVc90= -k8s.io/client-go v0.0.0-20230315061805-362ffe4fcd37/go.mod h1:S0HXdZfM0Ydi8yxoINLXkc3OT+Zx3YntXTvMq5xdNdo= +k8s.io/client-go v0.0.0-20230315061813-3cafc13f5d42 h1:b7qY4Bq8gHunC2mYT/RAtMWDgMmkMb16oQuktJkHpHI= +k8s.io/client-go v0.0.0-20230315061813-3cafc13f5d42/go.mod h1:oWqDJxiXYcSV7S8woQ4H5epopeK85SJyOu5lJsMndcU= k8s.io/component-base v0.0.0-20230313212246-ce16dede9c0e h1:/mftAl/78q8dPZU8YkshNzt1XpbEJTGsxYZP/WGI4og= k8s.io/component-base v0.0.0-20230313212246-ce16dede9c0e/go.mod h1:MQKfc/tXtS50042g1VxMb2W2E8PCt97xO8RsLTw3AeI= k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= diff --git a/pkg/endpoints/metrics/metrics.go b/pkg/endpoints/metrics/metrics.go index 61f53c18e..450a6653d 100644 --- a/pkg/endpoints/metrics/metrics.go +++ b/pkg/endpoints/metrics/metrics.go @@ -106,7 +106,7 @@ var ( &compbasemetrics.HistogramOpts{ Subsystem: APIServerComponent, Name: "request_slo_duration_seconds", - Help: "Response latency distribution (not counting webhook duration) in seconds for each verb, group, version, resource, subresource, scope and component.", + Help: "Response latency distribution (not counting webhook duration and priority & fairness queue wait times) in seconds for each verb, group, version, resource, subresource, scope and component.", // This metric is supplementary to the requestLatencies metric. // It measures request duration excluding webhooks as they are mostly // dependant on user configuration. @@ -121,7 +121,7 @@ var ( &compbasemetrics.HistogramOpts{ Subsystem: APIServerComponent, Name: "request_sli_duration_seconds", - Help: "Response latency distribution (not counting webhook duration) in seconds for each verb, group, version, resource, subresource, scope and component.", + Help: "Response latency distribution (not counting webhook duration and priority & fairness queue wait times) in seconds for each verb, group, version, resource, subresource, scope and component.", // This metric is supplementary to the requestLatencies metric. // It measures request duration excluding webhooks as they are mostly // dependant on user configuration. @@ -544,7 +544,7 @@ func MonitorRequest(req *http.Request, verb, group, version, resource, subresour fieldValidationRequestLatencies.WithContext(req.Context()).WithLabelValues(fieldValidation) if wd, ok := request.LatencyTrackersFrom(req.Context()); ok { - sliLatency := elapsedSeconds - (wd.MutatingWebhookTracker.GetLatency() + wd.ValidatingWebhookTracker.GetLatency()).Seconds() + sliLatency := elapsedSeconds - (wd.MutatingWebhookTracker.GetLatency() + wd.ValidatingWebhookTracker.GetLatency() + wd.APFQueueWaitTracker.GetLatency()).Seconds() requestSloLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(sliLatency) requestSliLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(sliLatency) } diff --git a/pkg/endpoints/request/webhook_duration.go b/pkg/endpoints/request/webhook_duration.go index 120bc46bf..612c671d8 100644 --- a/pkg/endpoints/request/webhook_duration.go +++ b/pkg/endpoints/request/webhook_duration.go @@ -116,6 +116,10 @@ type LatencyTrackers struct { // Validate webhooks are done in parallel, so max function is used. ValidatingWebhookTracker DurationTracker + // APFQueueWaitTracker tracks the latency incurred by queue wait times + // from priority & fairness. + APFQueueWaitTracker DurationTracker + // StorageTracker tracks the latency incurred inside the storage layer, // it accounts for the time it takes to send data to the underlying // storage layer (etcd) and get the complete response back. @@ -168,6 +172,7 @@ func WithLatencyTrackersAndCustomClock(parent context.Context, c clock.Clock) co return WithValue(parent, latencyTrackersKey, &LatencyTrackers{ MutatingWebhookTracker: newSumLatencyTracker(c), ValidatingWebhookTracker: newMaxLatencyTracker(c), + APFQueueWaitTracker: newMaxLatencyTracker(c), StorageTracker: newSumLatencyTracker(c), TransformTracker: newSumLatencyTracker(c), SerializationTracker: newSumLatencyTracker(c), @@ -230,6 +235,14 @@ func TrackResponseWriteLatency(ctx context.Context, d time.Duration) { } } +// TrackAPFQueueWaitLatency is used to track latency incurred +// by priority and fairness queues. +func TrackAPFQueueWaitLatency(ctx context.Context, d time.Duration) { + if tracker, ok := LatencyTrackersFrom(ctx); ok { + tracker.APFQueueWaitTracker.TrackDuration(d) + } +} + // AuditAnnotationsFromLatencyTrackers will inspect each latency tracker // associated with the request context and return a set of audit // annotations that can be added to the API audit entry. diff --git a/pkg/endpoints/request/webhook_duration_test.go b/pkg/endpoints/request/webhook_duration_test.go index 9119047c1..0008b21cc 100644 --- a/pkg/endpoints/request/webhook_duration_test.go +++ b/pkg/endpoints/request/webhook_duration_test.go @@ -35,7 +35,7 @@ func TestLatencyTrackersFrom(t *testing.T) { SumDurations: 1600, MaxDuration: 400, } - t.Run("TestWebhookDurationFrom", func(t *testing.T) { + t.Run("TestLatencyTrackersFrom", func(t *testing.T) { parent := context.TODO() _, ok := LatencyTrackersFrom(parent) if ok { @@ -48,13 +48,14 @@ func TestLatencyTrackersFrom(t *testing.T) { if !ok { t.Error("expected LatencyTrackersFrom to be initialized") } - if wd.MutatingWebhookTracker.GetLatency() != 0 || wd.ValidatingWebhookTracker.GetLatency() != 0 { + if wd.MutatingWebhookTracker.GetLatency() != 0 || wd.ValidatingWebhookTracker.GetLatency() != 0 || wd.APFQueueWaitTracker.GetLatency() != 0 { t.Error("expected values to be initialized to 0") } for _, d := range tc.Durations { wd.MutatingWebhookTracker.Track(func() { clk.Step(d) }) wd.ValidatingWebhookTracker.Track(func() { clk.Step(d) }) + wd.APFQueueWaitTracker.Track(func() { clk.Step(d) }) } wd, ok = LatencyTrackersFrom(ctx) @@ -69,5 +70,9 @@ func TestLatencyTrackersFrom(t *testing.T) { if wd.ValidatingWebhookTracker.GetLatency() != tc.MaxDuration { t.Errorf("expected validate duration: %q, but got: %q", tc.MaxDuration, wd.ValidatingWebhookTracker.GetLatency()) } + + if wd.APFQueueWaitTracker.GetLatency() != tc.MaxDuration { + t.Errorf("expected priority & fairness duration: %q, but got: %q", tc.MaxDuration, wd.APFQueueWaitTracker.GetLatency()) + } }) } diff --git a/pkg/util/flowcontrol/apf_controller.go b/pkg/util/flowcontrol/apf_controller.go index 5718dd9e8..2048a6ef6 100644 --- a/pkg/util/flowcontrol/apf_controller.go +++ b/pkg/util/flowcontrol/apf_controller.go @@ -1026,7 +1026,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig noteFn(selectedFlowSchema, plState.pl, flowDistinguisher) workEstimate := workEstimator() - startWaitingTime = time.Now() + startWaitingTime = cfgCtlr.clock.Now() klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues) req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) if idle { diff --git a/pkg/util/flowcontrol/apf_filter.go b/pkg/util/flowcontrol/apf_filter.go index f93e6a828..2929048ec 100644 --- a/pkg/util/flowcontrol/apf_filter.go +++ b/pkg/util/flowcontrol/apf_filter.go @@ -21,6 +21,7 @@ import ( "strconv" "time" + endpointsrequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/server/httplog" "k8s.io/apiserver/pkg/server/mux" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" @@ -161,7 +162,7 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque queued := startWaitingTime != time.Time{} if req == nil { if queued { - metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) + observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime)) } klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, reject", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt) return @@ -178,21 +179,26 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque }() idle = req.Finish(func() { if queued { - metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) + observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime)) } metrics.AddDispatch(ctx, pl.Name, fs.Name) fqs.OnRequestDispatched(req) executed = true - startExecutionTime := time.Now() + startExecutionTime := cfgCtlr.clock.Now() defer func() { - executionTime := time.Since(startExecutionTime) + executionTime := cfgCtlr.clock.Since(startExecutionTime) httplog.AddKeyValue(ctx, "apf_execution_time", executionTime) metrics.ObserveExecutionDuration(ctx, pl.Name, fs.Name, executionTime) }() execFn() }) if queued && !executed { - metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) + observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime)) } panicking = false } + +func observeQueueWaitTime(ctx context.Context, priorityLevelName, flowSchemaName, execute string, waitTime time.Duration) { + metrics.ObserveWaitingDuration(ctx, priorityLevelName, flowSchemaName, execute, waitTime) + endpointsrequest.TrackAPFQueueWaitLatency(ctx, waitTime) +} diff --git a/pkg/util/flowcontrol/apf_filter_test.go b/pkg/util/flowcontrol/apf_filter_test.go new file mode 100644 index 000000000..742ada095 --- /dev/null +++ b/pkg/util/flowcontrol/apf_filter_test.go @@ -0,0 +1,167 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "context" + "testing" + "time" + + flowcontrol "k8s.io/api/flowcontrol/v1beta3" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/endpoints/request" + fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" + fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock" + "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" + "k8s.io/client-go/informers" + clientsetfake "k8s.io/client-go/kubernetes/fake" +) + +// TestQueueWaitTimeLatencyTracker tests the queue wait times recorded by the P&F latency tracker +// when calling Handle. +func TestQueueWaitTimeLatencyTracker(t *testing.T) { + metrics.Register() + + var fsObj *flowcontrol.FlowSchema + var plcObj *flowcontrol.PriorityLevelConfiguration + cfgObjs := []runtime.Object{} + + plName := "test-pl" + username := "test-user" + fsName := "test-fs" + lendable := int32(0) + borrowingLimit := int32(0) + fsObj = &flowcontrol.FlowSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: fsName, + }, + Spec: flowcontrol.FlowSchemaSpec{ + MatchingPrecedence: 100, + PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{ + Name: plName, + }, + DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{ + Type: flowcontrol.FlowDistinguisherMethodByUserType, + }, + Rules: []flowcontrol.PolicyRulesWithSubjects{{ + Subjects: []flowcontrol.Subject{{ + Kind: flowcontrol.SubjectKindUser, + User: &flowcontrol.UserSubject{Name: username}, + }}, + NonResourceRules: []flowcontrol.NonResourcePolicyRule{{ + Verbs: []string{"*"}, + NonResourceURLs: []string{"*"}, + }}, + }}, + }, + } + plcObj = &flowcontrol.PriorityLevelConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: plName, + }, + Spec: flowcontrol.PriorityLevelConfigurationSpec{ + Type: flowcontrol.PriorityLevelEnablementLimited, + Limited: &flowcontrol.LimitedPriorityLevelConfiguration{ + NominalConcurrencyShares: 100, + LendablePercent: &lendable, + BorrowingLimitPercent: &borrowingLimit, + LimitResponse: flowcontrol.LimitResponse{ + Type: flowcontrol.LimitResponseTypeQueue, + Queuing: &flowcontrol.QueuingConfiguration{ + Queues: 10, + HandSize: 2, + QueueLengthLimit: 10, + }, + }, + }, + }, + } + cfgObjs = append(cfgObjs, fsObj, plcObj) + + clientset := clientsetfake.NewSimpleClientset(cfgObjs...) + informerFactory := informers.NewSharedInformerFactory(clientset, time.Second) + flowcontrolClient := clientset.FlowcontrolV1beta3() + startTime := time.Now() + clk, _ := eventclock.NewFake(startTime, 0, nil) + controller := newTestableController(TestableConfig{ + Name: "Controller", + Clock: clk, + AsFieldManager: ConfigConsumerAsFieldManager, + FoundToDangling: func(found bool) bool { return !found }, + InformerFactory: informerFactory, + FlowcontrolClient: flowcontrolClient, + ServerConcurrencyLimit: 24, + RequestWaitLimit: time.Minute, + ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, + ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, + QueueSetFactory: fqs.NewQueueSetFactory(clk), + }) + + informerFactory.Start(nil) + + status := informerFactory.WaitForCacheSync(nil) + if names := unsynced(status); len(names) > 0 { + t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names) + } + + go func() { + controller.Run(nil) + }() + + // ensure that the controller has run its first loop. + err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) { + return controller.hasPriorityLevelState(plcObj.Name), nil + }) + if err != nil { + t.Errorf("expected the controller to reconcile the priority level configuration object: %s, error: %s", plcObj.Name, err) + } + + reqInfo := &request.RequestInfo{ + IsResourceRequest: false, + Path: "/foobar", + Verb: "GET", + } + noteFn := func(fs *flowcontrol.FlowSchema, plc *flowcontrol.PriorityLevelConfiguration, fd string) {} + workEstr := func() fcrequest.WorkEstimate { return fcrequest.WorkEstimate{InitialSeats: 1} } + + flowUser := testUser{name: "test-user"} + rd := RequestDigest{ + RequestInfo: reqInfo, + User: flowUser, + } + + // Add 1 second to the fake clock during QueueNoteFn + newTime := startTime.Add(time.Second) + qnf := fq.QueueNoteFn(func(bool) { clk.FakePassiveClock.SetTime(newTime) }) + ctx := request.WithLatencyTrackers(context.Background()) + controller.Handle(ctx, rd, noteFn, workEstr, qnf, func() {}) + + latencyTracker, ok := request.LatencyTrackersFrom(ctx) + if !ok { + t.Fatalf("error getting latency tracker: %v", err) + } + + expectedLatency := time.Second // newTime - startTime + latency := latencyTracker.APFQueueWaitTracker.GetLatency() + if latency != expectedLatency { + t.Errorf("unexpected latency, got %s, expected %s", latency, expectedLatency) + } +}