Test consistent List

Kubernetes-commit: b36fdd68b72cd5c64ea5be3917846067644da983
This commit is contained in:
Marek Siarkowicz 2023-07-03 17:22:22 +02:00 committed by Kubernetes Publisher
parent ce72ad1c02
commit df9896fd00
4 changed files with 37 additions and 15 deletions

View File

@ -145,9 +145,9 @@ func TestPreconditionalDeleteWithSuggestion(t *testing.T) {
}
func TestList(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestList(ctx, t, cacher, true)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
}
func TestListWithoutPaging(t *testing.T) {
@ -238,9 +238,9 @@ func TestWatch(t *testing.T) {
}
func TestWatchFromZero(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher))
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client))
}
func TestDeleteTriggerWatch(t *testing.T) {
@ -365,6 +365,11 @@ func withoutPaging(options *setupOptions) {
}
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tearDownFunc) {
ctx, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
return ctx, cacher, tearDown
}
func testSetupWithEtcdServer(t *testing.T, opts ...setupOption) (context.Context, *Cacher, *etcd3testing.EtcdTestServer, tearDownFunc) {
setupOpts := setupOptions{}
opts = append([]setupOption{withDefaults}, opts...)
for _, opt := range opts {
@ -407,5 +412,5 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tea
t.Fatalf("Failed to inject list errors: %v", err)
}
return ctx, cacher, terminate
return ctx, cacher, server, terminate
}

View File

@ -21,6 +21,8 @@ import (
"fmt"
"testing"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -71,7 +73,7 @@ func computePodKey(obj *example.Pod) string {
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
}
func compactStorage(c *Cacher) storagetesting.Compaction {
func compactStorage(c *Cacher, client *clientv3.Client) storagetesting.Compaction {
return func(ctx context.Context, t *testing.T, resourceVersion string) {
versioner := storage.APIObjectVersioner{}
rv, err := versioner.ParseResourceVersion(resourceVersion)
@ -93,9 +95,6 @@ func compactStorage(c *Cacher) storagetesting.Compaction {
if c.watchCache.resourceVersion < rv {
t.Fatalf("Can't compact into a future version: %v", resourceVersion)
}
if rv < c.watchCache.listResourceVersion {
t.Fatalf("Can't compact into a past version: %v", resourceVersion)
}
if len(c.watchers.allWatchers) > 0 || len(c.watchers.valueWatchers) > 0 {
// We could consider terminating those watchers, but given
@ -114,6 +113,11 @@ func compactStorage(c *Cacher) storagetesting.Compaction {
}
c.watchCache.listResourceVersion = rv
// TODO(wojtek-t): We should also compact the underlying etcd storage.
if _, err = client.KV.Put(ctx, "compact_rev_key", resourceVersion); err != nil {
t.Fatalf("Could not update compact_rev_key: %v", err)
}
if _, err = client.Compact(ctx, int64(rv)); err != nil {
t.Fatalf("Could not compact: %v", err)
}
}
}

View File

@ -191,8 +191,8 @@ func TestTransformationFailure(t *testing.T) {
}
func TestList(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestList(ctx, t, store, false)
ctx, store, client := testSetup(t)
storagetesting.RunTestList(ctx, t, store, compactStorage(client), false)
}
func TestListWithoutPaging(t *testing.T) {
@ -258,7 +258,7 @@ func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction {
if err != nil {
t.Fatal(err)
}
if _, err := etcdClient.KV.Compact(ctx, int64(rv), clientv3.WithCompactPhysical()); err != nil {
if _, _, err = compact(ctx, etcdClient, 0, int64(rv)); err != nil {
t.Fatalf("Unable to compact, %v", err)
}
}

View File

@ -478,7 +478,7 @@ func RunTestPreconditionalDeleteWithSuggestion(ctx context.Context, t *testing.T
}
}
func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, ignoreWatchCacheTests bool) {
func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, ignoreWatchCacheTests bool) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RemainingItemCount, true)()
initialRV, preset, err := seedMultiLevelData(ctx, store)
@ -506,6 +506,11 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, ign
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name, "spec.nodeName": pod.Spec.NodeName}, nil
}
// Use compact to increase etcd global revision without changes to any resources.
// The increase in resources version comes from Kubernetes compaction updating hidden key.
// Used to test consistent List to confirm it returns latest etcd revision.
compaction(ctx, t, initialRV)
currentRV := fmt.Sprintf("%d", continueRV+1)
tests := []struct {
name string
@ -706,7 +711,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, ign
expectedRemainingItemCount: utilpointer.Int64(1),
rv: list.ResourceVersion,
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
expectRV: list.ResourceVersion,
expectRVFunc: resourceVersionNotOlderThan(list.ResourceVersion),
},
{
name: "test List with limit at resource version 0",
@ -1019,6 +1024,14 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, ign
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
expectedOut: []example.Pod{},
},
{
name: "test consistent List",
prefix: "/pods/empty",
pred: storage.Everything,
rv: "",
expectRV: currentRV,
expectedOut: []example.Pod{},
},
}
for _, tt := range tests {