Refactor compaction in etcd3 tests

Kubernetes-commit: 7da7ddd779f9ea835f0c57deae05e050c543066b
This commit is contained in:
Wojciech Tyczyński 2022-10-26 16:34:28 +02:00 committed by Kubernetes Publisher
parent 79726e140b
commit 834cf0fc14
3 changed files with 29 additions and 17 deletions

View File

@ -253,8 +253,26 @@ func TestListContinuationWithFilter(t *testing.T) {
storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation)
}
func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction {
return func(ctx context.Context, t *testing.T, resourceVersion string) {
versioner := storage.APIObjectVersioner{}
rv, err := versioner.ParseResourceVersion(resourceVersion)
if err != nil {
t.Fatal(err)
}
if _, err := etcdClient.KV.Compact(ctx, int64(rv), clientv3.WithCompactPhysical()); err != nil {
t.Fatalf("Unable to compact, %v", err)
}
}
}
func TestListInconsistentContinuation(t *testing.T) {
ctx, store, client := testSetup(t)
compaction := compactStorage(client)
if compaction == nil {
t.Skipf("compaction callback not provided")
}
// Setup storage with the following structure:
// /
@ -342,15 +360,8 @@ func TestListInconsistentContinuation(t *testing.T) {
}
// compact to latest revision.
versioner := storage.APIObjectVersioner{}
lastRVString := preset[2].storedObj.ResourceVersion
lastRV, err := versioner.ParseResourceVersion(lastRVString)
if err != nil {
t.Fatal(err)
}
if _, err := client.KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil {
t.Fatalf("Unable to compact, %v", err)
}
compaction(ctx, t, lastRVString)
// The old continue token should have expired
options = storage.ListOptions{
@ -358,7 +369,7 @@ func TestListInconsistentContinuation(t *testing.T) {
Predicate: pred(0, continueFromSecondItem),
Recursive: true,
}
err = store.GetList(ctx, "/", options, out)
err := store.GetList(ctx, "/", options, out)
if err == nil {
t.Fatalf("unexpected no error")
}

View File

@ -54,6 +54,8 @@ func TestDeleteTriggerWatch(t *testing.T) {
// - watch from 0 is able to return events for objects whose previous version has been compacted
func TestWatchFromZero(t *testing.T) {
ctx, store, client := testSetup(t)
compaction := compactStorage(client)
key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
@ -81,6 +83,10 @@ func TestWatchFromZero(t *testing.T) {
storagetesting.TestCheckResult(t, watch.Added, w, out)
w.Stop()
if compaction == nil {
t.Skip("compaction callback not provided")
}
// Update again
out = &example.Pod{}
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
@ -92,14 +98,7 @@ func TestWatchFromZero(t *testing.T) {
}
// Compact previous versions
revToCompact, err := store.versioner.ParseResourceVersion(out.ResourceVersion)
if err != nil {
t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err)
}
_, err = client.Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical())
if err != nil {
t.Fatalf("Error compacting: %v", err)
}
compaction(ctx, t, out.ResourceVersion)
// Make sure we can still watch from 0 and receive an ADDED event
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})

View File

@ -1410,6 +1410,8 @@ func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store
}
}
type Compaction func(ctx context.Context, t *testing.T, resourceVersion string)
type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer
type InterfaceWithPrefixTransformer interface {