diff --git a/pkg/storage/cacher/cacher_test.go b/pkg/storage/cacher/cacher_test.go index c4ca88ef6..95e6c9108 100644 --- a/pkg/storage/cacher/cacher_test.go +++ b/pkg/storage/cacher/cacher_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -327,6 +328,18 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) { storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store) } +func TestCacherWatchSemantics(t *testing.T) { + store, terminate := testSetupWithEtcdAndCreateWrapper(t) + t.Cleanup(terminate) + storagetesting.RunWatchSemantics(context.TODO(), t, store) +} + +func TestCacherWatchSemanticInitialEventsExtended(t *testing.T) { + store, terminate := testSetupWithEtcdAndCreateWrapper(t) + t.Cleanup(terminate) + storagetesting.RunWatchSemanticInitialEventsExtended(context.TODO(), t, store) +} + // =================================================== // Test-setup related function are following. // =================================================== @@ -424,3 +437,36 @@ func testSetupWithEtcdServer(t *testing.T, opts ...setupOption) (context.Context return ctx, cacher, server, terminate } + +func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) { + _, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...) + + if err := cacher.ready.wait(context.TODO()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + return &createWrapper{Cacher: cacher}, tearDown +} + +type createWrapper struct { + *Cacher +} + +func (c *createWrapper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { + if err := c.Cacher.Create(ctx, key, obj, out, ttl); err != nil { + return err + } + return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { + currentObj := c.Cacher.newFunc() + err := c.Cacher.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj) + if err != nil { + if storage.IsNotFound(err) { + return false, nil + } + return false, err + } + if !apiequality.Semantic.DeepEqual(currentObj, out) { + return false, nil + } + return true, nil + }) +} diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index eecf2ddfc..fcef65efb 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -1340,15 +1340,6 @@ func verifyEvents(t *testing.T, w watch.Interface, events []watch.Event, strictO } } -func verifyNoEvents(t *testing.T, w watch.Interface) { - select { - case e := <-w.ResultChan(): - t.Errorf("Unexpected: %#v event received, expected no events", e) - case <-time.After(time.Second): - return - } -} - func TestCachingDeleteEvents(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) @@ -1611,205 +1602,6 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) { } } -func TestCacherWatchSemantics(t *testing.T) { - trueVal, falseVal := true, false - makePod := func(rv uint64) *example.Pod { - return &example.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("pod-%d", rv), - Namespace: "ns", - ResourceVersion: fmt.Sprintf("%d", rv), - Annotations: map[string]string{}, - }, - } - } - - scenarios := []struct { - name string - allowWatchBookmarks bool - sendInitialEvents *bool - resourceVersion string - storageResourceVersion string - - initialPods []*example.Pod - podsAfterEstablishingWatch []*example.Pod - - expectedInitialEventsInStrictOrder []watch.Event - expectedInitialEventsInRandomOrder []watch.Event - expectedEventsAfterEstablishingWatch []watch.Event - }{ - { - name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset, storageRV=102", - allowWatchBookmarks: true, - sendInitialEvents: &trueVal, - storageResourceVersion: "102", - initialPods: []*example.Pod{makePod(101)}, - podsAfterEstablishingWatch: []*example.Pod{makePod(102)}, - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}}, - expectedEventsAfterEstablishingWatch: []watch.Event{ - {Type: watch.Added, Object: makePod(102)}, - {Type: watch.Bookmark, Object: &example.Pod{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "102", - Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, - }, - }}, - }, - }, - { - name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0, storageRV=105", - allowWatchBookmarks: true, - sendInitialEvents: &trueVal, - resourceVersion: "0", - storageResourceVersion: "105", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - expectedInitialEventsInRandomOrder: []watch.Event{ - {Type: watch.Added, Object: makePod(101)}, - {Type: watch.Added, Object: makePod(102)}, - }, - expectedInitialEventsInStrictOrder: []watch.Event{ - {Type: watch.Bookmark, Object: &example.Pod{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "102", - Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, - }, - }}, - }, - }, - { - name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=101, storageRV=105", - allowWatchBookmarks: true, - sendInitialEvents: &trueVal, - resourceVersion: "101", - storageResourceVersion: "105", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, - expectedInitialEventsInStrictOrder: []watch.Event{ - {Type: watch.Bookmark, Object: &example.Pod{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "102", - Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, - }, - }}, - }, - }, - { - name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset, storageRV=102", - sendInitialEvents: &trueVal, - storageResourceVersion: "102", - initialPods: []*example.Pod{makePod(101)}, - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}}, - podsAfterEstablishingWatch: []*example.Pod{makePod(102)}, - expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(102)}}, - }, - { - // note we set storage's RV to some future value, mustn't be used by this scenario - name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=0, storageRV=105", - sendInitialEvents: &trueVal, - resourceVersion: "0", - storageResourceVersion: "105", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, - }, - { - // note we set storage's RV to some future value, mustn't be used by this scenario - name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=101, storageRV=105", - sendInitialEvents: &trueVal, - resourceVersion: "101", - storageResourceVersion: "105", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - // make sure we only get initial events that are > initial RV (101) - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, - }, - { - name: "sendInitialEvents=false, RV=unset, storageRV=103", - sendInitialEvents: &falseVal, - storageResourceVersion: "103", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - podsAfterEstablishingWatch: []*example.Pod{makePod(104)}, - expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(104)}}, - }, - { - // note we set storage's RV to some future value, mustn't be used by this scenario - name: "sendInitialEvents=false, RV=0, storageRV=105", - sendInitialEvents: &falseVal, - resourceVersion: "0", - storageResourceVersion: "105", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - podsAfterEstablishingWatch: []*example.Pod{makePod(103)}, - expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(103)}}, - }, - { - // note we set storage's RV to some future value, mustn't be used by this scenario - name: "legacy, RV=0, storageRV=105", - resourceVersion: "0", - storageResourceVersion: "105", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, - }, - { - // note we set storage's RV to some future value, mustn't be used by this scenario - name: "legacy, RV=unset, storageRV=105", - storageResourceVersion: "105", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - // no events because the watch is delegated to the underlying storage - }, - } - for _, scenario := range scenarios { - t.Run(scenario.name, func(t *testing.T) { - // set up env - defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() - storageListMetaResourceVersion := "" - backingStorage := &dummyStorage{getListFn: func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error { - podList := listObj.(*example.PodList) - podList.ListMeta = metav1.ListMeta{ResourceVersion: storageListMetaResourceVersion} - return nil - }} - - cacher, _, err := newTestCacher(backingStorage) - if err != nil { - t.Fatalf("falied to create cacher: %v", err) - } - defer cacher.Stop() - if err := cacher.ready.wait(context.TODO()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") - } - - // now, run a scenario - // but first let's add some initial data - for _, obj := range scenario.initialPods { - err = cacher.watchCache.Add(obj) - require.NoError(t, err, "failed to add a pod: %v") - } - // read request params - opts := storage.ListOptions{Predicate: storage.Everything} - opts.SendInitialEvents = scenario.sendInitialEvents - opts.Predicate.AllowWatchBookmarks = scenario.allowWatchBookmarks - if len(scenario.resourceVersion) > 0 { - opts.ResourceVersion = scenario.resourceVersion - } - // before starting a new watch set a storage RV to some future value - storageListMetaResourceVersion = scenario.storageResourceVersion - - w, err := cacher.Watch(context.Background(), "pods/ns", opts) - require.NoError(t, err, "failed to create watch: %v") - defer w.Stop() - - // make sure we only get initial events - verifyEvents(t, w, scenario.expectedInitialEventsInRandomOrder, false) - verifyEvents(t, w, scenario.expectedInitialEventsInStrictOrder, true) - verifyNoEvents(t, w) - // add a pod that is greater than the storage's RV when the watch was started - for _, obj := range scenario.podsAfterEstablishingWatch { - err = cacher.watchCache.Add(obj) - require.NoError(t, err, "failed to add a pod: %v") - } - verifyEvents(t, w, scenario.expectedEventsAfterEstablishingWatch, true) - verifyNoEvents(t, w) - }) - } -} - func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() backingStorage := &dummyStorage{} diff --git a/pkg/storage/testing/watcher_tests.go b/pkg/storage/testing/watcher_tests.go index eb7acf836..61fbdf607 100644 --- a/pkg/storage/testing/watcher_tests.go +++ b/pkg/storage/testing/watcher_tests.go @@ -1222,205 +1222,223 @@ func RunSendInitialEventsBackwardCompatibility(ctx context.Context, t *testing.T w.Stop() } -func TestCacherWatchSemantics(t *testing.T) { +func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interface) { trueVal, falseVal := true, false - makePod := func(rv uint64) *example.Pod { - return &example.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("pod-%d", rv), - Namespace: "ns", - ResourceVersion: fmt.Sprintf("%d", rv), - Annotations: map[string]string{}, + addEventsFromCreatedPods := func(createdInitialPods []*example.Pod) []watch.Event { + var ret []watch.Event + for _, createdPod := range createdInitialPods { + ret = append(ret, watch.Event{Type: watch.Added, Object: createdPod}) + } + return ret + } + initialEventsEndFromLastCreatedPod := func(createdInitialPods []*example.Pod) watch.Event { + return watch.Event{ + Type: watch.Bookmark, + Object: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: createdInitialPods[len(createdInitialPods)-1].ResourceVersion, + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, }, } } - scenarios := []struct { - name string - allowWatchBookmarks bool - sendInitialEvents *bool - resourceVersion string - storageResourceVersion string + name string + allowWatchBookmarks bool + sendInitialEvents *bool + resourceVersion string - initialPods []*example.Pod - podsAfterEstablishingWatch []*example.Pod + initialPods func(ns string) []*example.Pod + podsAfterEstablishingWatch func(ns string) []*example.Pod - expectedInitialEventsInStrictOrder []watch.Event - expectedInitialEventsInRandomOrder []watch.Event - expectedEventsAfterEstablishingWatch []watch.Event + expectedInitialEventsInRandomOrder func(createdInitialPods []*example.Pod) []watch.Event + expectedInitialEventsInStrictOrder func(createdInitialPods []*example.Pod) []watch.Event + expectedEventsAfterEstablishingWatch func(createdPodsAfterWatch []*example.Pod) []watch.Event }{ { - name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset, storageRV=102", + name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0", allowWatchBookmarks: true, sendInitialEvents: &trueVal, - storageResourceVersion: "102", - initialPods: []*example.Pod{makePod(101)}, - podsAfterEstablishingWatch: []*example.Pod{makePod(102)}, - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}}, - expectedEventsAfterEstablishingWatch: []watch.Event{ - {Type: watch.Added, Object: makePod(102)}, - {Type: watch.Bookmark, Object: &example.Pod{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "102", - Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, - }, - }}, + resourceVersion: "0", + initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "2"), makePod(ns, "3")} }, + expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, + expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event { + return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)} }, }, { - name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0, storageRV=105", - allowWatchBookmarks: true, - sendInitialEvents: &trueVal, - resourceVersion: "0", - storageResourceVersion: "105", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - expectedInitialEventsInRandomOrder: []watch.Event{ - {Type: watch.Added, Object: makePod(101)}, - {Type: watch.Added, Object: makePod(102)}, - }, - expectedInitialEventsInStrictOrder: []watch.Event{ - {Type: watch.Bookmark, Object: &example.Pod{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "102", - Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, - }, - }}, - }, - }, - { - name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=101, storageRV=105", + name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=1", allowWatchBookmarks: true, sendInitialEvents: &trueVal, - resourceVersion: "101", - storageResourceVersion: "105", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, - expectedInitialEventsInStrictOrder: []watch.Event{ - {Type: watch.Bookmark, Object: &example.Pod{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "102", - Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, - }, - }}, + resourceVersion: "1", + initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "4"), makePod(ns, "5")} }, + expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, + expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event { + return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)} }, }, { - name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset, storageRV=102", + name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset", sendInitialEvents: &trueVal, - storageResourceVersion: "102", - initialPods: []*example.Pod{makePod(101)}, - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}}, - podsAfterEstablishingWatch: []*example.Pod{makePod(102)}, - expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(102)}}, + initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "6")} }, + expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, + podsAfterEstablishingWatch: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "7")} }, + expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, }, { - // note we set storage's RV to some future value, mustn't be used by this scenario - name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=0, storageRV=105", + name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=0", sendInitialEvents: &trueVal, resourceVersion: "0", - storageResourceVersion: "105", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, + initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "8"), makePod(ns, "9")} }, + expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, }, { - // note we set storage's RV to some future value, mustn't be used by this scenario - name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=101, storageRV=105", - sendInitialEvents: &trueVal, - resourceVersion: "101", - storageResourceVersion: "105", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - // make sure we only get initial events that are > initial RV (101) - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, + name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=1", + sendInitialEvents: &trueVal, + resourceVersion: "1", + initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "10"), makePod(ns, "11")} }, + // make sure we only get initial events that are > initial RV (1) + expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, }, { - name: "sendInitialEvents=false, RV=unset, storageRV=103", - sendInitialEvents: &falseVal, - storageResourceVersion: "103", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - podsAfterEstablishingWatch: []*example.Pod{makePod(104)}, - expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(104)}}, + name: "sendInitialEvents=false, RV=unset", + sendInitialEvents: &falseVal, + initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "12"), makePod(ns, "13")} }, + podsAfterEstablishingWatch: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "14")} }, + expectedEventsAfterEstablishingWatch: func(createdPodsAfterWatch []*example.Pod) []watch.Event { + return []watch.Event{{Type: watch.Added, Object: createdPodsAfterWatch[0]}} + }, }, { - // note we set storage's RV to some future value, mustn't be used by this scenario - name: "sendInitialEvents=false, RV=0, storageRV=105", - sendInitialEvents: &falseVal, - resourceVersion: "0", - storageResourceVersion: "105", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - podsAfterEstablishingWatch: []*example.Pod{makePod(103)}, - expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(103)}}, + name: "sendInitialEvents=false, RV=0", + sendInitialEvents: &falseVal, + resourceVersion: "0", + initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "15"), makePod(ns, "16")} }, + podsAfterEstablishingWatch: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "17")} }, + expectedEventsAfterEstablishingWatch: func(createdPodsAfterWatch []*example.Pod) []watch.Event { + return []watch.Event{{Type: watch.Added, Object: createdPodsAfterWatch[0]}} + }, }, { - // note we set storage's RV to some future value, mustn't be used by this scenario - name: "legacy, RV=0, storageRV=105", + name: "legacy, RV=0", resourceVersion: "0", - storageResourceVersion: "105", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, + initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "18"), makePod(ns, "19")} }, + expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, }, { - // note we set storage's RV to some future value, mustn't be used by this scenario - name: "legacy, RV=unset, storageRV=105", - storageResourceVersion: "105", - initialPods: []*example.Pod{makePod(101), makePod(102)}, - // no events because the watch is delegated to the underlying storage + name: "legacy, RV=unset", + initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "20"), makePod(ns, "21")} }, + expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, }, } - for _, scenario := range scenarios { + for idx, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { // set up env defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() - storageListMetaResourceVersion := "" - backingStorage := &dummyStorage{getListFn: func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error { - podList := listObj.(*example.PodList) - podList.ListMeta = metav1.ListMeta{ResourceVersion: storageListMetaResourceVersion} - return nil - }} - - cacher, _, err := newTestCacher(backingStorage) - if err != nil { - t.Fatalf("falied to create cacher: %v", err) + if scenario.expectedInitialEventsInStrictOrder == nil { + scenario.expectedInitialEventsInStrictOrder = func(_ []*example.Pod) []watch.Event { return nil } } - defer cacher.Stop() - if err := cacher.ready.wait(context.TODO()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if scenario.expectedInitialEventsInRandomOrder == nil { + scenario.expectedInitialEventsInRandomOrder = func(_ []*example.Pod) []watch.Event { return nil } + } + if scenario.podsAfterEstablishingWatch == nil { + scenario.podsAfterEstablishingWatch = func(_ string) []*example.Pod { return nil } + } + if scenario.expectedEventsAfterEstablishingWatch == nil { + scenario.expectedEventsAfterEstablishingWatch = func(_ []*example.Pod) []watch.Event { return nil } } - // now, run a scenario - // but first let's add some initial data - for _, obj := range scenario.initialPods { - err = cacher.watchCache.Add(obj) - require.NoError(t, err, "failed to add a pod: %v") + var createdPods []*example.Pod + ns := fmt.Sprintf("ns-%v", idx) + for _, obj := range scenario.initialPods(ns) { + out := &example.Pod{} + err := store.Create(ctx, computePodKey(obj), obj, out, 0) + require.NoError(t, err, "failed to add a pod: %v", obj) + createdPods = append(createdPods, out) } - // read request params - opts := storage.ListOptions{Predicate: storage.Everything} + + opts := storage.ListOptions{Predicate: storage.Everything, Recursive: true} opts.SendInitialEvents = scenario.sendInitialEvents opts.Predicate.AllowWatchBookmarks = scenario.allowWatchBookmarks if len(scenario.resourceVersion) > 0 { opts.ResourceVersion = scenario.resourceVersion } - // before starting a new watch set a storage RV to some future value - storageListMetaResourceVersion = scenario.storageResourceVersion - w, err := cacher.Watch(context.Background(), "pods/ns", opts) + w, err := store.Watch(context.Background(), fmt.Sprintf("/pods/%s", ns), opts) require.NoError(t, err, "failed to create watch: %v") defer w.Stop() // make sure we only get initial events - verifyEvents(t, w, scenario.expectedInitialEventsInRandomOrder, false) - verifyEvents(t, w, scenario.expectedInitialEventsInStrictOrder, true) - verifyNoEvents(t, w) + testCheckResultsInRandomOrder(t, w, scenario.expectedInitialEventsInRandomOrder(createdPods)) + testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEventsInStrictOrder(createdPods)) + testCheckNoMoreResults(t, w) + + createdPods = []*example.Pod{} // add a pod that is greater than the storage's RV when the watch was started - for _, obj := range scenario.podsAfterEstablishingWatch { - err = cacher.watchCache.Add(obj) + for _, obj := range scenario.podsAfterEstablishingWatch(ns) { + out := &example.Pod{} + err = store.Create(ctx, computePodKey(obj), obj, out, 0) require.NoError(t, err, "failed to add a pod: %v") + createdPods = append(createdPods, out) } - verifyEvents(t, w, scenario.expectedEventsAfterEstablishingWatch, true) - verifyNoEvents(t, w) + testCheckResultsInStrictOrder(t, w, scenario.expectedEventsAfterEstablishingWatch(createdPods)) + testCheckNoMoreResults(t, w) }) } } +// RunWatchSemanticInitialEventsExtended checks if the bookmark event marking the end of the list stream contains the global RV +func RunWatchSemanticInitialEventsExtended(ctx context.Context, t *testing.T, store storage.Interface) { + trueVal := true + initialPod := func(ns string) *example.Pod { return makePod(ns, "2") } + expectedInitialEventsInStrictOrder := func(firstPod, secondPod *example.Pod) []watch.Event { + return []watch.Event{ + {Type: watch.Added, Object: firstPod}, + {Type: watch.Bookmark, Object: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: secondPod.ResourceVersion, + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + } + } + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() + + firstPod := &example.Pod{} + nsPrefix := "foo" + ns := fmt.Sprintf("ns-%s", nsPrefix) + err := store.Create(ctx, computePodKey(initialPod(ns)), initialPod(ns), firstPod, 0) + require.NoError(t, err, "failed to add a pod: %v") + + // add the pod to a different ns to advance the global RV + secondPod := &example.Pod{} + newNs := fmt.Sprintf("other-ns-%s", nsPrefix) + err = store.Create(ctx, computePodKey(initialPod(newNs)), initialPod(newNs), secondPod, 0) + require.NoError(t, err, "failed to add a pod: %v") + + opts := storage.ListOptions{Predicate: storage.Everything, Recursive: true} + opts.SendInitialEvents = &trueVal + opts.Predicate.AllowWatchBookmarks = true + + w, err := store.Watch(context.Background(), fmt.Sprintf("/pods/%s", ns), opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + + // make sure we only get initial events from the first ns + // followed by the bookmark with the global RV + testCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(firstPod, secondPod)) + testCheckNoMoreResults(t, w) +} + +func makePod(namespace, namePrefix string) *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%s", namePrefix), + Namespace: namespace, + }, + } +} + type testWatchStruct struct { obj *example.Pod expectEvent bool