Reuse generic watch tests for watchcache

Kubernetes-commit: 9fbc6f7136358331530cd221af9c93f888228e3c
This commit is contained in:
Wojciech Tyczyński 2022-11-10 15:01:41 +01:00 committed by Kubernetes Publisher
parent 4bbfd23c76
commit edd9f95ef3
3 changed files with 69 additions and 23 deletions

View File

@ -22,6 +22,7 @@ import (
"fmt" "fmt"
"path" "path"
"reflect" "reflect"
"sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
@ -163,7 +164,11 @@ func testCheckResultFunc(t *testing.T, expectEventType watch.EventType, w watch.
t.Errorf("event type want=%v, get=%v", expectEventType, res.Type) t.Errorf("event type want=%v, get=%v", expectEventType, res.Type)
return return
} }
if err := check(res.Object); err != nil { obj := res.Object
if co, ok := obj.(runtime.CacheableObject); ok {
obj = co.GetObject()
}
if err := check(obj); err != nil {
t.Error(err) t.Error(err)
} }
case <-time.After(wait.ForeverTestTimeout): case <-time.After(wait.ForeverTestTimeout):
@ -210,6 +215,36 @@ func resourceVersionNotOlderThan(sentinel string) func(string) error {
} }
} }
// StorageInjectingListErrors injects a dummy error for first N GetList calls.
type StorageInjectingListErrors struct {
storage.Interface
lock sync.Mutex
Errors int
}
func (s *StorageInjectingListErrors) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
err := func() error {
s.lock.Lock()
defer s.lock.Unlock()
if s.Errors > 0 {
s.Errors--
return fmt.Errorf("injected error")
}
return nil
}()
if err != nil {
return err
}
return s.Interface.GetList(ctx, key, opts, listObj)
}
func (s *StorageInjectingListErrors) ErrorsConsumed() (bool, error) {
s.lock.Lock()
defer s.lock.Unlock()
return s.Errors == 0, nil
}
// PrefixTransformer adds and verifies that all data has the correct prefix on its way in and out. // PrefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
type PrefixTransformer struct { type PrefixTransformer struct {
prefix []byte prefix []byte

View File

@ -70,10 +70,10 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur
watchTests: []*testWatchStruct{{basePod, false, ""}, {basePodAssigned, true, watch.Added}}, watchTests: []*testWatchStruct{{basePod, false, ""}, {basePodAssigned, true, watch.Added}},
pred: storage.SelectionPredicate{ pred: storage.SelectionPredicate{
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("spec.nodename=bar"), Field: fields.ParseSelectorOrDie("spec.nodeName=bar"),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*example.Pod) pod := obj.(*example.Pod)
return nil, fields.Set{"spec.nodename": pod.Spec.NodeName}, nil return nil, fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil
}, },
}, },
}, { }, {
@ -87,22 +87,28 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur
watchTests: []*testWatchStruct{{basePod, true, watch.Added}, {basePodAssigned, true, watch.Deleted}}, watchTests: []*testWatchStruct{{basePod, true, watch.Added}, {basePodAssigned, true, watch.Deleted}},
pred: storage.SelectionPredicate{ pred: storage.SelectionPredicate{
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("spec.nodename!=bar"), Field: fields.ParseSelectorOrDie("spec.nodeName!=bar"),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*example.Pod) pod := obj.(*example.Pod)
return nil, fields.Set{"spec.nodename": pod.Spec.NodeName}, nil return nil, fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil
}, },
}, },
}} }}
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
watchKey := fmt.Sprintf("pods/%s", tt.namespace) watchKey := fmt.Sprintf("/pods/%s", tt.namespace)
key := watchKey + "/foo" key := watchKey + "/foo"
if !recursive { if !recursive {
watchKey = key watchKey = key
} }
w, err := store.Watch(ctx, watchKey, storage.ListOptions{ResourceVersion: "0", Predicate: tt.pred, Recursive: recursive}) // Get the current RV from which we can start watching.
out := &example.PodList{}
if err := store.GetList(ctx, watchKey, storage.ListOptions{ResourceVersion: "", Predicate: tt.pred, Recursive: recursive}, out); err != nil {
t.Fatalf("List failed: %v", err)
}
w, err := store.Watch(ctx, watchKey, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: tt.pred, Recursive: recursive})
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }

View File

@ -201,23 +201,16 @@ func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType
} }
} }
type injectListError struct {
errors int
storage.Interface
}
func (self *injectListError) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
if self.errors > 0 {
self.errors--
return fmt.Errorf("injected error")
}
return self.Interface.GetList(ctx, key, opts, listObj)
}
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatch(ctx, t, cacher)
}
// TODO(wojtek-t): We should extend the generic RunTestWatch test to cover the
// scenarios that are not yet covered by it and get rid of this test.
func TestWatchDeprecated(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
// Inject one list error to make sure we test the relist case.
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
defer server.Terminate(t) defer server.Terminate(t)
fakeClock := testingclock.NewFakeClock(time.Now()) fakeClock := testingclock.NewFakeClock(time.Now())
cacher, _, err := newTestCacherWithClock(etcdStorage, fakeClock) cacher, _, err := newTestCacherWithClock(etcdStorage, fakeClock)
@ -787,8 +780,14 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstora
} }
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
// Inject one list error to make sure we test the relist case.
wrappedStorage := &storagetesting.StorageInjectingListErrors{
Interface: etcdStorage,
Errors: 1,
}
config := cacherstorage.Config{ config := cacherstorage.Config{
Storage: etcdStorage, Storage: wrappedStorage,
Versioner: storage.APIObjectVersioner{}, Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"}, GroupResource: schema.GroupResource{Resource: "pods"},
ResourcePrefix: setupOpts.resourcePrefix, ResourcePrefix: setupOpts.resourcePrefix,
@ -809,5 +808,11 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstora
server.Terminate(t) server.Terminate(t)
} }
// Since some tests depend on the fact that GetList shouldn't fail,
// we wait until the error from the underlying storage is consumed.
if err := wait.PollInfinite(100*time.Millisecond, wrappedStorage.ErrorsConsumed); err != nil {
t.Fatalf("Failed to inject list errors: %v", err)
}
return ctx, cacher, terminate return ctx, cacher, terminate
} }