From 6811fdeb2d0da59c6b8d8b1ed9f9044b4944c36f Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 20 Mar 2025 11:08:37 +0100 Subject: [PATCH] Unify should delegate list Kubernetes-commit: 8fdd6fe4768d81da09f339c1dda831a05bb26f00 --- pkg/storage/cacher/cacher_whitebox_test.go | 3 +- pkg/storage/cacher/delegator.go | 31 +------------- pkg/storage/cacher/delegator/interface.go | 40 +++++++++++++++++++ .../request/list_work_estimator.go | 33 +-------------- 4 files changed, 44 insertions(+), 63 deletions(-) diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index 6d6d5f8d1..f70cb7aa0 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -48,6 +48,7 @@ import ( examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/cacher/delegator" "k8s.io/apiserver/pkg/storage/cacher/metrics" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" etcdfeature "k8s.io/apiserver/pkg/storage/feature" @@ -365,7 +366,7 @@ func TestShouldDelegateList(t *testing.T) { if snapshotAvailable { cacher.watchCache.snapshots.Add(uint64(mustAtoi(oldRV)), fakeOrderedLister{}) } - result, err := shouldDelegateList(toStorageOpts(opt), cacher) + result, err := delegator.ShouldDelegateList(toStorageOpts(opt), cacher) if err != nil { t.Fatal(err) } diff --git a/pkg/storage/cacher/delegator.go b/pkg/storage/cacher/delegator.go index 501190dcc..ac17fb1c8 100644 --- a/pkg/storage/cacher/delegator.go +++ b/pkg/storage/cacher/delegator.go @@ -180,7 +180,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L if err != nil { return err } - result, err := shouldDelegateList(opts, c.cacher) + result, err := delegator.ShouldDelegateList(opts, c.cacher) if err != nil { return err } @@ -249,35 +249,6 @@ func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool { return noLabelSelector && noFieldSelector && hasLimit } -// NOTICE: Keep in sync with shouldDelegateList function in -// -// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go -func shouldDelegateList(opts storage.ListOptions, cache delegator.Helper) (delegator.Result, error) { - // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list - switch opts.ResourceVersionMatch { - case metav1.ResourceVersionMatchExact: - return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive) - case metav1.ResourceVersionMatchNotOlderThan: - return delegator.Result{ShouldDelegate: false}, nil - case "": - // Continue - if len(opts.Predicate.Continue) > 0 { - return cache.ShouldDelegateContinue(opts.Predicate.Continue, opts.Recursive) - } - // Legacy exact match - if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" { - return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive) - } - // Consistent Read - if opts.ResourceVersion == "" { - return cache.ShouldDelegateConsistentRead() - } - return delegator.Result{ShouldDelegate: false}, nil - default: - return delegator.Result{ShouldDelegate: true}, nil - } -} - func (c *CacheDelegator) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error { // Ignore the suggestion and try to pass down the current version of the object // read from cache. diff --git a/pkg/storage/cacher/delegator/interface.go b/pkg/storage/cacher/delegator/interface.go index fa56004e1..1cdf5145e 100644 --- a/pkg/storage/cacher/delegator/interface.go +++ b/pkg/storage/cacher/delegator/interface.go @@ -17,12 +17,52 @@ limitations under the License. package delegator import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" etcdfeature "k8s.io/apiserver/pkg/storage/feature" utilfeature "k8s.io/apiserver/pkg/util/feature" ) +func ShouldDelegateListMeta(opts *metav1.ListOptions, cache Helper) (Result, error) { + return ShouldDelegateList( + storage.ListOptions{ + ResourceVersionMatch: opts.ResourceVersionMatch, + ResourceVersion: opts.ResourceVersion, + Predicate: storage.SelectionPredicate{ + Continue: opts.Continue, + Limit: opts.Limit, + }, + Recursive: true, + }, cache) +} + +func ShouldDelegateList(opts storage.ListOptions, cache Helper) (Result, error) { + // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list + switch opts.ResourceVersionMatch { + case metav1.ResourceVersionMatchExact: + return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive) + case metav1.ResourceVersionMatchNotOlderThan: + return Result{ShouldDelegate: false}, nil + case "": + // Continue + if len(opts.Predicate.Continue) > 0 { + return cache.ShouldDelegateContinue(opts.Predicate.Continue, opts.Recursive) + } + // Legacy exact match + if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" { + return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive) + } + // Consistent Read + if opts.ResourceVersion == "" { + return cache.ShouldDelegateConsistentRead() + } + return Result{ShouldDelegate: false}, nil + default: + return Result{ShouldDelegate: true}, nil + } +} + type Helper interface { ShouldDelegateExactRV(rv string, recursive bool) (Result, error) ShouldDelegateContinue(continueToken string, recursive bool) (Result, error) diff --git a/pkg/util/flowcontrol/request/list_work_estimator.go b/pkg/util/flowcontrol/request/list_work_estimator.go index bc79aac32..6e46e2d59 100644 --- a/pkg/util/flowcontrol/request/list_work_estimator.go +++ b/pkg/util/flowcontrol/request/list_work_estimator.go @@ -83,7 +83,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe } } // TODO: Check whether watchcache is enabled. - result, err := shouldDelegateList(&listOptions, delegator.CacheWithoutSnapshots{}) + result, err := delegator.ShouldDelegateListMeta(&listOptions, delegator.CacheWithoutSnapshots{}) if err != nil { return WorkEstimate{InitialSeats: maxSeats} } @@ -160,34 +160,3 @@ func key(requestInfo *apirequest.RequestInfo) string { } return groupResource.String() } - -// NOTICE: Keep in sync with shouldDelegateList function in -// -// staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go -func shouldDelegateList(opts *metav1.ListOptions, cache delegator.Helper) (delegator.Result, error) { - // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list - switch opts.ResourceVersionMatch { - case metav1.ResourceVersionMatchExact: - return cache.ShouldDelegateExactRV(opts.ResourceVersion, defaultRecursive) - case metav1.ResourceVersionMatchNotOlderThan: - return delegator.Result{ShouldDelegate: false}, nil - case "": - // Continue - if len(opts.Continue) > 0 { - return cache.ShouldDelegateContinue(opts.Continue, defaultRecursive) - } - // Legacy exact match - if opts.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" { - return cache.ShouldDelegateExactRV(opts.ResourceVersion, defaultRecursive) - } - // Consistent Read - if opts.ResourceVersion == "" { - return cache.ShouldDelegateConsistentRead() - } - return delegator.Result{ShouldDelegate: false}, nil - default: - return delegator.Result{ShouldDelegate: true}, nil - } -} - -var defaultRecursive = true