storage/testing/watcher_tests: make TestCacherWatchSemantics storage agnostic

Kubernetes-commit: 91bb75883c613d45563f3b7c01a69dde8194bfdc
This commit is contained in:
Lukasz Szaszkiewicz 2023-09-15 12:28:36 +02:00 committed by Kubernetes Publisher
parent c13e210d56
commit c8dcfeed34
3 changed files with 202 additions and 346 deletions

View File

@ -22,6 +22,7 @@ import (
"testing" "testing"
"time" "time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
@ -327,6 +328,18 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store) storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
} }
func TestCacherWatchSemantics(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemantics(context.TODO(), t, store)
}
func TestCacherWatchSemanticInitialEventsExtended(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemanticInitialEventsExtended(context.TODO(), t, store)
}
// =================================================== // ===================================================
// Test-setup related function are following. // Test-setup related function are following.
// =================================================== // ===================================================
@ -424,3 +437,36 @@ func testSetupWithEtcdServer(t *testing.T, opts ...setupOption) (context.Context
return ctx, cacher, server, terminate return ctx, cacher, server, terminate
} }
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
_, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
if err := cacher.ready.wait(context.TODO()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
return &createWrapper{Cacher: cacher}, tearDown
}
type createWrapper struct {
*Cacher
}
func (c *createWrapper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
if err := c.Cacher.Create(ctx, key, obj, out, ttl); err != nil {
return err
}
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
currentObj := c.Cacher.newFunc()
err := c.Cacher.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj)
if err != nil {
if storage.IsNotFound(err) {
return false, nil
}
return false, err
}
if !apiequality.Semantic.DeepEqual(currentObj, out) {
return false, nil
}
return true, nil
})
}

View File

