diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index ef7f58fec..5d18d8313 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -1328,15 +1328,52 @@ func newErrWatcher(err error) *errWatcher { } func (c *Cacher) ShouldDelegateExactRV(resourceVersion string, recursive bool) (delegator.Result, error) { - return delegator.CacheWithoutSnapshots{}.ShouldDelegateExactRV(resourceVersion, recursive) + // Not Recursive is not supported unitl exact RV is implemented for WaitUntilFreshAndGet. + if !recursive || c.watchCache.snapshots == nil { + return delegator.Result{ShouldDelegate: true}, nil + } + listRV, err := c.versioner.ParseResourceVersion(resourceVersion) + if err != nil { + return delegator.Result{}, err + } + return c.shouldDelegateExactRV(listRV) } func (c *Cacher) ShouldDelegateContinue(continueToken string, recursive bool) (delegator.Result, error) { - return delegator.CacheWithoutSnapshots{}.ShouldDelegateContinue(continueToken, recursive) + // Not Recursive is not supported unitl exact RV is implemented for WaitUntilFreshAndGet. + if !recursive || c.watchCache.snapshots == nil { + return delegator.Result{ShouldDelegate: true}, nil + } + _, continueRV, err := storage.DecodeContinue(continueToken, c.resourcePrefix) + if err != nil { + return delegator.Result{}, err + } + if continueRV > 0 { + return c.shouldDelegateExactRV(uint64(continueRV)) + } else { + // Continue with negative RV is a consistent read. + return c.ShouldDelegateConsistentRead() + } +} + +func (c *Cacher) shouldDelegateExactRV(rv uint64) (delegator.Result, error) { + // Exact requests on future revision require support for consistent read, but are not a consistent read by themselves. + if c.watchCache.notFresh(rv) { + return delegator.Result{ + ShouldDelegate: !delegator.ConsistentReadSupported(), + }, nil + } + _, canServe := c.watchCache.snapshots.GetLessOrEqual(rv) + return delegator.Result{ + ShouldDelegate: !canServe, + }, nil } func (c *Cacher) ShouldDelegateConsistentRead() (delegator.Result, error) { - return delegator.CacheWithoutSnapshots{}.ShouldDelegateConsistentRead() + return delegator.Result{ + ConsistentRead: true, + ShouldDelegate: !delegator.ConsistentReadSupported(), + }, nil } // Implements watch.Interface. diff --git a/pkg/storage/cacher/cacher_test.go b/pkg/storage/cacher/cacher_test.go index 9a6538446..fe8ca1008 100644 --- a/pkg/storage/cacher/cacher_test.go +++ b/pkg/storage/cacher/cacher_test.go @@ -179,20 +179,30 @@ func TestListPaging(t *testing.T) { func TestList(t *testing.T) { for _, consistentRead := range []bool{true, false} { t.Run(fmt.Sprintf("ConsistentListFromCache=%v", consistentRead), func(t *testing.T) { - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead) - ctx, cacher, server, terminate := testSetupWithEtcdServer(t) - t.Cleanup(terminate) - storagetesting.RunTestList(ctx, t, cacher, increaseRV(server.V3Client.Client), true) + for _, listFromCacheSnapshot := range []bool{true, false} { + t.Run(fmt.Sprintf("ListFromCacheSnapsthot=%v", listFromCacheSnapshot), func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, listFromCacheSnapshot) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead) + ctx, cacher, server, terminate := testSetupWithEtcdServer(t) + t.Cleanup(terminate) + storagetesting.RunTestList(ctx, t, cacher, increaseRV(server.V3Client.Client), true) + }) + } }) } } func TestConsistentList(t *testing.T) { for _, consistentRead := range []bool{true, false} { t.Run(fmt.Sprintf("ConsistentListFromCache=%v", consistentRead), func(t *testing.T) { - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead) - ctx, cacher, server, terminate := testSetupWithEtcdServer(t) - t.Cleanup(terminate) - storagetesting.RunTestConsistentList(ctx, t, cacher, increaseRV(server.V3Client.Client), true, consistentRead) + for _, listFromCacheSnapshot := range []bool{true, false} { + t.Run(fmt.Sprintf("ListFromCacheSnapsthot=%v", listFromCacheSnapshot), func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, listFromCacheSnapshot) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead) + ctx, cacher, server, terminate := testSetupWithEtcdServer(t) + t.Cleanup(terminate) + storagetesting.RunTestConsistentList(ctx, t, cacher, increaseRV(server.V3Client.Client), true, consistentRead, listFromCacheSnapshot) + }) + } }) } } @@ -200,10 +210,15 @@ func TestConsistentList(t *testing.T) { func TestGetListNonRecursive(t *testing.T) { for _, consistentRead := range []bool{true, false} { t.Run(fmt.Sprintf("ConsistentListFromCache=%v", consistentRead), func(t *testing.T) { - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead) - ctx, cacher, server, terminate := testSetupWithEtcdServer(t) - t.Cleanup(terminate) - storagetesting.RunTestGetListNonRecursive(ctx, t, increaseRV(server.V3Client.Client), cacher) + for _, listFromCacheSnapshot := range []bool{true, false} { + t.Run(fmt.Sprintf("ListFromCacheSnapsthot=%v", listFromCacheSnapshot), func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, listFromCacheSnapshot) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead) + ctx, cacher, server, terminate := testSetupWithEtcdServer(t) + t.Cleanup(terminate) + storagetesting.RunTestGetListNonRecursive(ctx, t, increaseRV(server.V3Client.Client), cacher) + }) + } }) } } diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index ca53fde6a..6d6d5f8d1 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -344,7 +344,7 @@ func TestShouldDelegateList(t *testing.T) { } } - runTestCases := func(t *testing.T, testcases map[opts]bool, overrides ...map[opts]bool) { + runTestCases := func(t *testing.T, snapshotAvailable bool, testcases map[opts]bool, overrides ...map[opts]bool) { for opt, expectBypass := range testCases { for _, override := range overrides { if bypass, ok := override[opt]; ok { @@ -362,6 +362,9 @@ func TestShouldDelegateList(t *testing.T) { t.Fatalf("Couldn't create cacher: %v", err) } defer cacher.Stop() + if snapshotAvailable { + cacher.watchCache.snapshots.Add(uint64(mustAtoi(oldRV)), fakeOrderedLister{}) + } result, err := shouldDelegateList(toStorageOpts(opt), cacher) if err != nil { t.Fatal(err) @@ -371,19 +374,46 @@ func TestShouldDelegateList(t *testing.T) { } } } - consistentListFromCacheOverrides := map[opts]bool{} - for _, recursive := range []bool{true, false} { - consistentListFromCacheOverrides[opts{Recursive: recursive}] = false - consistentListFromCacheOverrides[opts{Limit: 100, Recursive: recursive}] = false - } + + // Exacts and continue on current cache RV. + listFromSnapshotEnabledOverrides := map[opts]bool{} + listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: cacheRV, Limit: 100}] = false + listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: cacheRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = false + listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: cacheRV, Limit: 100, ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = false + listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: "0", Limit: 100, Continue: continueOnCacheRV}] = false + listFromSnapshotEnabledOverrides[opts{Recursive: true, ResourceVersion: "0", Continue: continueOnCacheRV}] = false + listFromSnapshotEnabledOverrides[opts{Recursive: true, Limit: 100, Continue: continueOnCacheRV}] = false + listFromSnapshotEnabledOverrides[opts{Recursive: true, Continue: continueOnCacheRV}] = false + + // Exacts and continue RV with a snapshot. + snapshotAvailableOverrides := map[opts]bool{} + snapshotAvailableOverrides[opts{Recursive: true, Continue: continueOnOldRV}] = false + snapshotAvailableOverrides[opts{Recursive: true, Limit: 100, Continue: continueOnOldRV}] = false + snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: "0", Continue: continueOnOldRV}] = false + snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: "0", Limit: 100, Continue: continueOnOldRV}] = false + snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: oldRV, Limit: 100}] = false + snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: oldRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = false + snapshotAvailableOverrides[opts{Recursive: true, ResourceVersion: oldRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact, Limit: 100}] = false t.Run("ConsistentListFromCache=false", func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false) - runTestCases(t, testCases) + t.Run("ListFromCacheSnapshot=false", func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, false) + runTestCases(t, false, testCases) + }) + + t.Run("ListFromCacheSnapshot=true", func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, true) + t.Run("SnapshotAvailable=false", func(t *testing.T) { + runTestCases(t, false, testCases, listFromSnapshotEnabledOverrides) + }) + t.Run("SnapshotAvailable=true", func(t *testing.T) { + runTestCases(t, true, testCases, listFromSnapshotEnabledOverrides, snapshotAvailableOverrides) + }) + }) }) t.Run("ConsistentListFromCache=true", func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true) - // TODO(p0lyn0mial): the following tests assume that etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) // evaluates to true. Otherwise the cache will be bypassed and the test will fail. // @@ -391,7 +421,41 @@ func TestShouldDelegateList(t *testing.T) { // However in CI all test are run and there must be a test(s) that properly // initialize the storage layer so that the mentioned method evaluates to true forceRequestWatchProgressSupport(t) - runTestCases(t, testCases, consistentListFromCacheOverrides) + + consistentListFromCacheOverrides := map[opts]bool{} + for _, recursive := range []bool{true, false} { + consistentListFromCacheOverrides[opts{Recursive: recursive}] = false + consistentListFromCacheOverrides[opts{Limit: 100, Recursive: recursive}] = false + } + + t.Run("ListFromCacheSnapshot=false", func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, false) + runTestCases(t, false, testCases, consistentListFromCacheOverrides) + }) + t.Run("ListFromCacheSnapshot=true", func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, true) + + consistentReadWithSnapshotOverrides := map[opts]bool{} + // Continues with negative RV are same as consistent read. + consistentReadWithSnapshotOverrides[opts{Recursive: true, Limit: 0, Continue: continueOnNegativeRV}] = false + consistentReadWithSnapshotOverrides[opts{Recursive: true, Limit: 100, Continue: continueOnNegativeRV}] = false + consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: "0", Limit: 0, Continue: continueOnNegativeRV}] = false + consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: "0", Limit: 100, Continue: continueOnNegativeRV}] = false + // Exact on RV not yet observed by cache + consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: etcdRV, Limit: 100}] = false + consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: etcdRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = false + consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: etcdRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact, Limit: 100}] = false + consistentReadWithSnapshotOverrides[opts{Recursive: true, Continue: continueOnEtcdRV}] = false + consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: "0", Continue: continueOnEtcdRV}] = false + consistentReadWithSnapshotOverrides[opts{Recursive: true, Continue: continueOnEtcdRV, Limit: 100}] = false + consistentReadWithSnapshotOverrides[opts{Recursive: true, ResourceVersion: "0", Continue: continueOnEtcdRV, Limit: 100}] = false + t.Run("SnapshotAvailable=false", func(t *testing.T) { + runTestCases(t, false, testCases, listFromSnapshotEnabledOverrides, consistentListFromCacheOverrides, consistentReadWithSnapshotOverrides) + }) + t.Run("SnapshotAvailable=true", func(t *testing.T) { + runTestCases(t, true, testCases, listFromSnapshotEnabledOverrides, consistentListFromCacheOverrides, consistentReadWithSnapshotOverrides, snapshotAvailableOverrides) + }) + }) }) } @@ -568,6 +632,91 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su } } +func TestMatchExactResourceVersionFallback(t *testing.T) { + tcs := []struct { + name string + snapshotsAvailable []bool + + expectStoreRequests int + expectSnapshotRequests int + }{ + { + name: "Disabled", + snapshotsAvailable: []bool{false, false}, + expectStoreRequests: 2, + expectSnapshotRequests: 1, + }, + { + name: "Enabled", + snapshotsAvailable: []bool{true, true}, + expectStoreRequests: 1, + expectSnapshotRequests: 2, + }, + { + name: "Fallback", + snapshotsAvailable: []bool{true, false}, + expectSnapshotRequests: 2, + expectStoreRequests: 2, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + backingStorage := &dummyStorage{} + expectStoreRequests := 0 + backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + expectStoreRequests++ + podList := listObj.(*example.PodList) + switch opts.ResourceVersionMatch { + case "": + podList.ResourceVersion = "42" + case metav1.ResourceVersionMatchExact: + podList.ResourceVersion = opts.ResourceVersion + } + return nil + } + cacher, _, err := newTestCacherWithoutSyncing(backingStorage, clock.RealClock{}) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + snapshotRequestCount := 0 + cacher.watchCache.RWMutex.Lock() + cacher.watchCache.snapshots = &fakeSnapshotter{ + getLessOrEqual: func(rv uint64) (orderedLister, bool) { + snapshotAvailable := tc.snapshotsAvailable[snapshotRequestCount] + snapshotRequestCount++ + if snapshotAvailable { + return fakeOrderedLister{}, true + } else { + return nil, false + } + }, + } + cacher.watchCache.RWMutex.Unlock() + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + delegator := NewCacheDelegator(cacher, backingStorage) + + result := &example.PodList{} + err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "20", ResourceVersionMatch: metav1.ResourceVersionMatchExact, Recursive: true}, result) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if result.ResourceVersion != "20" { + t.Fatalf("Unexpected List response RV, got: %q, want: %d", result.ResourceVersion, 20) + } + if expectStoreRequests != tc.expectStoreRequests { + t.Fatalf("Unexpected number of requests to storage, got: %d, want: %d", expectStoreRequests, tc.expectStoreRequests) + } + if snapshotRequestCount != tc.expectSnapshotRequests { + t.Fatalf("Unexpected number of requests to snapshots, got: %d, want: %d", snapshotRequestCount, tc.expectSnapshotRequests) + } + + }) + } +} + func TestGetListNonRecursiveCacheBypass(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false) backingStorage := &dummyStorage{} diff --git a/pkg/storage/cacher/delegator.go b/pkg/storage/cacher/delegator.go index 0b968eef5..501190dcc 100644 --- a/pkg/storage/cacher/delegator.go +++ b/pkg/storage/cacher/delegator.go @@ -218,6 +218,9 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L success := "true" fallback := "false" if err != nil { + if errors.IsResourceExpired(err) { + return c.storage.GetList(ctx, key, opts, listObj) + } if result.ConsistentRead { if storage.IsTooLargeResourceVersion(err) { fallback = "true" diff --git a/pkg/storage/cacher/watch_cache.go b/pkg/storage/cacher/watch_cache.go index 908081efb..967a60c9b 100644 --- a/pkg/storage/cacher/watch_cache.go +++ b/pkg/storage/cacher/watch_cache.go @@ -25,6 +25,7 @@ import ( "time" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -508,11 +509,61 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion if err != nil { return listResp{}, "", err } + return w.list(ctx, resourceVersion, key, opts) +} + +// NOTICE: Structure follows the shouldDelegateList function in +// staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go +func (w *watchCache) list(ctx context.Context, resourceVersion uint64, key string, opts storage.ListOptions) (resp listResp, index string, err error) { + switch opts.ResourceVersionMatch { + case metav1.ResourceVersionMatchExact: + return w.listExactRV(key, "", resourceVersion) + case metav1.ResourceVersionMatchNotOlderThan: + case "": + // Continue + if len(opts.Predicate.Continue) > 0 { + continueKey, continueRV, err := storage.DecodeContinue(opts.Predicate.Continue, key) + if err != nil { + return listResp{}, "", errors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err)) + } + if continueRV > 0 { + return w.listExactRV(key, continueKey, uint64(continueRV)) + } else { + // Continue with negative RV is a consistent read - already handled via waitUntilFreshAndBlock. + // Don't pass matchValues as they don't support continueKey + return w.listLatestRV(key, continueKey, nil) + } + } + // Legacy exact match + if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" { + return w.listExactRV(key, "", resourceVersion) + } + // Consistent Read - already handled via waitUntilFreshAndBlock + } + return w.listLatestRV(key, "", opts.Predicate.MatcherIndex(ctx)) +} + +func (w *watchCache) listExactRV(key, continueKey string, resourceVersion uint64) (resp listResp, index string, err error) { + if w.snapshots == nil { + return listResp{}, "", errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d", resourceVersion)) + } + store, ok := w.snapshots.GetLessOrEqual(resourceVersion) + if !ok { + return listResp{}, "", errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d", resourceVersion)) + } + items := store.ListPrefix(key, continueKey) + return listResp{ + Items: items, + ResourceVersion: resourceVersion, + }, "", nil +} + +func (w *watchCache) listLatestRV(key, continueKey string, matchValues []storage.MatchValue) (resp listResp, index string, err error) { // This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only // requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we // want - they will be filtered out later. The fact that we return less things is only further performance improvement. // TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible. - for _, matchValue := range opts.Predicate.MatcherIndex(ctx) { + for _, matchValue := range matchValues { if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil { result, err = filterPrefixAndOrder(key, result) return listResp{ @@ -522,7 +573,7 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion } } if store, ok := w.store.(orderedLister); ok { - result := store.ListPrefix(key, "") + result := store.ListPrefix(key, continueKey) return listResp{ Items: result, ResourceVersion: w.resourceVersion, diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index ee7482524..4a4ef819c 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -254,7 +254,7 @@ func TestList(t *testing.T) { func TestConsistentList(t *testing.T) { ctx, store, client := testSetup(t) - storagetesting.RunTestConsistentList(ctx, t, store, increaseRV(client.Client), false, true) + storagetesting.RunTestConsistentList(ctx, t, store, increaseRV(client.Client), false, true, false) } func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation { diff --git a/pkg/storage/testing/store_tests.go b/pkg/storage/testing/store_tests.go index 668d0b629..284f6f8e9 100644 --- a/pkg/storage/testing/store_tests.go +++ b/pkg/storage/testing/store_tests.go @@ -1658,7 +1658,7 @@ func ExpectContinueMatches(t *testing.T, expect, got string) { t.Errorf("expected continue token: %s, got: %s", expectDecoded, gotDecoded) } -func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, cacheEnabled, consistentReadsSupported bool) { +func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, cacheEnabled, consistentReadsSupported, listFromCacheSnapshot bool) { outPod := &example.Pod{} inPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo"}} err := store.Create(ctx, computePodKey(inPod), inPod, outPod, 0) @@ -1701,8 +1701,10 @@ func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Inte name: "List with negative continue RV returns consistent RV", continueToken: encodeContinueOrDie("/pods/a", -1), validateResponseRV: func(t *testing.T, rv int) { - // TODO: Update cacheSyncRV after continue is served from cache. assert.Equal(t, consistentRV, rv) + if listFromCacheSnapshot { + cacheSyncRV = rv + } }, }, { @@ -1728,6 +1730,7 @@ func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Inte t.Run(tc.name, func(t *testing.T) { out := &example.PodList{} opts := storage.ListOptions{ + Recursive: true, ResourceVersion: tc.requestRV, Predicate: storage.SelectionPredicate{ Label: labels.Everything(),