From 072d278e39a44b0cb15a7becf9d612d6094a60f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Mon, 17 Apr 2023 21:48:08 +0200 Subject: [PATCH] Generalize few watch tests from cacher Kubernetes-commit: 45e836a968acc113cb03768cd8c730bea89bd332 --- pkg/storage/etcd3/watcher_test.go | 10 ++ pkg/storage/testing/watcher_tests.go | 173 +++++++++++++++++++++++++-- pkg/storage/tests/cacher_test.go | 161 ++----------------------- 3 files changed, 182 insertions(+), 162 deletions(-) diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 628c9124d..25507da41 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -61,6 +61,11 @@ func TestWatchFromNoneZero(t *testing.T) { storagetesting.RunTestWatchFromNoneZero(ctx, t, store) } +func TestDelayedWatchDelivery(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.RunTestDelayedWatchDelivery(ctx, t, store) +} + func TestWatchError(t *testing.T) { ctx, store, _ := testSetup(t) storagetesting.RunTestWatchError(ctx, t, &storeWithPrefixTransformer{store}) @@ -71,6 +76,11 @@ func TestWatchContextCancel(t *testing.T) { storagetesting.RunTestWatchContextCancel(ctx, t, store) } +func TestWatcherTimeout(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.RunTestWatcherTimeout(ctx, t, store) +} + func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { ctx, store, _ := testSetup(t) storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, store) diff --git a/pkg/storage/testing/watcher_tests.go b/pkg/storage/testing/watcher_tests.go index 2c62584e5..08df9f054 100644 --- a/pkg/storage/testing/watcher_tests.go +++ b/pkg/storage/testing/watcher_tests.go @@ -54,6 +54,12 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur Spec: example.PodSpec{NodeName: "bar"}, } + selectedPod := func(pod *example.Pod) *example.Pod { + result := pod.DeepCopy() + result.Labels = map[string]string{"select": "true"} + return result + } + tests := []struct { name string namespace string @@ -94,6 +100,24 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur return nil, fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil }, }, + }, { + name: "filtering", + namespace: fmt.Sprintf("test-ns-5-%t", recursive), + watchTests: []*testWatchStruct{ + {selectedPod(basePod), true, watch.Added}, + {basePod, true, watch.Deleted}, + {selectedPod(basePod), true, watch.Added}, + {selectedPod(basePodAssigned), true, watch.Modified}, + {nil, true, watch.Deleted}, + }, + pred: storage.SelectionPredicate{ + Label: labels.SelectorFromSet(labels.Set{"select": "true"}), + Field: fields.Everything(), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return labels.Set(pod.Labels), nil, nil + }, + }, }} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -113,17 +137,39 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur if err != nil { t.Fatalf("Watch failed: %v", err) } + + // Create a pod in a different namespace first to ensure + // that its corresponding event will not be propagated. + badKey := fmt.Sprintf("/pods/%s-bad/foo", tt.namespace) + badOut := &example.Pod{} + err = store.GuaranteedUpdate(ctx, badKey, badOut, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + obj := basePod.DeepCopy() + obj.Namespace = fmt.Sprintf("%s-bad", tt.namespace) + return obj, nil + }), nil) + if err != nil { + t.Fatalf("GuaranteedUpdate of bad pod failed: %v", err) + } + var prevObj *example.Pod for _, watchTest := range tt.watchTests { out := &example.Pod{} - err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( - func(runtime.Object) (runtime.Object, error) { - obj := watchTest.obj.DeepCopy() - obj.Namespace = tt.namespace - return obj, nil - }), nil) - if err != nil { - t.Fatalf("GuaranteedUpdate failed: %v", err) + if watchTest.obj != nil { + err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + obj := watchTest.obj.DeepCopy() + obj.Namespace = tt.namespace + return obj, nil + }), nil) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + } else { + err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, nil) + if err != nil { + t.Fatalf("Delete failed: %v", err) + } } if watchTest.expectEvent { expectObj := out @@ -226,6 +272,60 @@ func RunTestWatchFromNoneZero(ctx context.Context, t *testing.T, store storage.I testCheckResult(t, watch.Modified, w, out) } +func RunTestDelayedWatchDelivery(ctx context.Context, t *testing.T, store storage.Interface) { + _, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) + startRV := storedObj.ResourceVersion + + watcher, err := store.Watch(ctx, "/pods/test-ns", storage.ListOptions{ResourceVersion: startRV, Predicate: storage.Everything, Recursive: true}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Depending on the implementation, different number of events that + // should be delivered to the watcher can be created before it will + // block the implementation and as a result force the watcher to be + // closed (as otherwise events would have to be dropped). + // For now, this number is smallest for Cacher and it equals 21 for it. + totalPods := 21 + for i := 0; i < totalPods; i++ { + out := &example.Pod{} + pod := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i), Namespace: "test-ns"}, + } + err := store.GuaranteedUpdate(ctx, computePodKey(pod), out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return pod, nil + }), nil) + if err != nil { + t.Errorf("GuaranteedUpdate failed: %v", err) + } + } + + // Now stop the watcher and check if the consecutive events are being delivered. + watcher.Stop() + + watched := 0 + for { + event, ok := <-watcher.ResultChan() + if !ok { + break + } + object := event.Object + if co, ok := object.(runtime.CacheableObject); ok { + object = co.GetObject() + } + if a, e := object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a { + t.Errorf("Unexpected object watched: %s, expected %s", a, e) + } + watched++ + } + // We expect at least N events to be delivered, depending on the implementation. + // For now, this number is smallest for Cacher and it equals 10 (size of the out buffer). + if watched < 10 { + t.Errorf("Unexpected number of events: %v, expected: %v", watched, totalPods) + } +} + func RunTestWatchError(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) { obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}} key := computePodKey(obj) @@ -285,6 +385,63 @@ func RunTestWatchContextCancel(ctx context.Context, t *testing.T, store storage. } } +func RunTestWatcherTimeout(ctx context.Context, t *testing.T, store storage.Interface) { + // initialRV is used to initate the watcher at the beginning of the world. + podList := example.PodList{} + options := storage.ListOptions{ + Predicate: storage.Everything, + Recursive: true, + } + if err := store.GetList(ctx, "/pods", options, &podList); err != nil { + t.Fatalf("Failed to list pods: %v", err) + } + initialRV := podList.ResourceVersion + + options = storage.ListOptions{ + ResourceVersion: initialRV, + Predicate: storage.Everything, + Recursive: true, + } + + // Create a number of watchers that will not be reading any result. + nonReadingWatchers := 50 + for i := 0; i < nonReadingWatchers; i++ { + watcher, err := store.Watch(ctx, "/pods/test-ns", options) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer watcher.Stop() + } + + // Create a second watcher that will be reading result. + readingWatcher, err := store.Watch(ctx, "/pods/test-ns", options) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer readingWatcher.Stop() + + // Depending on the implementation, different number of events that + // should be delivered to the watcher can be created before it will + // block the implementation and as a result force the watcher to be + // closed (as otherwise events would have to be dropped). + // For now, this number is smallest for Cacher and it equals 21 for it. + // + // Create more events to ensure that we're not blocking other watchers + // forever. + startTime := time.Now() + for i := 0; i < 22; i++ { + out := &example.Pod{} + pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i), Namespace: "test-ns"}} + if err := store.Create(ctx, computePodKey(pod), pod, out, 0); err != nil { + t.Fatalf("Create failed: %v", err) + } + testCheckResult(t, watch.Added, readingWatcher, out) + } + if time.Since(startTime) > time.Duration(250*nonReadingWatchers)*time.Millisecond { + t.Errorf("waiting for events took too long: %v", time.Since(startTime)) + } +} + func RunTestWatchDeleteEventObjectHaveLatestRV(ctx context.Context, t *testing.T, store storage.Interface) { key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) diff --git a/pkg/storage/tests/cacher_test.go b/pkg/storage/tests/cacher_test.go index 7c5014a36..531aacdb9 100644 --- a/pkg/storage/tests/cacher_test.go +++ b/pkg/storage/tests/cacher_test.go @@ -28,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/api/apitesting" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -310,113 +309,9 @@ func TestWatchDeprecated(t *testing.T) { } func TestWatcherTimeout(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) - defer server.Terminate(t) - cacher, _, err := newTestCacher(etcdStorage) - if err != nil { - t.Fatalf("Couldn't create cacher: %v", err) - } - defer cacher.Stop() - - // initialVersion is used to initate the watcher at the beginning of the world, - // which is not defined precisely in etcd. - initialVersion, err := cacher.LastSyncResourceVersion() - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - startVersion := strconv.Itoa(int(initialVersion)) - - // Create a number of watchers that will not be reading any result. - nonReadingWatchers := 50 - for i := 0; i < nonReadingWatchers; i++ { - watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything, Recursive: true}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer watcher.Stop() - } - - // Create a second watcher that will be reading result. - readingWatcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything, Recursive: true}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer readingWatcher.Stop() - - startTime := time.Now() - for i := 1; i <= 22; i++ { - pod := makeTestPod(strconv.Itoa(i)) - _ = updatePod(t, etcdStorage, pod, nil) - verifyWatchEvent(t, readingWatcher, watch.Added, pod) - } - if time.Since(startTime) > time.Duration(250*nonReadingWatchers)*time.Millisecond { - t.Errorf("waiting for events took too long: %v", time.Since(startTime)) - } -} - -func TestFiltering(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) - defer server.Terminate(t) - cacher, _, err := newTestCacher(etcdStorage) - if err != nil { - t.Fatalf("Couldn't create cacher: %v", err) - } - defer cacher.Stop() - - // Ensure that the cacher is initialized, before creating any pods, - // so that we are sure that all events will be present in cacher. - syncWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - syncWatcher.Stop() - - podFoo := makeTestPod("foo") - podFoo.Labels = map[string]string{"filter": "foo"} - podFooFiltered := makeTestPod("foo") - podFooPrime := makeTestPod("foo") - podFooPrime.Labels = map[string]string{"filter": "foo"} - podFooPrime.Spec.NodeName = "fakeNode" - - podFooNS2 := makeTestPod("foo") - podFooNS2.Namespace += "2" - podFooNS2.Labels = map[string]string{"filter": "foo"} - - // Create in another namespace first to make sure events from other namespaces don't get delivered - updatePod(t, etcdStorage, podFooNS2, nil) - - fooCreated := updatePod(t, etcdStorage, podFoo, nil) - fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated) - fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered) - _ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered) - - deleted := example.Pod{} - if err := etcdStorage.Delete(context.TODO(), "pods/ns/foo", &deleted, nil, storage.ValidateAllObjectFunc, nil); err != nil { - t.Errorf("Unexpected error: %v", err) - } - - // Set up Watch for object "podFoo" with label filter set. - pred := storage.SelectionPredicate{ - Label: labels.SelectorFromSet(labels.Set{"filter": "foo"}), - Field: fields.Everything(), - GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, err error) { - metadata, err := meta.Accessor(obj) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - return labels.Set(metadata.GetLabels()), nil, nil - }, - } - watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: pred}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer watcher.Stop() - - verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered) - verifyWatchEvent(t, watcher, watch.Added, podFoo) - verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) - verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime) + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestWatcherTimeout(ctx, t, cacher) } func TestEmptyWatchEventCache(t *testing.T) { @@ -486,52 +381,10 @@ func TestEmptyWatchEventCache(t *testing.T) { } } -func TestRandomWatchDeliver(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) - defer server.Terminate(t) - cacher, v, err := newTestCacher(etcdStorage) - if err != nil { - t.Fatalf("Couldn't create cacher: %v", err) - } - defer cacher.Stop() - - fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) - rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - startVersion := strconv.Itoa(int(rv)) - - watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything, Recursive: true}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Now we can create exactly 21 events that should be delivered - // to the watcher, before it will completely block cacher and as - // a result will be dropped. - for i := 0; i < 21; i++ { - updatePod(t, etcdStorage, makeTestPod(fmt.Sprintf("foo-%d", i)), nil) - } - - // Now stop the watcher and check if the consecutive events are being delivered. - watcher.Stop() - - watched := 0 - for { - event, ok := <-watcher.ResultChan() - if !ok { - break - } - object := event.Object - if co, ok := object.(runtime.CacheableObject); ok { - object = co.GetObject() - } - if a, e := object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a { - t.Errorf("Unexpected object watched: %s, expected %s", a, e) - } - watched++ - } +func TestDelayedWatchDelivery(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestDelayedWatchDelivery(ctx, t, cacher) } func TestCacherListerWatcher(t *testing.T) {