@ -1340,15 +1340,6 @@ func verifyEvents(t *testing.T, w watch.Interface, events []watch.Event, strictO
} }
} }
func verifyNoEvents(t *testing.T, w watch.Interface) {
select {
case e := <-w.ResultChan():
t.Errorf("Unexpected: %#v event received, expected no events", e)
case <-time.After(time.Second):
return
}
}
func TestCachingDeleteEvents(t *testing.T) { func TestCachingDeleteEvents(t *testing.T) {
backingStorage := &dummyStorage{} backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage) cacher, _, err := newTestCacher(backingStorage)
@ -1611,205 +1602,6 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
} }
} }
func TestCacherWatchSemantics(t *testing.T) {
trueVal, falseVal := true, false
makePod := func(rv uint64) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", rv),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%d", rv),
Annotations: map[string]string{},
},
}
}
scenarios := []struct {
name string
allowWatchBookmarks bool
sendInitialEvents *bool
resourceVersion string
storageResourceVersion string
initialPods []*example.Pod
podsAfterEstablishingWatch []*example.Pod
expectedInitialEventsInStrictOrder []watch.Event
expectedInitialEventsInRandomOrder []watch.Event
expectedEventsAfterEstablishingWatch []watch.Event
}{
{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset, storageRV=102",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
storageResourceVersion: "102",
initialPods: []*example.Pod{makePod(101)},
podsAfterEstablishingWatch: []*example.Pod{makePod(102)},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}},
expectedEventsAfterEstablishingWatch: []watch.Event{
{Type: watch.Added, Object: makePod(102)},
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "102",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
},
},
{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0, storageRV=105",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
resourceVersion: "0",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
expectedInitialEventsInRandomOrder: []watch.Event{
{Type: watch.Added, Object: makePod(101)},
{Type: watch.Added, Object: makePod(102)},
},
expectedInitialEventsInStrictOrder: []watch.Event{
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "102",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
},
},
{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=101, storageRV=105",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
resourceVersion: "101",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
expectedInitialEventsInStrictOrder: []watch.Event{
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "102",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
},
},
{
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset, storageRV=102",
sendInitialEvents: &trueVal,
storageResourceVersion: "102",
initialPods: []*example.Pod{makePod(101)},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}},
podsAfterEstablishingWatch: []*example.Pod{makePod(102)},
expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
},
{
// note we set storage's RV to some future value, mustn't be used by this scenario
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=0, storageRV=105",
sendInitialEvents: &trueVal,
resourceVersion: "0",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
},
{
// note we set storage's RV to some future value, mustn't be used by this scenario
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=101, storageRV=105",
sendInitialEvents: &trueVal,
resourceVersion: "101",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
// make sure we only get initial events that are > initial RV (101)
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
},
{
name: "sendInitialEvents=false, RV=unset, storageRV=103",
sendInitialEvents: &falseVal,
storageResourceVersion: "103",
initialPods: []*example.Pod{makePod(101), makePod(102)},
podsAfterEstablishingWatch: []*example.Pod{makePod(104)},
expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(104)}},
},
{
// note we set storage's RV to some future value, mustn't be used by this scenario
name: "sendInitialEvents=false, RV=0, storageRV=105",
sendInitialEvents: &falseVal,
resourceVersion: "0",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
podsAfterEstablishingWatch: []*example.Pod{makePod(103)},
expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(103)}},
},
{
// note we set storage's RV to some future value, mustn't be used by this scenario
name: "legacy, RV=0, storageRV=105",
resourceVersion: "0",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
},
{
// note we set storage's RV to some future value, mustn't be used by this scenario
name: "legacy, RV=unset, storageRV=105",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
// no events because the watch is delegated to the underlying storage
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
// set up env
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
storageListMetaResourceVersion := ""
backingStorage := &dummyStorage{getListFn: func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
podList := listObj.(*example.PodList)
podList.ListMeta = metav1.ListMeta{ResourceVersion: storageListMetaResourceVersion}
return nil
}}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("falied to create cacher: %v", err)
}
defer cacher.Stop()
if err := cacher.ready.wait(context.TODO()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
// now, run a scenario
// but first let's add some initial data
for _, obj := range scenario.initialPods {
err = cacher.watchCache.Add(obj)
require.NoError(t, err, "failed to add a pod: %v")
}
// read request params
opts := storage.ListOptions{Predicate: storage.Everything}
opts.SendInitialEvents = scenario.sendInitialEvents
opts.Predicate.AllowWatchBookmarks = scenario.allowWatchBookmarks
if len(scenario.resourceVersion) > 0 {
opts.ResourceVersion = scenario.resourceVersion
}
// before starting a new watch set a storage RV to some future value
storageListMetaResourceVersion = scenario.storageResourceVersion
w, err := cacher.Watch(context.Background(), "pods/ns", opts)
require.NoError(t, err, "failed to create watch: %v")
defer w.Stop()
// make sure we only get initial events
verifyEvents(t, w, scenario.expectedInitialEventsInRandomOrder, false)
verifyEvents(t, w, scenario.expectedInitialEventsInStrictOrder, true)
verifyNoEvents(t, w)
// add a pod that is greater than the storage's RV when the watch was started
for _, obj := range scenario.podsAfterEstablishingWatch {
err = cacher.watchCache.Add(obj)
require.NoError(t, err, "failed to add a pod: %v")
}
verifyEvents(t, w, scenario.expectedEventsAfterEstablishingWatch, true)
verifyNoEvents(t, w)
})
}
}
func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
backingStorage := &dummyStorage{} backingStorage := &dummyStorage{}

View File

