/* Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package cacher import ( "context" "fmt" "reflect" "strconv" "sync" "testing" "time" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/diff" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" ) // verifies the cacheWatcher.process goroutine is properly cleaned up even if // the writes to cacheWatcher.result channel is blocked. func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { var lock sync.RWMutex var w *cacheWatcher count := 0 filter := func(string, labels.Set, fields.Set) bool { return true } forget := func() { lock.Lock() defer lock.Unlock() count++ // forget() has to stop the watcher, as only stopping the watcher // triggers stopping the process() goroutine which we are in the // end waiting for in this test. w.stop() } initEvents := []*watchCacheEvent{ {Object: &v1.Pod{}}, {Object: &v1.Pod{}}, } // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. w = newCacheWatcher(0, filter, forget, testVersioner{}) go w.process(context.Background(), initEvents, 0) w.Stop() if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { lock.RLock() defer lock.RUnlock() return count == 2, nil }); err != nil { t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err) } } func TestCacheWatcherHandlesFiltering(t *testing.T) { filter := func(_ string, _ labels.Set, field fields.Set) bool { return field["spec.nodeName"] == "host" } forget := func() {} testCases := []struct { events []*watchCacheEvent expected []watch.Event }{ // properly handle starting with the filter, then being deleted, then re-added { events: []*watchCacheEvent{ { Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, ObjFields: fields.Set{"spec.nodeName": "host"}, ResourceVersion: 1, }, { Type: watch.Modified, PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, PrevObjFields: fields.Set{"spec.nodeName": "host"}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, ObjFields: fields.Set{"spec.nodeName": ""}, ResourceVersion: 2, }, { Type: watch.Modified, PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, PrevObjFields: fields.Set{"spec.nodeName": ""}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, ObjFields: fields.Set{"spec.nodeName": "host"}, ResourceVersion: 3, }, }, expected: []watch.Event{ {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}}, {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}}, {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, }, }, // properly handle ignoring changes prior to the filter, then getting added, then deleted { events: []*watchCacheEvent{ { Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, ObjFields: fields.Set{"spec.nodeName": ""}, ResourceVersion: 1, }, { Type: watch.Modified, PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, PrevObjFields: fields.Set{"spec.nodeName": ""}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, ObjFields: fields.Set{"spec.nodeName": ""}, ResourceVersion: 2, }, { Type: watch.Modified, PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, PrevObjFields: fields.Set{"spec.nodeName": ""}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, ObjFields: fields.Set{"spec.nodeName": "host"}, ResourceVersion: 3, }, { Type: watch.Modified, PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, PrevObjFields: fields.Set{"spec.nodeName": "host"}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, ObjFields: fields.Set{"spec.nodeName": "host"}, ResourceVersion: 4, }, { Type: watch.Modified, PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, PrevObjFields: fields.Set{"spec.nodeName": "host"}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, ObjFields: fields.Set{"spec.nodeName": ""}, ResourceVersion: 5, }, { Type: watch.Modified, PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, PrevObjFields: fields.Set{"spec.nodeName": ""}, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}}, ObjFields: fields.Set{"spec.nodeName": ""}, ResourceVersion: 6, }, }, expected: []watch.Event{ {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, {Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}}, {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}}, }, }, } TestCase: for i, testCase := range testCases { // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. for j := range testCase.events { testCase.events[j].ResourceVersion = uint64(j) + 1 } w := newCacheWatcher(0, filter, forget, testVersioner{}) go w.process(context.Background(), testCase.events, 0) ch := w.ResultChan() for j, event := range testCase.expected { e := <-ch if !reflect.DeepEqual(event, e) { t.Errorf("%d: unexpected event %d: %s", i, j, diff.ObjectReflectDiff(event, e)) break TestCase } } select { case obj, ok := <-ch: t.Errorf("%d: unexpected excess event: %#v %t", i, obj, ok) break TestCase default: } w.Stop() } } type testVersioner struct{} func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error { return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10)) } func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error { listAccessor, err := meta.ListAccessor(obj) if err != nil || listAccessor == nil { return err } listAccessor.SetResourceVersion(strconv.FormatUint(resourceVersion, 10)) listAccessor.SetContinue(continueValue) return nil } func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error { return fmt.Errorf("unimplemented") } func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) { accessor, err := meta.Accessor(obj) if err != nil { return 0, err } version := accessor.GetResourceVersion() if len(version) == 0 { return 0, nil } return strconv.ParseUint(version, 10, 64) } func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) { return strconv.ParseUint(resourceVersion, 10, 64) } var ( scheme = runtime.NewScheme() codecs = serializer.NewCodecFactory(scheme) errDummy = fmt.Errorf("dummy error") ) func init() { metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) utilruntime.Must(example.AddToScheme(scheme)) utilruntime.Must(examplev1.AddToScheme(scheme)) } func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner) { prefix := "pods" config := Config{ CacheCapacity: cap, Storage: s, Versioner: testVersioner{}, ResourcePrefix: prefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, error) { return nil, nil, nil }, NewFunc: func() runtime.Object { return &example.Pod{} }, NewListFunc: func() runtime.Object { return &example.PodList{} }, Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), } return NewCacherFromConfig(config), testVersioner{} } type dummyStorage struct { err error } type dummyWatch struct { ch chan watch.Event } func (w *dummyWatch) ResultChan() <-chan watch.Event { return w.ch } func (w *dummyWatch) Stop() { close(w.ch) } func newDummyWatch() watch.Interface { return &dummyWatch{ ch: make(chan watch.Event), } } func (d *dummyStorage) Versioner() storage.Versioner { return nil } func (d *dummyStorage) Create(_ context.Context, _ string, _, _ runtime.Object, _ uint64) error { return fmt.Errorf("unimplemented") } func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *storage.Preconditions) error { return fmt.Errorf("unimplemented") } func (d *dummyStorage) Watch(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) { return newDummyWatch(), nil } func (d *dummyStorage) WatchList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) { return newDummyWatch(), nil } func (d *dummyStorage) Get(_ context.Context, _ string, _ string, _ runtime.Object, _ bool) error { return fmt.Errorf("unimplemented") } func (d *dummyStorage) GetToList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, _ runtime.Object) error { return d.err } func (d *dummyStorage) List(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, listObj runtime.Object) error { podList := listObj.(*example.PodList) podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"} return d.err } func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.Object, _ bool, _ *storage.Preconditions, _ storage.UpdateFunc, _ ...runtime.Object) error { return fmt.Errorf("unimplemented") } func (d *dummyStorage) Count(_ string) (int64, error) { return 0, fmt.Errorf("unimplemented") } func TestListWithLimitAndRV0(t *testing.T) { backingStorage := &dummyStorage{} cacher, _ := newTestCacher(backingStorage, 0) defer cacher.Stop() pred := storage.SelectionPredicate{ Limit: 500, } result := &example.PodList{} // Wait until cacher is initialized. cacher.ready.wait() // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.err = errDummy err := cacher.List(context.TODO(), "pods/ns", "0", pred, result) if err != nil { t.Errorf("List with Limit and RV=0 should be served from cache: %v", err) } err = cacher.List(context.TODO(), "pods/ns", "", pred, result) if err != errDummy { t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err) } } func TestGetToListWithLimitAndRV0(t *testing.T) { backingStorage := &dummyStorage{} cacher, _ := newTestCacher(backingStorage, 0) defer cacher.Stop() pred := storage.SelectionPredicate{ Limit: 500, } result := &example.PodList{} // Wait until cacher is initialized. cacher.ready.wait() // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.err = errDummy err := cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result) if err != nil { t.Errorf("GetToList with Limit and RV=0 should be served from cache: %v", err) } err = cacher.GetToList(context.TODO(), "pods/ns", "", pred, result) if err != errDummy { t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err) } } func TestWatcherNotGoingBackInTime(t *testing.T) { backingStorage := &dummyStorage{} cacher, _ := newTestCacher(backingStorage, 1000) defer cacher.Stop() // Wait until cacher is initialized. cacher.ready.wait() // Ensure there is some budget for slowing down processing. cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond) makePod := func(i int) *examplev1.Pod { return &examplev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("pod-%d", 1000+i), Namespace: "ns", ResourceVersion: fmt.Sprintf("%d", 1000+i), }, } } if err := cacher.watchCache.Add(makePod(0)); err != nil { t.Errorf("error: %v", err) } totalPods := 100 // Create watcher that will be slowing down reading. w1, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything) if err != nil { t.Fatalf("Failed to create watch: %v", err) } defer w1.Stop() go func() { a := 0 for range w1.ResultChan() { time.Sleep(time.Millisecond) a++ if a == 100 { break } } }() // Now push a ton of object to cache. for i := 1; i < totalPods; i++ { cacher.watchCache.Add(makePod(i)) } // Create fast watcher and ensure it will get each object exactly once. w2, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything) if err != nil { t.Fatalf("Failed to create watch: %v", err) } defer w2.Stop() shouldContinue := true currentRV := uint64(0) for shouldContinue { select { case event, ok := <-w2.ResultChan(): if !ok { shouldContinue = false break } rv, err := testVersioner{}.ParseResourceVersion(event.Object.(*examplev1.Pod).ResourceVersion) if err != nil { t.Errorf("unexpected parsing error: %v", err) } else { if rv < currentRV { t.Errorf("watcher going back in time") } currentRV = rv } case <-time.After(time.Second): w2.Stop() } } } func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { var w *cacheWatcher done := make(chan struct{}) filter := func(string, labels.Set, fields.Set) bool { return true } forget := func() { w.stop() done <- struct{}{} } maxRetriesToProduceTheRaceCondition := 1000 // Simulating the timer is fired and stopped concurrently by set time // timeout to zero and run the Stop goroutine concurrently. // May sure that the watch will not be blocked on Stop. for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { w = newCacheWatcher(0, filter, forget, testVersioner{}) go w.Stop() select { case <-done: case <-time.After(time.Second): t.Fatal("stop is blocked when the timer is fired concurrently") } } // After that, verifies the cacheWatcher.process goroutine works correctly. for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}) w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)} ctx, _ := context.WithTimeout(context.Background(), time.Hour) go w.process(ctx, nil, 0) select { case <-w.ResultChan(): case <-time.After(time.Second): t.Fatal("expected received a event on ResultChan") } w.Stop() } }