Extract delegator.Helper interface to allow making delegate decision based on cache state

Kubernetes-commit: 984b475e74904dd61c10b23472798a21496edc8f
This commit is contained in:
Marek Siarkowicz 2025-03-17 15:46:02 +01:00 committed by Kubernetes Publisher
parent aac1558d35
commit cc5ef43352
6 changed files with 118 additions and 99 deletions

View File

@ -48,6 +48,7 @@ import (
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/delegator"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
@ -334,9 +335,12 @@ func TestShouldDelegateList(t *testing.T) {
expectBypass = bypass
}
}
gotBypass, _ := shouldDelegateList(toStorageOpts(opt))
if gotBypass != expectBypass {
t.Errorf("Unexpected bypass result for List request with options %+v, bypass expected: %v, got: %v", opt, expectBypass, gotBypass)
result, err := shouldDelegateList(toStorageOpts(opt), delegator.CacheWithoutSnapshots{})
if err != nil {
t.Fatal(err)
}
if result.ShouldDelegate != expectBypass {
t.Errorf("Unexpected bypass result for List request with options %+v, bypass expected: %v, got: %v", opt, expectBypass, result.ShouldDelegate)
}
}
}

View File

@ -36,8 +36,8 @@ import (
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/delegator"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
@ -180,8 +180,11 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
if err != nil {
return err
}
shouldDelegate, consistentRead := shouldDelegateList(opts)
if shouldDelegate {
result, err := shouldDelegateList(opts, delegator.CacheWithoutSnapshots{})
if err != nil {
return err
}
if result.ShouldDelegate {
return c.storage.GetList(ctx, key, opts, listObj)
}
@ -203,7 +206,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
return c.storage.GetList(ctx, key, opts, listObj)
}
}
if consistentRead {
if result.ConsistentRead {
listRV, err = c.storage.GetCurrentResourceVersion(ctx)
if err != nil {
return err
@ -215,7 +218,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
success := "true"
fallback := "false"
if err != nil {
if consistentRead {
if result.ConsistentRead {
if storage.IsTooLargeResourceVersion(err) {
fallback = "true"
// Reset resourceVersion during fallback from consistent read.
@ -229,7 +232,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
}
return err
}
if consistentRead {
if result.ConsistentRead {
metrics.ConsistentReadTotal.WithLabelValues(c.cacher.resourcePrefix, success, fallback).Add(1)
}
return nil
@ -243,36 +246,32 @@ func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
return noLabelSelector && noFieldSelector && hasLimit
}
// NOTICE: Keep in sync with shouldListFromStorage function in
// NOTICE: Keep in sync with shouldDelegateList function in
//
// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go
func shouldDelegateList(opts storage.ListOptions) (shouldDeletage, consistentRead bool) {
func shouldDelegateList(opts storage.ListOptions, cache delegator.Helper) (delegator.Result, error) {
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
consistentRead = false
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchExact:
return true, consistentRead
return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive)
case metav1.ResourceVersionMatchNotOlderThan:
return false, consistentRead
return delegator.Result{ShouldDelegate: false}, nil
case "":
// Legacy exact match
if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return true, consistentRead
return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive)
}
// Continue
if len(opts.Predicate.Continue) > 0 {
return true, consistentRead
return cache.ShouldDelegateContinue(opts.Predicate.Continue, opts.Recursive)
}
// Consistent Read
if opts.ResourceVersion == "" {
consistentRead = true
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
return !consistentListFromCacheEnabled || !requestWatchProgressSupported, consistentRead
return cache.ShouldDelegateConsistentRead()
}
return false, consistentRead
return delegator.Result{ShouldDelegate: false}, nil
default:
return true, consistentRead
return delegator.Result{ShouldDelegate: true}, nil
}
}

View File

@ -0,0 +1,73 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package delegator
import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)
type Helper interface {
ShouldDelegateExactRV(rv string, recursive bool) (Result, error)
ShouldDelegateContinue(continueToken string, recursive bool) (Result, error)
ShouldDelegateConsistentRead() (Result, error)
}
// Result of delegator decision.
type Result struct {
// Whether a request cannot be served by cache and should be delegated to etcd.
ShouldDelegate bool
// Whether a request is a consistent read, used by delegator to decide if it should call GetCurrentResourceVersion to get RV.
// Included in interface as only cacher has keyPrefix needed to parse continue token.
ConsistentRead bool
}
type CacheWithoutSnapshots struct{}
var _ Helper = CacheWithoutSnapshots{}
func (c CacheWithoutSnapshots) ShouldDelegateContinue(continueToken string, recursive bool) (Result, error) {
return Result{
ShouldDelegate: true,
// Continue with negative RV is considered a consistent read, however token cannot be parsed without keyPrefix unavailable in staging/src/k8s.io/apiserver/pkg/util/flow_control/request/list_work_estimator.go.
ConsistentRead: false,
}, nil
}
func (c CacheWithoutSnapshots) ShouldDelegateExactRV(rv string, recursive bool) (Result, error) {
return Result{
ShouldDelegate: true,
ConsistentRead: false,
}, nil
}
func (c CacheWithoutSnapshots) ShouldDelegateConsistentRead() (Result, error) {
return Result{
ShouldDelegate: !ConsistentReadSupported(),
ConsistentRead: true,
}, nil
}
// ConsistentReadSupported returns whether cache can be used to serve reads with RV not yet observed by cache, including both consistent reads.
// Function is located here to avoid import cycles between staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go and staging/src/k8s.io/apiserver/pkg/util/flow_control/request/list_work_estimator.go.
func ConsistentReadSupported() bool {
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
return consistentListFromCacheEnabled && requestWatchProgressSupported
}

View File

@ -20,8 +20,6 @@ import (
"context"
"testing"
"k8s.io/apimachinery/pkg/apis/meta/internalversion"
"k8s.io/apimachinery/pkg/apis/meta/internalversion/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/apis/example"
@ -194,56 +192,3 @@ func TestCalculateDigest(t *testing.T) {
})
}
}
func TestValidateUndelegatedListOptions(t *testing.T) {
opts := []storage.ListOptions{}
keyPrefix := "/pods/"
continueOnRV1, err := storage.EncodeContinue("/pods/a", keyPrefix, 1)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
continueOnNegativeRV, err := storage.EncodeContinue("/pods/a", keyPrefix, -1)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for _, rv := range []string{"", "0", "1"} {
for _, match := range []metav1.ResourceVersionMatch{"", metav1.ResourceVersionMatchExact, metav1.ResourceVersionMatchNotOlderThan} {
for _, c := range []string{"", continueOnRV1, continueOnNegativeRV} {
for _, limit := range []int64{0, 100} {
for _, recursive := range []bool{true, false} {
opt := storage.ListOptions{
ResourceVersion: rv,
ResourceVersionMatch: match,
Predicate: storage.SelectionPredicate{
Limit: limit,
Continue: c,
},
Recursive: recursive,
}
// Skip requests that will not pass apiserver validation
if errs := validation.ValidateListOptions(&internalversion.ListOptions{
ResourceVersion: opt.ResourceVersion,
ResourceVersionMatch: opt.ResourceVersionMatch,
Limit: opt.Predicate.Limit,
Continue: opt.Predicate.Continue,
}, false); len(errs) != 0 {
continue
}
// Skip requests sent directly to etcd
if delegateList, _ := shouldDelegateList(opt); delegateList {
continue
}
opts = append(opts, opt)
}
}
}
}
}
for _, opt := range opts {
_, _, err := storage.ValidateListOptions(keyPrefix, storage.APIObjectVersioner{}, opt)
if err != nil {
t.Errorf("Expected List requests %+v to pass validation, got: %v", opt, err)
}
}
}