@ -1222,205 +1222,223 @@ func RunSendInitialEventsBackwardCompatibility(ctx context.Context, t *testing.T
w.Stop() w.Stop()
} }
func TestCacherWatchSemantics(t *testing.T) { func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interface) {
trueVal, falseVal := true, false trueVal, falseVal := true, false
makePod := func(rv uint64) *example.Pod { addEventsFromCreatedPods := func(createdInitialPods []*example.Pod) []watch.Event {
return &example.Pod{ var ret []watch.Event
for _, createdPod := range createdInitialPods {
ret = append(ret, watch.Event{Type: watch.Added, Object: createdPod})
}
return ret
}
initialEventsEndFromLastCreatedPod := func(createdInitialPods []*example.Pod) watch.Event {
return watch.Event{
Type: watch.Bookmark,
Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", rv), ResourceVersion: createdInitialPods[len(createdInitialPods)-1].ResourceVersion,
Namespace: "ns", Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
ResourceVersion: fmt.Sprintf("%d", rv), },
Annotations: map[string]string{},
}, },
} }
} }
scenarios := []struct { scenarios := []struct {
name string name string
allowWatchBookmarks bool allowWatchBookmarks bool
sendInitialEvents *bool sendInitialEvents *bool
resourceVersion string resourceVersion string
storageResourceVersion string
initialPods []*example.Pod initialPods func(ns string) []*example.Pod
podsAfterEstablishingWatch []*example.Pod podsAfterEstablishingWatch func(ns string) []*example.Pod
expectedInitialEventsInStrictOrder []watch.Event expectedInitialEventsInRandomOrder func(createdInitialPods []*example.Pod) []watch.Event
expectedInitialEventsInRandomOrder []watch.Event expectedInitialEventsInStrictOrder func(createdInitialPods []*example.Pod) []watch.Event
expectedEventsAfterEstablishingWatch []watch.Event expectedEventsAfterEstablishingWatch func(createdPodsAfterWatch []*example.Pod) []watch.Event
}{ }{
{ {
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset, storageRV=102", name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
storageResourceVersion: "102",
initialPods: []*example.Pod{makePod(101)},
podsAfterEstablishingWatch: []*example.Pod{makePod(102)},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}},
expectedEventsAfterEstablishingWatch: []watch.Event{
{Type: watch.Added, Object: makePod(102)},
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "102",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
},
},
{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0, storageRV=105",
allowWatchBookmarks: true, allowWatchBookmarks: true,
sendInitialEvents: &trueVal, sendInitialEvents: &trueVal,
resourceVersion: "0", resourceVersion: "0",
storageResourceVersion: "105", initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "2"), makePod(ns, "3")} },
initialPods: []*example.Pod{makePod(101), makePod(102)}, expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEventsInRandomOrder: []watch.Event{ expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event {
{Type: watch.Added, Object: makePod(101)}, return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
{Type: watch.Added, Object: makePod(102)},
},
expectedInitialEventsInStrictOrder: []watch.Event{
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "102",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
}, },
}, },
{ {
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=101, storageRV=105", name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=1",
allowWatchBookmarks: true, allowWatchBookmarks: true,
sendInitialEvents: &trueVal, sendInitialEvents: &trueVal,
resourceVersion: "101", resourceVersion: "1",
storageResourceVersion: "105", initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "4"), makePod(ns, "5")} },
initialPods: []*example.Pod{makePod(101), makePod(102)}, expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event {
expectedInitialEventsInStrictOrder: []watch.Event{ return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "102",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
}, },
}, },
{ {
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset, storageRV=102", name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset",
sendInitialEvents: &trueVal, sendInitialEvents: &trueVal,
storageResourceVersion: "102", initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "6")} },
initialPods: []*example.Pod{makePod(101)}, expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}}, podsAfterEstablishingWatch: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "7")} },
podsAfterEstablishingWatch: []*example.Pod{makePod(102)}, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
}, },
{ {
// note we set storage's RV to some future value, mustn't be used by this scenario name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=0",
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=0, storageRV=105",
sendInitialEvents: &trueVal, sendInitialEvents: &trueVal,
resourceVersion: "0", resourceVersion: "0",
storageResourceVersion: "105", initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "8"), makePod(ns, "9")} },
initialPods: []*example.Pod{makePod(101), makePod(102)}, expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
}, },
{ {
// note we set storage's RV to some future value, mustn't be used by this scenario name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=1",
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=101, storageRV=105",
sendInitialEvents: &trueVal, sendInitialEvents: &trueVal,
resourceVersion: "101", resourceVersion: "1",
storageResourceVersion: "105", initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "10"), makePod(ns, "11")} },
initialPods: []*example.Pod{makePod(101), makePod(102)}, // make sure we only get initial events that are > initial RV (1)
// make sure we only get initial events that are > initial RV (101) expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
}, },
{ {
name: "sendInitialEvents=false, RV=unset, storageRV=103", name: "sendInitialEvents=false, RV=unset",
sendInitialEvents: &falseVal, sendInitialEvents: &falseVal,
storageResourceVersion: "103", initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "12"), makePod(ns, "13")} },
initialPods: []*example.Pod{makePod(101), makePod(102)}, podsAfterEstablishingWatch: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "14")} },
podsAfterEstablishingWatch: []*example.Pod{makePod(104)}, expectedEventsAfterEstablishingWatch: func(createdPodsAfterWatch []*example.Pod) []watch.Event {
expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(104)}}, return []watch.Event{{Type: watch.Added, Object: createdPodsAfterWatch[0]}}
},
}, },
{ {
// note we set storage's RV to some future value, mustn't be used by this scenario name: "sendInitialEvents=false, RV=0",
name: "sendInitialEvents=false, RV=0, storageRV=105",
sendInitialEvents: &falseVal, sendInitialEvents: &falseVal,
resourceVersion: "0", resourceVersion: "0",
storageResourceVersion: "105", initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "15"), makePod(ns, "16")} },
initialPods: []*example.Pod{makePod(101), makePod(102)}, podsAfterEstablishingWatch: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "17")} },
podsAfterEstablishingWatch: []*example.Pod{makePod(103)}, expectedEventsAfterEstablishingWatch: func(createdPodsAfterWatch []*example.Pod) []watch.Event {
expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(103)}}, return []watch.Event{{Type: watch.Added, Object: createdPodsAfterWatch[0]}}
},
}, },
{ {
// note we set storage's RV to some future value, mustn't be used by this scenario name: "legacy, RV=0",
name: "legacy, RV=0, storageRV=105",
resourceVersion: "0", resourceVersion: "0",
storageResourceVersion: "105", initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "18"), makePod(ns, "19")} },
initialPods: []*example.Pod{makePod(101), makePod(102)}, expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
}, },
{ {
// note we set storage's RV to some future value, mustn't be used by this scenario name: "legacy, RV=unset",
name: "legacy, RV=unset, storageRV=105", initialPods: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "20"), makePod(ns, "21")} },
storageResourceVersion: "105", expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
initialPods: []*example.Pod{makePod(101), makePod(102)},
// no events because the watch is delegated to the underlying storage
}, },
} }
for _, scenario := range scenarios { for idx, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) { t.Run(scenario.name, func(t *testing.T) {
// set up env // set up env
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
storageListMetaResourceVersion := "" if scenario.expectedInitialEventsInStrictOrder == nil {
backingStorage := &dummyStorage{getListFn: func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error { scenario.expectedInitialEventsInStrictOrder = func(_ []*example.Pod) []watch.Event { return nil }
podList := listObj.(*example.PodList)
podList.ListMeta = metav1.ListMeta{ResourceVersion: storageListMetaResourceVersion}
return nil
}}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("falied to create cacher: %v", err)
} }
defer cacher.Stop() if scenario.expectedInitialEventsInRandomOrder == nil {
if err := cacher.ready.wait(context.TODO()); err != nil { scenario.expectedInitialEventsInRandomOrder = func(_ []*example.Pod) []watch.Event { return nil }
t.Fatalf("unexpected error waiting for the cache to be ready") }
if scenario.podsAfterEstablishingWatch == nil {
scenario.podsAfterEstablishingWatch = func(_ string) []*example.Pod { return nil }
}
if scenario.expectedEventsAfterEstablishingWatch == nil {
scenario.expectedEventsAfterEstablishingWatch = func(_ []*example.Pod) []watch.Event { return nil }
} }
// now, run a scenario var createdPods []*example.Pod
// but first let's add some initial data ns := fmt.Sprintf("ns-%v", idx)
for _, obj := range scenario.initialPods { for _, obj := range scenario.initialPods(ns) {
err = cacher.watchCache.Add(obj) out := &example.Pod{}
require.NoError(t, err, "failed to add a pod: %v") err := store.Create(ctx, computePodKey(obj), obj, out, 0)
require.NoError(t, err, "failed to add a pod: %v", obj)
createdPods = append(createdPods, out)
} }
// read request params
opts := storage.ListOptions{Predicate: storage.Everything} opts := storage.ListOptions{Predicate: storage.Everything, Recursive: true}
opts.SendInitialEvents = scenario.sendInitialEvents opts.SendInitialEvents = scenario.sendInitialEvents
opts.Predicate.AllowWatchBookmarks = scenario.allowWatchBookmarks opts.Predicate.AllowWatchBookmarks = scenario.allowWatchBookmarks
if len(scenario.resourceVersion) > 0 { if len(scenario.resourceVersion) > 0 {
opts.ResourceVersion = scenario.resourceVersion opts.ResourceVersion = scenario.resourceVersion
} }
// before starting a new watch set a storage RV to some future value
storageListMetaResourceVersion = scenario.storageResourceVersion
w, err := cacher.Watch(context.Background(), "pods/ns", opts) w, err := store.Watch(context.Background(), fmt.Sprintf("/pods/%s", ns), opts)
require.NoError(t, err, "failed to create watch: %v") require.NoError(t, err, "failed to create watch: %v")
defer w.Stop() defer w.Stop()
// make sure we only get initial events // make sure we only get initial events
verifyEvents(t, w, scenario.expectedInitialEventsInRandomOrder, false) testCheckResultsInRandomOrder(t, w, scenario.expectedInitialEventsInRandomOrder(createdPods))
verifyEvents(t, w, scenario.expectedInitialEventsInStrictOrder, true) testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEventsInStrictOrder(createdPods))
verifyNoEvents(t, w) testCheckNoMoreResults(t, w)
createdPods = []*example.Pod{}
// add a pod that is greater than the storage's RV when the watch was started // add a pod that is greater than the storage's RV when the watch was started
for _, obj := range scenario.podsAfterEstablishingWatch { for _, obj := range scenario.podsAfterEstablishingWatch(ns) {
err = cacher.watchCache.Add(obj) out := &example.Pod{}
err = store.Create(ctx, computePodKey(obj), obj, out, 0)
require.NoError(t, err, "failed to add a pod: %v") require.NoError(t, err, "failed to add a pod: %v")
createdPods = append(createdPods, out)
} }
verifyEvents(t, w, scenario.expectedEventsAfterEstablishingWatch, true) testCheckResultsInStrictOrder(t, w, scenario.expectedEventsAfterEstablishingWatch(createdPods))
verifyNoEvents(t, w) testCheckNoMoreResults(t, w)
}) })
} }
} }
// RunWatchSemanticInitialEventsExtended checks if the bookmark event marking the end of the list stream contains the global RV
func RunWatchSemanticInitialEventsExtended(ctx context.Context, t *testing.T, store storage.Interface) {
trueVal := true
initialPod := func(ns string) *example.Pod { return makePod(ns, "2") }
expectedInitialEventsInStrictOrder := func(firstPod, secondPod *example.Pod) []watch.Event {
return []watch.Event{
{Type: watch.Added, Object: firstPod},
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: secondPod.ResourceVersion,
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
}
}
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
firstPod := &example.Pod{}
nsPrefix := "foo"
ns := fmt.Sprintf("ns-%s", nsPrefix)
err := store.Create(ctx, computePodKey(initialPod(ns)), initialPod(ns), firstPod, 0)
require.NoError(t, err, "failed to add a pod: %v")
// add the pod to a different ns to advance the global RV
secondPod := &example.Pod{}
newNs := fmt.Sprintf("other-ns-%s", nsPrefix)
err = store.Create(ctx, computePodKey(initialPod(newNs)), initialPod(newNs), secondPod, 0)
require.NoError(t, err, "failed to add a pod: %v")
opts := storage.ListOptions{Predicate: storage.Everything, Recursive: true}
opts.SendInitialEvents = &trueVal
opts.Predicate.AllowWatchBookmarks = true
w, err := store.Watch(context.Background(), fmt.Sprintf("/pods/%s", ns), opts)
require.NoError(t, err, "failed to create watch: %v")
defer w.Stop()
// make sure we only get initial events from the first ns
// followed by the bookmark with the global RV
testCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(firstPod, secondPod))
testCheckNoMoreResults(t, w)
}
func makePod(namespace, namePrefix string) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%s", namePrefix),
Namespace: namespace,
},
}
}
type testWatchStruct struct { type testWatchStruct struct {
obj *example.Pod obj *example.Pod
expectEvent bool expectEvent bool