Generalize few watch tests from cacher

Kubernetes-commit: 45e836a968acc113cb03768cd8c730bea89bd332
This commit is contained in:
Wojciech Tyczyński 2023-04-17 21:48:08 +02:00 committed by Kubernetes Publisher
parent 9f0e077465
commit 072d278e39
3 changed files with 182 additions and 162 deletions

View File

@ -61,6 +61,11 @@ func TestWatchFromNoneZero(t *testing.T) {
storagetesting.RunTestWatchFromNoneZero(ctx, t, store)
}
func TestDelayedWatchDelivery(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestDelayedWatchDelivery(ctx, t, store)
}
func TestWatchError(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchError(ctx, t, &storeWithPrefixTransformer{store})
@ -71,6 +76,11 @@ func TestWatchContextCancel(t *testing.T) {
storagetesting.RunTestWatchContextCancel(ctx, t, store)
}
func TestWatcherTimeout(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatcherTimeout(ctx, t, store)
}
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, store)

View File

@ -54,6 +54,12 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur
Spec: example.PodSpec{NodeName: "bar"},
}
selectedPod := func(pod *example.Pod) *example.Pod {
result := pod.DeepCopy()
result.Labels = map[string]string{"select": "true"}
return result
}
tests := []struct {
name string
namespace string
@ -94,6 +100,24 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur
return nil, fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil
},
},
}, {
name: "filtering",
namespace: fmt.Sprintf("test-ns-5-%t", recursive),
watchTests: []*testWatchStruct{
{selectedPod(basePod), true, watch.Added},
{basePod, true, watch.Deleted},
{selectedPod(basePod), true, watch.Added},
{selectedPod(basePodAssigned), true, watch.Modified},
{nil, true, watch.Deleted},
},
pred: storage.SelectionPredicate{
Label: labels.SelectorFromSet(labels.Set{"select": "true"}),
Field: fields.Everything(),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*example.Pod)
return labels.Set(pod.Labels), nil, nil
},
},
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -113,17 +137,39 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
// Create a pod in a different namespace first to ensure
// that its corresponding event will not be propagated.
badKey := fmt.Sprintf("/pods/%s-bad/foo", tt.namespace)
badOut := &example.Pod{}
err = store.GuaranteedUpdate(ctx, badKey, badOut, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) {
obj := basePod.DeepCopy()
obj.Namespace = fmt.Sprintf("%s-bad", tt.namespace)
return obj, nil
}), nil)
if err != nil {
t.Fatalf("GuaranteedUpdate of bad pod failed: %v", err)
}
var prevObj *example.Pod
for _, watchTest := range tt.watchTests {
out := &example.Pod{}
err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) {
obj := watchTest.obj.DeepCopy()
obj.Namespace = tt.namespace
return obj, nil
}), nil)
if err != nil {
t.Fatalf("GuaranteedUpdate failed: %v", err)
if watchTest.obj != nil {
err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) {
obj := watchTest.obj.DeepCopy()
obj.Namespace = tt.namespace
return obj, nil
}), nil)
if err != nil {
t.Fatalf("GuaranteedUpdate failed: %v", err)
}
} else {
err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, nil)
if err != nil {
t.Fatalf("Delete failed: %v", err)
}
}
if watchTest.expectEvent {
expectObj := out
@ -226,6 +272,60 @@ func RunTestWatchFromNoneZero(ctx context.Context, t *testing.T, store storage.I
testCheckResult(t, watch.Modified, w, out)
}
func RunTestDelayedWatchDelivery(ctx context.Context, t *testing.T, store storage.Interface) {
_, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}})
startRV := storedObj.ResourceVersion
watcher, err := store.Watch(ctx, "/pods/test-ns", storage.ListOptions{ResourceVersion: startRV, Predicate: storage.Everything, Recursive: true})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Depending on the implementation, different number of events that
// should be delivered to the watcher can be created before it will
// block the implementation and as a result force the watcher to be
// closed (as otherwise events would have to be dropped).
// For now, this number is smallest for Cacher and it equals 21 for it.
totalPods := 21
for i := 0; i < totalPods; i++ {
out := &example.Pod{}
pod := &example.Pod{
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i), Namespace: "test-ns"},
}
err := store.GuaranteedUpdate(ctx, computePodKey(pod), out, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) {
return pod, nil
}), nil)
if err != nil {
t.Errorf("GuaranteedUpdate failed: %v", err)
}
}
// Now stop the watcher and check if the consecutive events are being delivered.
watcher.Stop()
watched := 0
for {
event, ok := <-watcher.ResultChan()
if !ok {
break
}
object := event.Object
if co, ok := object.(runtime.CacheableObject); ok {
object = co.GetObject()
}
if a, e := object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a {
t.Errorf("Unexpected object watched: %s, expected %s", a, e)
}
watched++
}
// We expect at least N events to be delivered, depending on the implementation.
// For now, this number is smallest for Cacher and it equals 10 (size of the out buffer).
if watched < 10 {
t.Errorf("Unexpected number of events: %v, expected: %v", watched, totalPods)
}
}
func RunTestWatchError(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}
key := computePodKey(obj)
@ -285,6 +385,63 @@ func RunTestWatchContextCancel(ctx context.Context, t *testing.T, store storage.
}
}
func RunTestWatcherTimeout(ctx context.Context, t *testing.T, store storage.Interface) {
// initialRV is used to initate the watcher at the beginning of the world.
podList := example.PodList{}
options := storage.ListOptions{
Predicate: storage.Everything,
Recursive: true,
}
if err := store.GetList(ctx, "/pods", options, &podList); err != nil {
t.Fatalf("Failed to list pods: %v", err)
}
initialRV := podList.ResourceVersion
options = storage.ListOptions{
ResourceVersion: initialRV,
Predicate: storage.Everything,
Recursive: true,
}
// Create a number of watchers that will not be reading any result.
nonReadingWatchers := 50
for i := 0; i < nonReadingWatchers; i++ {
watcher, err := store.Watch(ctx, "/pods/test-ns", options)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
}
// Create a second watcher that will be reading result.
readingWatcher, err := store.Watch(ctx, "/pods/test-ns", options)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer readingWatcher.Stop()
// Depending on the implementation, different number of events that
// should be delivered to the watcher can be created before it will
// block the implementation and as a result force the watcher to be
// closed (as otherwise events would have to be dropped).
// For now, this number is smallest for Cacher and it equals 21 for it.
//
// Create more events to ensure that we're not blocking other watchers
// forever.
startTime := time.Now()
for i := 0; i < 22; i++ {
out := &example.Pod{}
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i), Namespace: "test-ns"}}
if err := store.Create(ctx, computePodKey(pod), pod, out, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
testCheckResult(t, watch.Added, readingWatcher, out)
}
if time.Since(startTime) > time.Duration(250*nonReadingWatchers)*time.Millisecond {
t.Errorf("waiting for events took too long: %v", time.Since(startTime))
}
}
func RunTestWatchDeleteEventObjectHaveLatestRV(ctx context.Context, t *testing.T, store storage.Interface) {
key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}})