View File

@ -32,9 +32,9 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/delegator"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
"k8s.io/apiserver/pkg/storage/cacher/progress"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing"
@ -496,8 +496,7 @@ func (s sortableStoreElements) Swap(i, j int) {
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
// with their ResourceVersion and the name of the index, if any, that was used.
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, opts storage.ListOptions) (resp listResp, index string, err error) {
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
@ -562,8 +561,7 @@ func (w *watchCache) notFresh(resourceVersion uint64) bool {
// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
var err error
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove()

View File

@ -23,10 +23,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/storage/cacher/delegator"
"k8s.io/klog/v2"
)
@ -85,7 +82,12 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
return WorkEstimate{InitialSeats: e.config.MinimumSeats}
}
}
listFromStorage, _ := shouldListFromStorage(&listOptions)
// TODO: Check whether watchcache is enabled.
result, err := shouldDelegateList(&listOptions, delegator.CacheWithoutSnapshots{})
if err != nil {
return WorkEstimate{InitialSeats: maxSeats}
}
listFromStorage := result.ShouldDelegate
isListFromCache := requestInfo.Verb == "watch" || !listFromStorage
numStored, err := e.countGetterFn(key(requestInfo))
@ -162,32 +164,30 @@ func key(requestInfo *apirequest.RequestInfo) string {
// NOTICE: Keep in sync with shouldDelegateList function in
//
// staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go
func shouldListFromStorage(opts *metav1.ListOptions) (shouldDeletage, consistentRead bool) {
func shouldDelegateList(opts *metav1.ListOptions, cache delegator.Helper) (delegator.Result, error) {
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
consistentRead = false
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchExact:
return true, consistentRead
return cache.ShouldDelegateExactRV(opts.ResourceVersion, defaultRecursive)
case metav1.ResourceVersionMatchNotOlderThan:
return false, consistentRead
return delegator.Result{ShouldDelegate: false}, nil
case "":
// Legacy exact match
if opts.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return true, consistentRead
return cache.ShouldDelegateExactRV(opts.ResourceVersion, defaultRecursive)
}
// Continue
if len(opts.Continue) > 0 {
return true, consistentRead
return cache.ShouldDelegateContinue(opts.Continue, defaultRecursive)
}
// Consistent Read
if opts.ResourceVersion == "" {
consistentRead = true
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
return !consistentListFromCacheEnabled || !requestWatchProgressSupported, consistentRead
return cache.ShouldDelegateConsistentRead()
}
return false, consistentRead
return delegator.Result{ShouldDelegate: false}, nil
default:
return true, consistentRead
return delegator.Result{ShouldDelegate: true}, nil
}
}
var defaultRecursive = true