Fix enabling consistent list from watch cache also works for resourceVersion=0

Kubernetes-commit: 0b8e79580eb3a63ca7707626b4894adfb9125586
This commit is contained in:
Marek Siarkowicz 2024-03-04 19:35:34 +01:00 committed by Kubernetes Publisher
parent d5a8607203
commit 67b6245fc3
4 changed files with 109 additions and 8 deletions

View File

@ -773,7 +773,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
// minimal resource version, simply forward the request to storage.
return c.storage.GetList(ctx, key, opts, listObj)
}
if listRV == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
if err != nil {
return err

View File

@ -167,20 +167,47 @@ func TestPreconditionalDeleteWithSuggestionPass(t *testing.T) {
}
func TestList(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)()
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
}
func TestListWithListFromCache(t *testing.T) {
// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove skip.
t.Skip("This test flakes flakes due to https://github.com/etcd-io/etcd/issues/17507")
func TestListWithConsistentListFromCache(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
// Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507
// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove the wait when etcd is upgraded to version with fix.
err := cacher.ready.wait(ctx)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
}
func TestConsistentList(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)()
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, false)
}
func TestConsistentListWithConsistentListFromCache(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
// Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507
// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove the wait when etcd is upgraded to version with fix.
err := cacher.ready.wait(ctx)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, true)
}
func TestGetListNonRecursive(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)()
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
@ -227,7 +254,7 @@ func TestListInconsistentContinuation(t *testing.T) {
storagetesting.RunTestListInconsistentContinuation(ctx, t, cacher, nil)
}
func TestConsistentList(t *testing.T) {
func TestListResourceVersionMatch(t *testing.T) {
// TODO(#109831): Enable use of this test and run it.
}

View File

@ -216,6 +216,11 @@ func TestList(t *testing.T) {
storagetesting.RunTestList(ctx, t, store, compactStorage(client), false)
}
func TestConsistentList(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client), false, true)
}
func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation {
return func(t *testing.T, pageSize, estimatedProcessedObjects uint64) {
if reads := transformer.GetReadsAndReset(); reads != estimatedProcessedObjects {
@ -285,9 +290,9 @@ func TestListInconsistentContinuation(t *testing.T) {
storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client))
}
func TestConsistentList(t *testing.T) {
func TestListResourceVersionMatch(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestConsistentList(ctx, t, &storeWithPrefixTransformer{store})
storagetesting.RunTestListResourceVersionMatch(ctx, t, &storeWithPrefixTransformer{store})
}
func TestCount(t *testing.T) {

View File

@ -1170,6 +1170,14 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, com
expectRV: currentRV,
expectedOut: []example.Pod{},
},
{
name: "test non-consistent List",
prefix: "/pods/empty",
pred: storage.Everything,
rv: "0",
expectRVFunc: resourceVersionNotOlderThan(list.ResourceVersion),
expectedOut: []example.Pod{},
},
}
for _, tt := range tests {
@ -1242,6 +1250,67 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, com
}
}
func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, cacheEnabled, consistentReadsSupported bool) {
outPod := &example.Pod{}
inPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo"}}
err := store.Create(ctx, computePodKey(inPod), inPod, outPod, 0)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
lastObjecRV := outPod.ResourceVersion
compaction(ctx, t, outPod.ResourceVersion)
parsedRV, _ := strconv.Atoi(outPod.ResourceVersion)
currentRV := fmt.Sprintf("%d", parsedRV+1)
firstNonConsistentReadRV := lastObjecRV
if consistentReadsSupported && !cacheEnabled {
firstNonConsistentReadRV = currentRV
}
secondNonConsistentReadRV := lastObjecRV
if consistentReadsSupported {
secondNonConsistentReadRV = currentRV
}
tcs := []struct {
name string
requestRV string
expectResponseRV string
}{
{
name: "Non-consistent list before sync",
requestRV: "0",
expectResponseRV: firstNonConsistentReadRV,
},
{
name: "Consistent request returns currentRV",
requestRV: "",
expectResponseRV: currentRV,
},
{
name: "Non-consistent request after sync returns currentRV",
requestRV: "0",
expectResponseRV: secondNonConsistentReadRV,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
out := &example.PodList{}
opts := storage.ListOptions{
ResourceVersion: tc.requestRV,
Predicate: storage.Everything,
}
err = store.GetList(ctx, "/pods/empty", opts, out)
if err != nil {
t.Fatalf("GetList failed: %v", err)
}
if out.ResourceVersion != tc.expectResponseRV {
t.Errorf("resourceVersion in list response want=%s, got=%s", tc.expectResponseRV, out.ResourceVersion)
}
})
}
}
// seedMultiLevelData creates a set of keys with a multi-level structure, returning a resourceVersion
// from before any were created along with the full set of objects that were persisted
func seedMultiLevelData(ctx context.Context, store storage.Interface) (string, []*example.Pod, error) {
@ -1956,7 +2025,7 @@ type InterfaceWithPrefixTransformer interface {
UpdatePrefixTransformer(PrefixTransformerModifier) func()
}
func RunTestConsistentList(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
func RunTestListResourceVersionMatch(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
nextPod := func(index uint32) (string, *example.Pod) {
obj := &example.Pod{
ObjectMeta: metav1.ObjectMeta{