Serve LISTs with exact RV and continuations from cache

Kubernetes-commit: f82c9e56d928d1028d4b298578f275a2e5e69490
This commit is contained in:
Marek Siarkowicz 2024-07-03 21:36:51 +02:00 committed by Kubernetes Publisher
parent 3a206e1457
commit 5a059075db
7 changed files with 287 additions and 29 deletions

View File

@ -1328,15 +1328,52 @@ func newErrWatcher(err error) *errWatcher {
}
func (c *Cacher) ShouldDelegateExactRV(resourceVersion string, recursive bool) (delegator.Result, error) {
return delegator.CacheWithoutSnapshots{}.ShouldDelegateExactRV(resourceVersion, recursive)
// Not Recursive is not supported unitl exact RV is implemented for WaitUntilFreshAndGet.
if !recursive || c.watchCache.snapshots == nil {
return delegator.Result{ShouldDelegate: true}, nil
}
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return delegator.Result{}, err
}
return c.shouldDelegateExactRV(listRV)
}
func (c *Cacher) ShouldDelegateContinue(continueToken string, recursive bool) (delegator.Result, error) {
return delegator.CacheWithoutSnapshots{}.ShouldDelegateContinue(continueToken, recursive)
// Not Recursive is not supported unitl exact RV is implemented for WaitUntilFreshAndGet.
if !recursive || c.watchCache.snapshots == nil {
return delegator.Result{ShouldDelegate: true}, nil
}
_, continueRV, err := storage.DecodeContinue(continueToken, c.resourcePrefix)
if err != nil {
return delegator.Result{}, err
}
if continueRV > 0 {
return c.shouldDelegateExactRV(uint64(continueRV))
} else {
// Continue with negative RV is a consistent read.
return c.ShouldDelegateConsistentRead()
}
}
func (c *Cacher) shouldDelegateExactRV(rv uint64) (delegator.Result, error) {
// Exact requests on future revision require support for consistent read, but are not a consistent read by themselves.
if c.watchCache.notFresh(rv) {
return delegator.Result{
ShouldDelegate: !delegator.ConsistentReadSupported(),
}, nil
}
_, canServe := c.watchCache.snapshots.GetLessOrEqual(rv)
return delegator.Result{
ShouldDelegate: !canServe,
}, nil
}
func (c *Cacher) ShouldDelegateConsistentRead() (delegator.Result, error) {
return delegator.CacheWithoutSnapshots{}.ShouldDelegateConsistentRead()
return delegator.Result{
ConsistentRead: true,
ShouldDelegate: !delegator.ConsistentReadSupported(),
}, nil
}
// Implements watch.Interface.

View File

@ -179,20 +179,30 @@ func TestListPaging(t *testing.T) {
func TestList(t *testing.T) {
for _, consistentRead := range []bool{true, false} {
t.Run(fmt.Sprintf("ConsistentListFromCache=%v", consistentRead), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestList(ctx, t, cacher, increaseRV(server.V3Client.Client), true)
for _, listFromCacheSnapshot := range []bool{true, false} {
t.Run(fmt.Sprintf("ListFromCacheSnapsthot=%v", listFromCacheSnapshot), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, listFromCacheSnapshot)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestList(ctx, t, cacher, increaseRV(server.V3Client.Client), true)
})
}
})
}
}
func TestConsistentList(t *testing.T) {
for _, consistentRead := range []bool{true, false} {
t.Run(fmt.Sprintf("ConsistentListFromCache=%v", consistentRead), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestConsistentList(ctx, t, cacher, increaseRV(server.V3Client.Client), true, consistentRead)
for _, listFromCacheSnapshot := range []bool{true, false} {
t.Run(fmt.Sprintf("ListFromCacheSnapsthot=%v", listFromCacheSnapshot), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, listFromCacheSnapshot)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestConsistentList(ctx, t, cacher, increaseRV(server.V3Client.Client), true, consistentRead, listFromCacheSnapshot)
})
}
})
}
}
@ -200,10 +210,15 @@ func TestConsistentList(t *testing.T) {
func TestGetListNonRecursive(t *testing.T) {
for _, consistentRead := range []bool{true, false} {
t.Run(fmt.Sprintf("ConsistentListFromCache=%v", consistentRead), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestGetListNonRecursive(ctx, t, increaseRV(server.V3Client.Client), cacher)
for _, listFromCacheSnapshot := range []bool{true, false} {
t.Run(fmt.Sprintf("ListFromCacheSnapsthot=%v", listFromCacheSnapshot), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, listFromCacheSnapshot)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestGetListNonRecursive(ctx, t, increaseRV(server.V3Client.Client), cacher)
})
}
})
}
}

