storage/testing/watcher_tests: move TestCacherWatchSemantics (no-op)
Kubernetes-commit: afbb1a6ef98b548b9e57b168614ca6e15fd0034c
This commit is contained in:
parent
3c3bb86696
commit
c13e210d56
|
@ -36,9 +36,12 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
"k8s.io/apiserver/pkg/features"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/value"
|
"k8s.io/apiserver/pkg/storage/value"
|
||||||
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||||
|
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||||
"k8s.io/utils/pointer"
|
"k8s.io/utils/pointer"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1219,6 +1222,205 @@ func RunSendInitialEventsBackwardCompatibility(ctx context.Context, t *testing.T
|
||||||
w.Stop()
|
w.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCacherWatchSemantics(t *testing.T) {
|
||||||
|
trueVal, falseVal := true, false
|
||||||
|
makePod := func(rv uint64) *example.Pod {
|
||||||
|
return &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: fmt.Sprintf("pod-%d", rv),
|
||||||
|
Namespace: "ns",
|
||||||
|
ResourceVersion: fmt.Sprintf("%d", rv),
|
||||||
|
Annotations: map[string]string{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
scenarios := []struct {
|
||||||
|
name string
|
||||||
|
allowWatchBookmarks bool
|
||||||
|
sendInitialEvents *bool
|
||||||
|
resourceVersion string
|
||||||
|
storageResourceVersion string
|
||||||
|
|
||||||
|
initialPods []*example.Pod
|
||||||
|
podsAfterEstablishingWatch []*example.Pod
|
||||||
|
|
||||||
|
expectedInitialEventsInStrictOrder []watch.Event
|
||||||
|
expectedInitialEventsInRandomOrder []watch.Event
|
||||||
|
expectedEventsAfterEstablishingWatch []watch.Event
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset, storageRV=102",
|
||||||
|
allowWatchBookmarks: true,
|
||||||
|
sendInitialEvents: &trueVal,
|
||||||
|
storageResourceVersion: "102",
|
||||||
|
initialPods: []*example.Pod{makePod(101)},
|
||||||
|
podsAfterEstablishingWatch: []*example.Pod{makePod(102)},
|
||||||
|
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}},
|
||||||
|
expectedEventsAfterEstablishingWatch: []watch.Event{
|
||||||
|
{Type: watch.Added, Object: makePod(102)},
|
||||||
|
{Type: watch.Bookmark, Object: &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
ResourceVersion: "102",
|
||||||
|
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0, storageRV=105",
|
||||||
|
allowWatchBookmarks: true,
|
||||||
|
sendInitialEvents: &trueVal,
|
||||||
|
resourceVersion: "0",
|
||||||
|
storageResourceVersion: "105",
|
||||||
|
initialPods: []*example.Pod{makePod(101), makePod(102)},
|
||||||
|
expectedInitialEventsInRandomOrder: []watch.Event{
|
||||||
|
{Type: watch.Added, Object: makePod(101)},
|
||||||
|
{Type: watch.Added, Object: makePod(102)},
|
||||||
|
},
|
||||||
|
expectedInitialEventsInStrictOrder: []watch.Event{
|
||||||
|
{Type: watch.Bookmark, Object: &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
ResourceVersion: "102",
|
||||||
|
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=101, storageRV=105",
|
||||||
|
allowWatchBookmarks: true,
|
||||||
|
sendInitialEvents: &trueVal,
|
||||||
|
resourceVersion: "101",
|
||||||
|
storageResourceVersion: "105",
|
||||||
|
initialPods: []*example.Pod{makePod(101), makePod(102)},
|
||||||
|
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
|
||||||
|
expectedInitialEventsInStrictOrder: []watch.Event{
|
||||||
|
{Type: watch.Bookmark, Object: &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
ResourceVersion: "102",
|
||||||
|
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset, storageRV=102",
|
||||||
|
sendInitialEvents: &trueVal,
|
||||||
|
storageResourceVersion: "102",
|
||||||
|
initialPods: []*example.Pod{makePod(101)},
|
||||||
|
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}},
|
||||||
|
podsAfterEstablishingWatch: []*example.Pod{makePod(102)},
|
||||||
|
expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// note we set storage's RV to some future value, mustn't be used by this scenario
|
||||||
|
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=0, storageRV=105",
|
||||||
|
sendInitialEvents: &trueVal,
|
||||||
|
resourceVersion: "0",
|
||||||
|
storageResourceVersion: "105",
|
||||||
|
initialPods: []*example.Pod{makePod(101), makePod(102)},
|
||||||
|
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// note we set storage's RV to some future value, mustn't be used by this scenario
|
||||||
|
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=101, storageRV=105",
|
||||||
|
sendInitialEvents: &trueVal,
|
||||||
|
resourceVersion: "101",
|
||||||
|
storageResourceVersion: "105",
|
||||||
|
initialPods: []*example.Pod{makePod(101), makePod(102)},
|
||||||
|
// make sure we only get initial events that are > initial RV (101)
|
||||||
|
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "sendInitialEvents=false, RV=unset, storageRV=103",
|
||||||
|
sendInitialEvents: &falseVal,
|
||||||
|
storageResourceVersion: "103",
|
||||||
|
initialPods: []*example.Pod{makePod(101), makePod(102)},
|
||||||
|
podsAfterEstablishingWatch: []*example.Pod{makePod(104)},
|
||||||
|
expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(104)}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// note we set storage's RV to some future value, mustn't be used by this scenario
|
||||||
|
name: "sendInitialEvents=false, RV=0, storageRV=105",
|
||||||
|
sendInitialEvents: &falseVal,
|
||||||
|
resourceVersion: "0",
|
||||||
|
storageResourceVersion: "105",
|
||||||
|
initialPods: []*example.Pod{makePod(101), makePod(102)},
|
||||||
|
podsAfterEstablishingWatch: []*example.Pod{makePod(103)},
|
||||||
|
expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(103)}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// note we set storage's RV to some future value, mustn't be used by this scenario
|
||||||
|
name: "legacy, RV=0, storageRV=105",
|
||||||
|
resourceVersion: "0",
|
||||||
|
storageResourceVersion: "105",
|
||||||
|
initialPods: []*example.Pod{makePod(101), makePod(102)},
|
||||||
|
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// note we set storage's RV to some future value, mustn't be used by this scenario
|
||||||
|
name: "legacy, RV=unset, storageRV=105",
|
||||||
|
storageResourceVersion: "105",
|
||||||
|
initialPods: []*example.Pod{makePod(101), makePod(102)},
|
||||||
|
// no events because the watch is delegated to the underlying storage
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, scenario := range scenarios {
|
||||||
|
t.Run(scenario.name, func(t *testing.T) {
|
||||||
|
// set up env
|
||||||
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
|
||||||
|
storageListMetaResourceVersion := ""
|
||||||
|
backingStorage := &dummyStorage{getListFn: func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
|
||||||
|
podList := listObj.(*example.PodList)
|
||||||
|
podList.ListMeta = metav1.ListMeta{ResourceVersion: storageListMetaResourceVersion}
|
||||||
|
return nil
|
||||||
|
}}
|
||||||
|
|
||||||
|
cacher, _, err := newTestCacher(backingStorage)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("falied to create cacher: %v", err)
|
||||||
|
}
|
||||||
|
defer cacher.Stop()
|
||||||
|
if err := cacher.ready.wait(context.TODO()); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
|
// now, run a scenario
|
||||||
|
// but first let's add some initial data
|
||||||
|
for _, obj := range scenario.initialPods {
|
||||||
|
err = cacher.watchCache.Add(obj)
|
||||||
|
require.NoError(t, err, "failed to add a pod: %v")
|
||||||
|
}
|
||||||
|
// read request params
|
||||||
|
opts := storage.ListOptions{Predicate: storage.Everything}
|
||||||
|
opts.SendInitialEvents = scenario.sendInitialEvents
|
||||||
|
opts.Predicate.AllowWatchBookmarks = scenario.allowWatchBookmarks
|
||||||
|
if len(scenario.resourceVersion) > 0 {
|
||||||
|
opts.ResourceVersion = scenario.resourceVersion
|
||||||
|
}
|
||||||
|
// before starting a new watch set a storage RV to some future value
|
||||||
|
storageListMetaResourceVersion = scenario.storageResourceVersion
|
||||||
|
|
||||||
|
w, err := cacher.Watch(context.Background(), "pods/ns", opts)
|
||||||
|
require.NoError(t, err, "failed to create watch: %v")
|
||||||
|
defer w.Stop()
|
||||||
|
|
||||||
|
// make sure we only get initial events
|
||||||
|
verifyEvents(t, w, scenario.expectedInitialEventsInRandomOrder, false)
|
||||||
|
verifyEvents(t, w, scenario.expectedInitialEventsInStrictOrder, true)
|
||||||
|
verifyNoEvents(t, w)
|
||||||
|
// add a pod that is greater than the storage's RV when the watch was started
|
||||||
|
for _, obj := range scenario.podsAfterEstablishingWatch {
|
||||||
|
err = cacher.watchCache.Add(obj)
|
||||||
|
require.NoError(t, err, "failed to add a pod: %v")
|
||||||
|
}
|
||||||
|
verifyEvents(t, w, scenario.expectedEventsAfterEstablishingWatch, true)
|
||||||
|
verifyNoEvents(t, w)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type testWatchStruct struct {
|
type testWatchStruct struct {
|
||||||
obj *example.Pod
|
obj *example.Pod
|
||||||
expectEvent bool
|
expectEvent bool
|
||||||
|
|
Loading…
Reference in New Issue