View File

@ -28,7 +28,6 @@ import (
"k8s.io/apimachinery/pkg/api/apitesting"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
@ -310,113 +309,9 @@ func TestWatchDeprecated(t *testing.T) {
}
func TestWatcherTimeout(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, _, err := newTestCacher(etcdStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// initialVersion is used to initate the watcher at the beginning of the world,
// which is not defined precisely in etcd.
initialVersion, err := cacher.LastSyncResourceVersion()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
startVersion := strconv.Itoa(int(initialVersion))
// Create a number of watchers that will not be reading any result.
nonReadingWatchers := 50
for i := 0; i < nonReadingWatchers; i++ {
watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything, Recursive: true})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
}
// Create a second watcher that will be reading result.
readingWatcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything, Recursive: true})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer readingWatcher.Stop()
startTime := time.Now()
for i := 1; i <= 22; i++ {
pod := makeTestPod(strconv.Itoa(i))
_ = updatePod(t, etcdStorage, pod, nil)
verifyWatchEvent(t, readingWatcher, watch.Added, pod)
}
if time.Since(startTime) > time.Duration(250*nonReadingWatchers)*time.Millisecond {
t.Errorf("waiting for events took too long: %v", time.Since(startTime))
}
}
func TestFiltering(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, _, err := newTestCacher(etcdStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// Ensure that the cacher is initialized, before creating any pods,
// so that we are sure that all events will be present in cacher.
syncWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
syncWatcher.Stop()
podFoo := makeTestPod("foo")
podFoo.Labels = map[string]string{"filter": "foo"}
podFooFiltered := makeTestPod("foo")
podFooPrime := makeTestPod("foo")
podFooPrime.Labels = map[string]string{"filter": "foo"}
podFooPrime.Spec.NodeName = "fakeNode"
podFooNS2 := makeTestPod("foo")
podFooNS2.Namespace += "2"
podFooNS2.Labels = map[string]string{"filter": "foo"}
// Create in another namespace first to make sure events from other namespaces don't get delivered
updatePod(t, etcdStorage, podFooNS2, nil)
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated)
fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered)
_ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered)
deleted := example.Pod{}
if err := etcdStorage.Delete(context.TODO(), "pods/ns/foo", &deleted, nil, storage.ValidateAllObjectFunc, nil); err != nil {
t.Errorf("Unexpected error: %v", err)
}
// Set up Watch for object "podFoo" with label filter set.
pred := storage.SelectionPredicate{
Label: labels.SelectorFromSet(labels.Set{"filter": "foo"}),
Field: fields.Everything(),
GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, err error) {
metadata, err := meta.Accessor(obj)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
return labels.Set(metadata.GetLabels()), nil, nil
},
}
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: pred})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered)
verifyWatchEvent(t, watcher, watch.Added, podFoo)
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatcherTimeout(ctx, t, cacher)
}
func TestEmptyWatchEventCache(t *testing.T) {
@ -486,52 +381,10 @@ func TestEmptyWatchEventCache(t *testing.T) {
}
}
func TestRandomWatchDeliver(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, v, err := newTestCacher(etcdStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
startVersion := strconv.Itoa(int(rv))
watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything, Recursive: true})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now we can create exactly 21 events that should be delivered
// to the watcher, before it will completely block cacher and as
// a result will be dropped.
for i := 0; i < 21; i++ {
updatePod(t, etcdStorage, makeTestPod(fmt.Sprintf("foo-%d", i)), nil)
}
// Now stop the watcher and check if the consecutive events are being delivered.
watcher.Stop()
watched := 0
for {
event, ok := <-watcher.ResultChan()
if !ok {
break
}
object := event.Object
if co, ok := object.(runtime.CacheableObject); ok {
object = co.GetObject()
}
if a, e := object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a {
t.Errorf("Unexpected object watched: %s, expected %s", a, e)
}
watched++
}
func TestDelayedWatchDelivery(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestDelayedWatchDelivery(ctx, t, cacher)
}
func TestCacherListerWatcher(t *testing.T) {