View File

@ -344,7 +344,7 @@ func TestShouldDelegateList(t *testing.T) {
}
}
runTestCases := func(t *testing.T, testcases map[opts]bool, overrides ...map[opts]bool) {
runTestCases := func(t *testing.T, snapshotAvailable bool, testcases map[opts]bool, overrides ...map[opts]bool) {
for opt, expectBypass := range testCases {
for _, override := range overrides {
if bypass, ok := override[opt]; ok {
@ -362,6 +362,9 @@ func TestShouldDelegateList(t *testing.T) {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
if snapshotAvailable {
cacher.watchCache.snapshots.Add(uint64(mustAtoi(oldRV)), fakeOrderedLister{})
}
result, err := shouldDelegateList(toStorageOpts(opt), cacher)
if err != nil {
t.Fatal(err)
@ -371,19 +374,46 @@ func TestShouldDelegateList(t *testing.T) {
}
}
}
consistentListFromCacheOverrides := map[opts]bool{}
for _, recursive := range []bool{true, false} {
consistentListFromCacheOverrides[opts{Recursive: recursive}] = false
consistentListFromCacheOverrides[opts{Limit: 100, Recursive: recursive}] = false
}
// Exacts and continue on current cache RV.
listFromSnapshotEnabledOverrides := map[opts]bool{}
listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: cacheRV, Limit: 100}] = false
listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: cacheRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = false
listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: cacheRV, Limit: 100, ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = false
listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: "0", Limit: 100, Continue: continueOnCacheRV}] = false
listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: "0", Continue: continueOnCacheRV}] = false
listFromSnapshotEnabledOverrides[opts{Recursive: true, Limit: 100, Continue: continueOnCacheRV}] = false
listFromSnapshotEnabledOverrides[opts{Recursive: true, Continue: continueOnCacheRV}] = false
// Exacts and continue RV with a snapshot.
snapshotAvailableOverrides := map[opts]bool{}
snapshotAvailableOverrides[opts{Recursive: true, Continue: continueOnOldRV}] = false
snapshotAvailableOverrides[opts{Recursive: true, Limit: 100, Continue: continueOnOldRV}] = false
snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: "0", Continue: continueOnOldRV}] = false
snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: "0", Limit: 100, Continue: continueOnOldRV}] = false
snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: oldRV, Limit: 100}] = false
snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: oldRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = false
snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: oldRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact, Limit: 100}] = false
t.Run("ConsistentListFromCache=false", func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
runTestCases(t, testCases)
t.Run("ListFromCacheSnapshot=false", func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, false)
runTestCases(t, false, testCases)
})
t.Run("ListFromCacheSnapshot=true", func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, true)
t.Run("SnapshotAvailable=false", func(t *testing.T) {
runTestCases(t, false, testCases, listFromSnapshotEnabledOverrides)
})
t.Run("SnapshotAvailable=true", func(t *testing.T) {
runTestCases(t, true, testCases, listFromSnapshotEnabledOverrides, snapshotAvailableOverrides)
})
})
})
t.Run("ConsistentListFromCache=true", func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
// TODO(p0lyn0mial): the following tests assume that etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
// evaluates to true. Otherwise the cache will be bypassed and the test will fail.
//
@ -391,7 +421,41 @@ func TestShouldDelegateList(t *testing.T) {
// However in CI all test are run and there must be a test(s) that properly
// initialize the storage layer so that the mentioned method evaluates to true
forceRequestWatchProgressSupport(t)
runTestCases(t, testCases, consistentListFromCacheOverrides)
consistentListFromCacheOverrides := map[opts]bool{}
for _, recursive := range []bool{true, false} {
consistentListFromCacheOverrides[opts{Recursive: recursive}] = false
consistentListFromCacheOverrides[opts{Limit: 100, Recursive: recursive}] = false
}
t.Run("ListFromCacheSnapshot=false", func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, false)
runTestCases(t, false, testCases, consistentListFromCacheOverrides)
})
t.Run("ListFromCacheSnapshot=true", func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, true)
consistentReadWithSnapshotOverrides := map[opts]bool{}
// Continues with negative RV are same as consistent read.
consistentReadWithSnapshotOverrides[opts{Recursive: true, Limit: 0, Continue: continueOnNegativeRV}] = false
consistentReadWithSnapshotOverrides[opts{Recursive: true, Limit: 100, Continue: continueOnNegativeRV}] = false
consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: "0", Limit: 0, Continue: continueOnNegativeRV}] = false
consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: "0", Limit: 100, Continue: continueOnNegativeRV}] = false
// Exact on RV not yet observed by cache
consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: etcdRV, Limit: 100}] = false
consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: etcdRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = false
consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: etcdRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact, Limit: 100}] = false
consistentReadWithSnapshotOverrides[opts{Recursive: true, Continue: continueOnEtcdRV}] = false
consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: "0", Continue: continueOnEtcdRV}] = false
consistentReadWithSnapshotOverrides[opts{Recursive: true, Continue: continueOnEtcdRV, Limit: 100}] = false
consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: "0", Continue: continueOnEtcdRV, Limit: 100}] = false
t.Run("SnapshotAvailable=false", func(t *testing.T) {
runTestCases(t, false, testCases, listFromSnapshotEnabledOverrides, consistentListFromCacheOverrides, consistentReadWithSnapshotOverrides)
})
t.Run("SnapshotAvailable=true", func(t *testing.T) {
runTestCases(t, true, testCases, listFromSnapshotEnabledOverrides, consistentListFromCacheOverrides, consistentReadWithSnapshotOverrides, snapshotAvailableOverrides)
})
})
})
}
@ -568,6 +632,91 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
}
}
func TestMatchExactResourceVersionFallback(t *testing.T) {
tcs := []struct {
name string
snapshotsAvailable []bool
expectStoreRequests int
expectSnapshotRequests int
}{
{
name: "Disabled",
snapshotsAvailable: []bool{false, false},
expectStoreRequests: 2,
expectSnapshotRequests: 1,
},
{
name: "Enabled",
snapshotsAvailable: []bool{true, true},
expectStoreRequests: 1,
expectSnapshotRequests: 2,
},
{
name: "Fallback",
snapshotsAvailable: []bool{true, false},
expectSnapshotRequests: 2,
expectStoreRequests: 2,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
backingStorage := &dummyStorage{}
expectStoreRequests := 0
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
expectStoreRequests++
podList := listObj.(*example.PodList)
switch opts.ResourceVersionMatch {
case "":
podList.ResourceVersion = "42"
case metav1.ResourceVersionMatchExact:
podList.ResourceVersion = opts.ResourceVersion
}
return nil
}
cacher, _, err := newTestCacherWithoutSyncing(backingStorage, clock.RealClock{})
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
snapshotRequestCount := 0
cacher.watchCache.RWMutex.Lock()
cacher.watchCache.snapshots = &fakeSnapshotter{
getLessOrEqual: func(rv uint64) (orderedLister, bool) {
snapshotAvailable := tc.snapshotsAvailable[snapshotRequestCount]
snapshotRequestCount++
if snapshotAvailable {
return fakeOrderedLister{}, true
} else {
return nil, false
}
},
}
cacher.watchCache.RWMutex.Unlock()
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
delegator := NewCacheDelegator(cacher, backingStorage)
result := &example.PodList{}
err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "20", ResourceVersionMatch: metav1.ResourceVersionMatchExact, Recursive: true}, result)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if result.ResourceVersion != "20" {
t.Fatalf("Unexpected List response RV, got: %q, want: %d", result.ResourceVersion, 20)
}
if expectStoreRequests != tc.expectStoreRequests {
t.Fatalf("Unexpected number of requests to storage, got: %d, want: %d", expectStoreRequests, tc.expectStoreRequests)
}
if snapshotRequestCount != tc.expectSnapshotRequests {
t.Fatalf("Unexpected number of requests to snapshots, got: %d, want: %d", snapshotRequestCount, tc.expectSnapshotRequests)
}
})
}
}
func TestGetListNonRecursiveCacheBypass(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
backingStorage := &dummyStorage{}

View File

@ -218,6 +218,9 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
success := "true"
fallback := "false"
if err != nil {
if errors.IsResourceExpired(err) {
return c.storage.GetList(ctx, key, opts, listObj)
}
if result.ConsistentRead {
if storage.IsTooLargeResourceVersion(err) {
fallback = "true"

View File

@ -25,6 +25,7 @@ import (
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@ -508,11 +509,61 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
if err != nil {
return listResp{}, "", err
}
return w.list(ctx, resourceVersion, key, opts)
}
// NOTICE: Structure follows the shouldDelegateList function in
// staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go
func (w *watchCache) list(ctx context.Context, resourceVersion uint64, key string, opts storage.ListOptions) (resp listResp, index string, err error) {
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchExact:
return w.listExactRV(key, "", resourceVersion)
case metav1.ResourceVersionMatchNotOlderThan:
case "":
// Continue
if len(opts.Predicate.Continue) > 0 {
continueKey, continueRV, err := storage.DecodeContinue(opts.Predicate.Continue, key)
if err != nil {
return listResp{}, "", errors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
if continueRV > 0 {
return w.listExactRV(key, continueKey, uint64(continueRV))
} else {
// Continue with negative RV is a consistent read - already handled via waitUntilFreshAndBlock.
// Don't pass matchValues as they don't support continueKey
return w.listLatestRV(key, continueKey, nil)
}
}
// Legacy exact match
if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return w.listExactRV(key, "", resourceVersion)
}
// Consistent Read - already handled via waitUntilFreshAndBlock
}
return w.listLatestRV(key, "", opts.Predicate.MatcherIndex(ctx))
}
func (w *watchCache) listExactRV(key, continueKey string, resourceVersion uint64) (resp listResp, index string, err error) {
if w.snapshots == nil {
return listResp{}, "", errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d", resourceVersion))
}
store, ok := w.snapshots.GetLessOrEqual(resourceVersion)
if !ok {
return listResp{}, "", errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d", resourceVersion))
}
items := store.ListPrefix(key, continueKey)
return listResp{
Items: items,
ResourceVersion: resourceVersion,
}, "", nil
}
func (w *watchCache) listLatestRV(key, continueKey string, matchValues []storage.MatchValue) (resp listResp, index string, err error) {
// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
for _, matchValue := range opts.Predicate.MatcherIndex(ctx) {
for _, matchValue := range matchValues {
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
result, err = filterPrefixAndOrder(key, result)
return listResp{
@ -522,7 +573,7 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
}
}
if store, ok := w.store.(orderedLister); ok {
result := store.ListPrefix(key, "")
result := store.ListPrefix(key, continueKey)
return listResp{
Items: result,
ResourceVersion: w.resourceVersion,

View File

@ -254,7 +254,7 @@ func TestList(t *testing.T) {
func TestConsistentList(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestConsistentList(ctx, t, store, increaseRV(client.Client), false, true)
storagetesting.RunTestConsistentList(ctx, t, store, increaseRV(client.Client), false, true, false)
}
func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation {

View File

@ -1658,7 +1658,7 @@ func ExpectContinueMatches(t *testing.T, expect, got string) {
t.Errorf("expected continue token: %s, got: %s", expectDecoded, gotDecoded)
}
func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, cacheEnabled, consistentReadsSupported bool) {
func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, cacheEnabled, consistentReadsSupported, listFromCacheSnapshot bool) {
outPod := &example.Pod{}
inPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo"}}
err := store.Create(ctx, computePodKey(inPod), inPod, outPod, 0)
@ -1701,8 +1701,10 @@ func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Inte
name: "List with negative continue RV returns consistent RV",
continueToken: encodeContinueOrDie("/pods/a", -1),
validateResponseRV: func(t *testing.T, rv int) {
// TODO: Update cacheSyncRV after continue is served from cache.
assert.Equal(t, consistentRV, rv)
if listFromCacheSnapshot {
cacheSyncRV = rv
}
},
},
{
@ -1728,6 +1730,7 @@ func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Inte
t.Run(tc.name, func(t *testing.T) {
out := &example.PodList{}
opts := storage.ListOptions{
Recursive: true,
ResourceVersion: tc.requestRV,
Predicate: storage.SelectionPredicate{
Label: labels.Everything(),