/* Copyright 2014 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package cacher import ( "context" "fmt" "strconv" "strings" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/cacher/metrics" "k8s.io/apiserver/pkg/storage/cacher/progress" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" featuregatetesting "k8s.io/component-base/featuregate/testing" k8smetrics "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/testutil" "k8s.io/utils/clock" testingclock "k8s.io/utils/clock/testing" ) func makeTestPod(name string, resourceVersion uint64) *v1.Pod { return makeTestPodDetails(name, resourceVersion, "some-node", map[string]string{"k8s-app": "my-app"}) } func makeTestPodDetails(name string, resourceVersion uint64, nodeName string, labels map[string]string) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: "ns", Name: name, ResourceVersion: strconv.FormatUint(resourceVersion, 10), Labels: labels, }, Spec: v1.PodSpec{ NodeName: nodeName, }, } } func makeTestStoreElement(pod *v1.Pod) *storeElement { return &storeElement{ Key: "prefix/ns/" + pod.Name, Object: pod, Labels: labels.Set(pod.Labels), Fields: fields.Set{"spec.nodeName": pod.Spec.NodeName}, } } type testWatchCache struct { *watchCache bookmarkRevision chan int64 stopCh chan struct{} } func (w *testWatchCache) getAllEventsSince(resourceVersion uint64, opts storage.ListOptions) ([]*watchCacheEvent, error) { cacheInterval, err := w.getCacheIntervalForEvents(resourceVersion, opts) if err != nil { return nil, err } result := []*watchCacheEvent{} for { event, err := cacheInterval.Next() if err != nil { return nil, err } if event == nil { break } result = append(result, event) } return result, nil } func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64, opts storage.ListOptions) (*watchCacheInterval, error) { w.RLock() defer w.RUnlock() return w.getAllEventsSinceLocked(resourceVersion, "", opts) } // newTestWatchCache just adds a fake clock. func newTestWatchCache(capacity int, eventFreshDuration time.Duration, indexers *cache.Indexers) *testWatchCache { keyFunc := func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc("prefix", obj) } getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, error) { pod, ok := obj.(*v1.Pod) if !ok { return nil, nil, fmt.Errorf("not a pod") } return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil } versioner := storage.APIObjectVersioner{} mockHandler := func(*watchCacheEvent) {} wc := &testWatchCache{} wc.bookmarkRevision = make(chan int64, 1) wc.stopCh = make(chan struct{}) pr := progress.NewConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil) go pr.Run(wc.stopCh) getCurrentRV := func(context.Context) (uint64, error) { return wc.resourceVersion, nil } wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), eventFreshDuration, schema.GroupResource{Resource: "pods"}, pr, getCurrentRV) // To preserve behavior of tests that assume a given capacity, // resize it to th expected size. wc.capacity = capacity wc.cache = make([]*watchCacheEvent, capacity) wc.lowerBoundCapacity = min(capacity, defaultLowerBoundCapacity) wc.upperBoundCapacity = max(capacity, defaultUpperBoundCapacity) return wc } type immediateTickerFactory struct{} func (t *immediateTickerFactory) NewTimer(d time.Duration) clock.Timer { timer := immediateTicker{ c: make(chan time.Time), } timer.Reset(d) return &timer } type immediateTicker struct { c chan time.Time } func (t *immediateTicker) Reset(d time.Duration) (active bool) { select { case <-t.c: active = true default: } go func() { t.c <- time.Now() }() return active } func (t *immediateTicker) C() <-chan time.Time { return t.c } func (t *immediateTicker) Stop() bool { select { case <-t.c: return true default: return false } } func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error { go func() { select { case rev := <-w.bookmarkRevision: w.UpdateResourceVersion(fmt.Sprintf("%d", rev)) case <-ctx.Done(): return } }() return nil } func (w *testWatchCache) Stop() { close(w.stopCh) } func TestWatchCacheBasic(t *testing.T) { store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // Test Add/Update/Delete. pod1 := makeTestPod("pod", 1) if err := store.Add(pod1); err != nil { t.Errorf("unexpected error: %v", err) } if item, ok, _ := store.Get(pod1); !ok { t.Errorf("didn't find pod") } else { expected := makeTestStoreElement(makeTestPod("pod", 1)) if !apiequality.Semantic.DeepEqual(expected, item) { t.Errorf("expected %v, got %v", expected, item) } } pod2 := makeTestPod("pod", 2) if err := store.Update(pod2); err != nil { t.Errorf("unexpected error: %v", err) } if item, ok, _ := store.Get(pod2); !ok { t.Errorf("didn't find pod") } else { expected := makeTestStoreElement(makeTestPod("pod", 2)) if !apiequality.Semantic.DeepEqual(expected, item) { t.Errorf("expected %v, got %v", expected, item) } } pod3 := makeTestPod("pod", 3) if err := store.Delete(pod3); err != nil { t.Errorf("unexpected error: %v", err) } if _, ok, _ := store.Get(pod3); ok { t.Errorf("found pod") } // Test List. store.Add(makeTestPod("pod1", 4)) store.Add(makeTestPod("pod2", 5)) store.Add(makeTestPod("pod3", 6)) { expected := map[string]storeElement{ "prefix/ns/pod1": *makeTestStoreElement(makeTestPod("pod1", 4)), "prefix/ns/pod2": *makeTestStoreElement(makeTestPod("pod2", 5)), "prefix/ns/pod3": *makeTestStoreElement(makeTestPod("pod3", 6)), } items := make(map[string]storeElement) for _, item := range store.List() { elem := item.(*storeElement) items[elem.Key] = *elem } if !apiequality.Semantic.DeepEqual(expected, items) { t.Errorf("expected %v, got %v", expected, items) } } // Test Replace. store.Replace([]interface{}{ makeTestPod("pod4", 7), makeTestPod("pod5", 8), }, "8") { expected := map[string]storeElement{ "prefix/ns/pod4": *makeTestStoreElement(makeTestPod("pod4", 7)), "prefix/ns/pod5": *makeTestStoreElement(makeTestPod("pod5", 8)), } items := make(map[string]storeElement) for _, item := range store.List() { elem := item.(*storeElement) items[elem.Key] = *elem } if !apiequality.Semantic.DeepEqual(expected, items) { t.Errorf("expected %v, got %v", expected, items) } } } func TestEvents(t *testing.T) { store := newTestWatchCache(5, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // no dynamic-size cache to fit old tests. store.lowerBoundCapacity = 5 store.upperBoundCapacity = 5 store.Add(makeTestPod("pod", 3)) // Test for Added event. { _, err := store.getAllEventsSince(1, storage.ListOptions{Predicate: storage.Everything}) if err == nil { t.Errorf("expected error too old") } if _, ok := err.(*errors.StatusError); !ok { t.Errorf("expected error to be of type StatusError") } } { result, err := store.getAllEventsSince(2, storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Errorf("unexpected error: %v", err) } if len(result) != 1 { t.Fatalf("unexpected events: %v", result) } if result[0].Type != watch.Added { t.Errorf("unexpected event type: %v", result[0].Type) } pod := makeTestPod("pod", uint64(3)) if !apiequality.Semantic.DeepEqual(pod, result[0].Object) { t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod) } if result[0].PrevObject != nil { t.Errorf("unexpected item: %v", result[0].PrevObject) } } store.Update(makeTestPod("pod", 4)) store.Update(makeTestPod("pod", 5)) // Test with not full cache. { _, err := store.getAllEventsSince(1, storage.ListOptions{Predicate: storage.Everything}) if err == nil { t.Errorf("expected error too old") } } { result, err := store.getAllEventsSince(3, storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Errorf("unexpected error: %v", err) } if len(result) != 2 { t.Fatalf("unexpected events: %v", result) } for i := 0; i < 2; i++ { if result[i].Type != watch.Modified { t.Errorf("unexpected event type: %v", result[i].Type) } pod := makeTestPod("pod", uint64(i+4)) if !apiequality.Semantic.DeepEqual(pod, result[i].Object) { t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod) } prevPod := makeTestPod("pod", uint64(i+3)) if !apiequality.Semantic.DeepEqual(prevPod, result[i].PrevObject) { t.Errorf("unexpected item: %v, expected: %v", result[i].PrevObject, prevPod) } } } for i := 6; i < 10; i++ { store.Update(makeTestPod("pod", uint64(i))) } // Test with full cache - there should be elements from 5 to 9. { _, err := store.getAllEventsSince(3, storage.ListOptions{Predicate: storage.Everything}) if err == nil { t.Errorf("expected error too old") } } { result, err := store.getAllEventsSince(4, storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Errorf("unexpected error: %v", err) } if len(result) != 5 { t.Fatalf("unexpected events: %v", result) } for i := 0; i < 5; i++ { pod := makeTestPod("pod", uint64(i+5)) if !apiequality.Semantic.DeepEqual(pod, result[i].Object) { t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod) } } } // Test for delete event. store.Delete(makeTestPod("pod", uint64(10))) { result, err := store.getAllEventsSince(9, storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Errorf("unexpected error: %v", err) } if len(result) != 1 { t.Fatalf("unexpected events: %v", result) } if result[0].Type != watch.Deleted { t.Errorf("unexpected event type: %v", result[0].Type) } pod := makeTestPod("pod", uint64(10)) if !apiequality.Semantic.DeepEqual(pod, result[0].Object) { t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod) } prevPod := makeTestPod("pod", uint64(9)) if !apiequality.Semantic.DeepEqual(prevPod, result[0].PrevObject) { t.Errorf("unexpected item: %v, expected: %v", result[0].PrevObject, prevPod) } } } func TestMarker(t *testing.T) { store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // First thing that is called when propagated from storage is Replace. store.Replace([]interface{}{ makeTestPod("pod1", 5), makeTestPod("pod2", 9), }, "9") _, err := store.getAllEventsSince(8, storage.ListOptions{Predicate: storage.Everything}) if err == nil || !strings.Contains(err.Error(), "too old resource version") { t.Errorf("unexpected error: %v", err) } // Getting events from 8 should return no events, // even though there is a marker there. result, err := store.getAllEventsSince(9, storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Fatalf("unexpected error: %v", err) } if len(result) != 0 { t.Errorf("unexpected result: %#v, expected no events", result) } pod := makeTestPod("pods", 12) store.Add(pod) // Getting events from 8 should still work and return one event. result, err = store.getAllEventsSince(9, storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Fatalf("unexpected error: %v", err) } if len(result) != 1 || !apiequality.Semantic.DeepEqual(result[0].Object, pod) { t.Errorf("unexpected result: %#v, expected %v", result, pod) } } func TestWaitUntilFreshAndGetList(t *testing.T) { ctx := context.Background() store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{ "l:label": func(obj interface{}) ([]string, error) { pod, ok := obj.(*v1.Pod) if !ok { return nil, fmt.Errorf("not a pod %#v", obj) } if value, ok := pod.Labels["label"]; ok { return []string{value}, nil } return nil, nil }, "f:spec.nodeName": func(obj interface{}) ([]string, error) { pod, ok := obj.(*v1.Pod) if !ok { return nil, fmt.Errorf("not a pod %#v", obj) } return []string{pod.Spec.NodeName}, nil }, }) defer store.Stop() // In background, update the store. go func() { store.Add(makeTestPodDetails("pod1", 2, "node1", map[string]string{"label": "value1"})) store.Add(makeTestPodDetails("pod2", 3, "node1", map[string]string{"label": "value1"})) store.Add(makeTestPodDetails("pod3", 5, "node2", map[string]string{"label": "value2"})) }() // list by empty MatchValues. resp, indexUsed, err := store.WaitUntilFreshAndGetList(ctx, "prefix/", storage.ListOptions{ResourceVersion: "5", Recursive: true, Predicate: storage.Everything}) if err != nil { t.Fatalf("unexpected error: %v", err) } if resp.ResourceVersion != 5 { t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion) } if len(resp.Items) != 3 { t.Errorf("unexpected list returned: %#v", resp) } if indexUsed != "" { t.Errorf("Used index %q but expected none to be used", indexUsed) } // list by label index. resp, indexUsed, err = store.WaitUntilFreshAndGetList(ctx, "prefix/", storage.ListOptions{ResourceVersion: "5", Recursive: true, Predicate: storage.SelectionPredicate{ Label: labels.SelectorFromSet(map[string]string{ "label": "value1", }), Field: fields.SelectorFromSet(map[string]string{ "spec.nodeName": "node2", }), IndexLabels: []string{"label"}, }}) if err != nil { t.Fatalf("unexpected error: %v", err) } if resp.ResourceVersion != 5 { t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion) } if len(resp.Items) != 2 { t.Errorf("unexpected list returned: %#v", resp) } if indexUsed != "l:label" { t.Errorf("Used index %q but expected %q", indexUsed, "l:label") } // list with spec.nodeName index. resp, indexUsed, err = store.WaitUntilFreshAndGetList(ctx, "prefix/", storage.ListOptions{ResourceVersion: "5", Recursive: true, Predicate: storage.SelectionPredicate{ Label: labels.SelectorFromSet(map[string]string{ "not-exist-label": "whatever", }), Field: fields.SelectorFromSet(map[string]string{ "spec.nodeName": "node2", }), IndexFields: []string{"spec.nodeName"}, }}) if err != nil { t.Fatalf("unexpected error: %v", err) } if resp.ResourceVersion != 5 { t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion) } if len(resp.Items) != 1 { t.Errorf("unexpected list returned: %#v", resp) } if indexUsed != "f:spec.nodeName" { t.Errorf("Used index %q but expected %q", indexUsed, "f:spec.nodeName") } // list with index not exists. resp, indexUsed, err = store.WaitUntilFreshAndGetList(ctx, "prefix/", storage.ListOptions{ResourceVersion: "5", Recursive: true, Predicate: storage.SelectionPredicate{ Label: labels.SelectorFromSet(map[string]string{ "not-exist-label": "whatever", }), Field: fields.Everything(), IndexLabels: []string{"label"}, }}) if err != nil { t.Fatalf("unexpected error: %v", err) } if resp.ResourceVersion != 5 { t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion) } if len(resp.Items) != 3 { t.Errorf("unexpected list returned: %#v", resp) } if indexUsed != "" { t.Errorf("Used index %q but expected none to be used", indexUsed) } } func TestWaitUntilFreshAndListFromCache(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true) forceRequestWatchProgressSupport(t) ctx := context.Background() store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // In background, update the store. go func() { store.Add(makeTestPod("pod1", 2)) store.bookmarkRevision <- 3 }() // list from future revision. Requires watch cache to request bookmark to get it. resp, indexUsed, err := store.WaitUntilFreshAndGetList(ctx, "prefix/", storage.ListOptions{ResourceVersion: "3", Recursive: true, Predicate: storage.Everything}) if err != nil { t.Fatalf("unexpected error: %v", err) } if resp.ResourceVersion != 3 { t.Errorf("unexpected resourceVersion: %v, expected: 6", resp.ResourceVersion) } if len(resp.Items) != 1 { t.Errorf("unexpected list returned: %#v", resp) } if indexUsed != "" { t.Errorf("Used index %q but expected none to be used", indexUsed) } } func TestWaitUntilFreshAndGet(t *testing.T) { ctx := context.Background() store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // In background, update the store. go func() { store.Add(makeTestPod("foo", 2)) store.Add(makeTestPod("bar", 5)) }() obj, exists, resourceVersion, err := store.WaitUntilFreshAndGet(ctx, 5, "prefix/ns/bar") if err != nil { t.Fatalf("unexpected error: %v", err) } if resourceVersion != 5 { t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) } if !exists { t.Fatalf("no results returned: %#v", obj) } expected := makeTestStoreElement(makeTestPod("bar", 5)) if !apiequality.Semantic.DeepEqual(expected, obj) { t.Errorf("expected %v, got %v", expected, obj) } } func TestWaitUntilFreshAndListTimeout(t *testing.T) { tcs := []struct { name string ConsistentListFromCache bool }{ { name: "FromStorage", ConsistentListFromCache: false, }, { name: "FromCache", ConsistentListFromCache: true, }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, tc.ConsistentListFromCache) ctx := context.Background() store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() fc := store.clock.(*testingclock.FakeClock) // In background, step clock after the below call starts the timer. go func() { for !fc.HasWaiters() { time.Sleep(time.Millisecond) } store.Add(makeTestPod("foo", 2)) store.bookmarkRevision <- 3 fc.Step(blockTimeout) // Add an object to make sure the test would // eventually fail instead of just waiting // forever. time.Sleep(30 * time.Second) store.Add(makeTestPod("bar", 4)) }() _, _, err := store.WaitUntilFreshAndGetList(ctx, "", storage.ListOptions{ResourceVersion: "4", Predicate: storage.Everything}) if !errors.IsTimeout(err) { t.Errorf("expected timeout error but got: %v", err) } if !storage.IsTooLargeResourceVersion(err) { t.Errorf("expected 'Too large resource version' cause in error but got: %v", err) } }) } } type testLW struct { ListFunc func(options metav1.ListOptions) (runtime.Object, error) WatchFunc func(options metav1.ListOptions) (watch.Interface, error) } func (t *testLW) List(options metav1.ListOptions) (runtime.Object, error) { return t.ListFunc(options) } func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) { return t.WatchFunc(options) } func TestReflectorForWatchCache(t *testing.T) { ctx := context.Background() store := newTestWatchCache(5, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() { resp, _, err := store.WaitUntilFreshAndGetList(ctx, "", storage.ListOptions{ResourceVersion: "", Recursive: true, Predicate: storage.Everything}) if err != nil { t.Fatalf("unexpected error: %v", err) } if resp.ResourceVersion != 0 { t.Errorf("unexpected resource version: %d", resp.ResourceVersion) } } lw := &testLW{ WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) { fw := watch.NewFake() go fw.Stop() return fw, nil }, ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil }, } r := cache.NewReflector(lw, &v1.Pod{}, store, 0) r.ListAndWatch(wait.NeverStop) { resp, _, err := store.WaitUntilFreshAndGetList(ctx, "", storage.ListOptions{ResourceVersion: "10", Recursive: true, Predicate: storage.Everything}) if err != nil { t.Fatalf("unexpected error: %v", err) } if resp.ResourceVersion != 10 { t.Errorf("unexpected resource version: %d", resp.ResourceVersion) } } } func TestDynamicCache(t *testing.T) { tests := []struct { name string eventCount int cacheCapacity int startIndex int // interval is time duration between adjacent events. lowerBoundCapacity int upperBoundCapacity int interval time.Duration expectCapacity int expectStartIndex int }{ { name: "[capacity not equals 4*n] events inside DefaultEventFreshDuration cause cache expanding", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, interval: DefaultEventFreshDuration / 6, expectCapacity: 10, expectStartIndex: 0, }, { name: "[capacity not equals 4*n] events outside DefaultEventFreshDuration without change cache capacity", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, interval: DefaultEventFreshDuration / 4, expectCapacity: 5, expectStartIndex: 0, }, { name: "[capacity not equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, interval: DefaultEventFreshDuration + time.Second, expectCapacity: 2, expectStartIndex: 3, }, { name: "[capacity not equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 3, upperBoundCapacity: 5 * 2, interval: DefaultEventFreshDuration + time.Second, expectCapacity: 3, expectStartIndex: 2, }, { name: "[capacity not equals 4*n] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 8, interval: DefaultEventFreshDuration / 6, expectCapacity: 8, expectStartIndex: 0, }, { name: "[capacity not equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, interval: DefaultEventFreshDuration / 6, expectCapacity: 10, expectStartIndex: 3, }, { name: "[capacity not equals 4*n] [startIndex not equal 0] events outside DefaultEventFreshDuration without change cache capacity", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, interval: DefaultEventFreshDuration / 4, expectCapacity: 5, expectStartIndex: 3, }, { name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, interval: DefaultEventFreshDuration + time.Second, expectCapacity: 2, expectStartIndex: 6, }, { name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 3, upperBoundCapacity: 5 * 2, interval: DefaultEventFreshDuration + time.Second, expectCapacity: 3, expectStartIndex: 5, }, { name: "[capacity not equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 8, interval: DefaultEventFreshDuration / 6, expectCapacity: 8, expectStartIndex: 3, }, { name: "[capacity equals 4*n] events inside DefaultEventFreshDuration cause cache expanding", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, interval: DefaultEventFreshDuration / 9, expectCapacity: 16, expectStartIndex: 0, }, { name: "[capacity equals 4*n] events outside DefaultEventFreshDuration without change cache capacity", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, interval: DefaultEventFreshDuration / 8, expectCapacity: 8, expectStartIndex: 0, }, { name: "[capacity equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, interval: DefaultEventFreshDuration/2 + time.Second, expectCapacity: 4, expectStartIndex: 4, }, { name: "[capacity equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 7, upperBoundCapacity: 8 * 2, interval: DefaultEventFreshDuration/2 + time.Second, expectCapacity: 7, expectStartIndex: 1, }, { name: "[capacity equals 4*n] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 10, interval: DefaultEventFreshDuration / 9, expectCapacity: 10, expectStartIndex: 0, }, { name: "[capacity equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, interval: DefaultEventFreshDuration / 9, expectCapacity: 16, expectStartIndex: 3, }, { name: "[capacity equals 4*n] [startIndex not equal 0] events outside DefaultEventFreshDuration without change cache capacity", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, interval: DefaultEventFreshDuration / 8, expectCapacity: 8, expectStartIndex: 3, }, { name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, interval: DefaultEventFreshDuration/2 + time.Second, expectCapacity: 4, expectStartIndex: 7, }, { name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 7, upperBoundCapacity: 8 * 2, interval: DefaultEventFreshDuration/2 + time.Second, expectCapacity: 7, expectStartIndex: 4, }, { name: "[capacity equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 10, interval: DefaultEventFreshDuration / 9, expectCapacity: 10, expectStartIndex: 3, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { store := newTestWatchCache(test.cacheCapacity, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() store.cache = make([]*watchCacheEvent, test.cacheCapacity) store.startIndex = test.startIndex store.lowerBoundCapacity = test.lowerBoundCapacity store.upperBoundCapacity = test.upperBoundCapacity loadEventWithDuration(store, test.eventCount, test.interval) nextInterval := store.clock.Now().Add(time.Duration(test.interval.Nanoseconds() * int64(test.eventCount))) store.resizeCacheLocked(nextInterval) if store.capacity != test.expectCapacity { t.Errorf("expect capacity %d, but get %d", test.expectCapacity, store.capacity) } // check cache's startIndex, endIndex and all elements. if store.startIndex != test.expectStartIndex { t.Errorf("expect startIndex %d, but get %d", test.expectStartIndex, store.startIndex) } if store.endIndex != test.startIndex+test.eventCount { t.Errorf("expect endIndex %d get %d", test.startIndex+test.eventCount, store.endIndex) } if !checkCacheElements(store) { t.Errorf("some elements locations in cache is wrong") } }) } } func loadEventWithDuration(cache *testWatchCache, count int, interval time.Duration) { for i := 0; i < count; i++ { event := &watchCacheEvent{ Key: fmt.Sprintf("event-%d", i+cache.startIndex), RecordTime: cache.clock.Now().Add(time.Duration(interval.Nanoseconds() * int64(i))), } cache.cache[(i+cache.startIndex)%cache.capacity] = event } cache.endIndex = cache.startIndex + count } func checkCacheElements(cache *testWatchCache) bool { for i := cache.startIndex; i < cache.endIndex; i++ { location := i % cache.capacity if cache.cache[location].Key != fmt.Sprintf("event-%d", i) { return false } } return true } func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) { store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() now := store.clock.Now() addEvent := func(key string, rv uint64, t time.Time) { event := &watchCacheEvent{ Key: key, ResourceVersion: rv, RecordTime: t, } store.updateCache(event) } // Initial LIST comes from the moment of RV=10. store.Replace(nil, "10") addEvent("key1", 20, now) // Force "key1" to rotate our of cache. later := now.Add(2 * DefaultEventFreshDuration) addEvent("key2", 30, later) addEvent("key3", 40, later) // Force cache resize. addEvent("key4", 50, later.Add(time.Second)) _, err := store.getAllEventsSince(15, storage.ListOptions{Predicate: storage.Everything}) if err == nil || !strings.Contains(err.Error(), "too old resource version") { t.Errorf("unexpected error: %v", err) } } func TestSuggestedWatchChannelSize(t *testing.T) { testCases := []struct { name string capacity int indexExists bool triggerUsed bool eventsFreshDuration time.Duration expected int }{ { name: "capacity=100, indexExists, triggerUsed", capacity: 100, indexExists: true, triggerUsed: true, eventsFreshDuration: DefaultEventFreshDuration, expected: 10, }, { name: "capacity=100, indexExists, !triggerUsed", capacity: 100, indexExists: true, triggerUsed: false, eventsFreshDuration: DefaultEventFreshDuration, expected: 10, }, { name: "capacity=100, !indexExists", capacity: 100, indexExists: false, triggerUsed: false, eventsFreshDuration: DefaultEventFreshDuration, expected: 10, }, { name: "capacity=750, indexExists, triggerUsed", capacity: 750, indexExists: true, triggerUsed: true, eventsFreshDuration: DefaultEventFreshDuration, expected: 10, }, { name: "capacity=750, indexExists, !triggerUsed", capacity: 750, indexExists: true, triggerUsed: false, eventsFreshDuration: DefaultEventFreshDuration, expected: 10, }, { name: "capacity=750, !indexExists", capacity: 750, indexExists: false, triggerUsed: false, eventsFreshDuration: DefaultEventFreshDuration, expected: 10, }, { name: "capacity=7500, indexExists, triggerUsed", capacity: 7500, indexExists: true, triggerUsed: true, eventsFreshDuration: DefaultEventFreshDuration, expected: 10, }, { name: "capacity=7500, indexExists, !triggerUsed", capacity: 7500, indexExists: true, triggerUsed: false, eventsFreshDuration: DefaultEventFreshDuration, expected: 100, }, { name: "capacity=7500, indexExists, !triggerUsed, eventsFreshDuration=2m30s", capacity: 7500, indexExists: true, triggerUsed: false, eventsFreshDuration: 2 * DefaultEventFreshDuration, expected: 50, }, { name: "capacity=7500, !indexExists", capacity: 7500, indexExists: false, triggerUsed: false, eventsFreshDuration: DefaultEventFreshDuration, expected: 100, }, { name: "capacity=7500, !indexExists, eventsFreshDuration=2m30s", capacity: 7500, indexExists: false, triggerUsed: false, eventsFreshDuration: 2 * DefaultEventFreshDuration, expected: 50, }, { name: "capacity=75000, indexExists, triggerUsed", capacity: 75000, indexExists: true, triggerUsed: true, eventsFreshDuration: DefaultEventFreshDuration, expected: 10, }, { name: "capacity=75000, indexExists, !triggerUsed", capacity: 75000, indexExists: true, triggerUsed: false, eventsFreshDuration: DefaultEventFreshDuration, expected: 1000, }, { name: "capacity=75000, indexExists, !triggerUsed, eventsFreshDuration=2m30s", capacity: 75000, indexExists: true, triggerUsed: false, eventsFreshDuration: 2 * DefaultEventFreshDuration, expected: 500, }, { name: "capacity=75000, !indexExists", capacity: 75000, indexExists: false, triggerUsed: false, eventsFreshDuration: DefaultEventFreshDuration, expected: 100, }, { name: "capacity=750000, indexExists, triggerUsed", capacity: 750000, indexExists: true, triggerUsed: true, eventsFreshDuration: DefaultEventFreshDuration, expected: 10, }, { name: "capacity=750000, indexExists, !triggerUsed", capacity: 750000, indexExists: true, triggerUsed: false, eventsFreshDuration: DefaultEventFreshDuration, expected: 1000, }, { name: "capacity=750000, !indexExists", capacity: 750000, indexExists: false, triggerUsed: false, eventsFreshDuration: DefaultEventFreshDuration, expected: 100, }, } for _, test := range testCases { t.Run(test.name, func(t *testing.T) { store := newTestWatchCache(test.capacity, test.eventsFreshDuration, &cache.Indexers{}) defer store.Stop() got := store.suggestedWatchChannelSize(test.indexExists, test.triggerUsed) if got != test.expected { t.Errorf("unexpected channel size got: %v, expected: %v", got, test.expected) } }) } } func TestCapacityUpperBound(t *testing.T) { testCases := []struct { name string eventFreshDuration time.Duration expected int }{ { name: "default eventFreshDuration", eventFreshDuration: DefaultEventFreshDuration, // 75s expected: defaultUpperBoundCapacity, // 100 * 1024 }, { name: "lower eventFreshDuration, capacity limit unchanged", eventFreshDuration: 45 * time.Second, // 45s expected: defaultUpperBoundCapacity, // 100 * 1024 }, { name: "higher eventFreshDuration, capacity limit scaled up", eventFreshDuration: 4 * DefaultEventFreshDuration, // 4 * 75s expected: 4 * defaultUpperBoundCapacity, // 4 * 100 * 1024 }, { name: "higher eventFreshDuration, capacity limit scaled and rounded up", eventFreshDuration: 3 * DefaultEventFreshDuration, // 3 * 75s expected: 4 * defaultUpperBoundCapacity, // 4 * 100 * 1024 }, { name: "higher eventFreshDuration, capacity limit scaled up and capped", eventFreshDuration: DefaultEventFreshDuration << 20, // 2^20 * 75s expected: defaultUpperBoundCapacity << 14, // 2^14 * 100 * 1024 }, } for _, test := range testCases { t.Run(test.name, func(t *testing.T) { capacity := capacityUpperBound(test.eventFreshDuration) if test.expected != capacity { t.Errorf("expected %v, got %v", test.expected, capacity) } }) } } func BenchmarkWatchCache_updateCache(b *testing.B) { store := newTestWatchCache(defaultUpperBoundCapacity, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() store.cache = store.cache[:0] store.upperBoundCapacity = defaultUpperBoundCapacity loadEventWithDuration(store, defaultUpperBoundCapacity, 0) add := &watchCacheEvent{ Key: fmt.Sprintf("event-%d", defaultUpperBoundCapacity), RecordTime: store.clock.Now(), } b.ResetTimer() for i := 0; i < b.N; i++ { store.updateCache(add) } } func TestHistogramCacheReadWait(t *testing.T) { registry := k8smetrics.NewKubeRegistry() if err := registry.Register(metrics.WatchCacheReadWait); err != nil { t.Errorf("unexpected error: %v", err) } ctx := context.Background() testedMetrics := "apiserver_watch_cache_read_wait_seconds" store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // In background, update the store. go func() { if err := store.Add(makeTestPod("foo", 2)); err != nil { t.Errorf("unexpected error: %v", err) } if err := store.Add(makeTestPod("bar", 5)); err != nil { t.Errorf("unexpected error: %v", err) } }() testCases := []struct { desc string resourceVersion uint64 want string }{ { desc: "resourceVersion is non-zero", resourceVersion: 5, want: ` # HELP apiserver_watch_cache_read_wait_seconds [ALPHA] Histogram of time spent waiting for a watch cache to become fresh. # TYPE apiserver_watch_cache_read_wait_seconds histogram apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.005"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.025"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.05"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.1"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.2"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.4"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.6"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.8"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="1"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="1.25"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="1.5"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="2"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="3"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="+Inf"} 1 apiserver_watch_cache_read_wait_seconds_sum{group="",resource="pods"} 0 apiserver_watch_cache_read_wait_seconds_count{group="",resource="pods"} 1 `, }, { desc: "resourceVersion is 0", resourceVersion: 0, want: ``, }, } for _, test := range testCases { t.Run(test.desc, func(t *testing.T) { defer registry.Reset() if _, _, _, err := store.WaitUntilFreshAndGet(ctx, test.resourceVersion, "prefix/ns/bar"); err != nil { t.Errorf("unexpected error: %v", err) } if err := testutil.GatherAndCompare(registry, strings.NewReader(test.want), testedMetrics); err != nil { t.Errorf("unexpected error: %v", err) } }) } } func TestCacheSnapshots(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, true) store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() store.upperBoundCapacity = 3 store.lowerBoundCapacity = 1 clock := store.clock.(*testingclock.FakeClock) _, found := store.snapshots.GetLessOrEqual(100) assert.False(t, found, "Expected empty cache to not include any snapshots") t.Log("Test cache on rev 100") require.NoError(t, store.Add(makeTestPod("foo", 100))) require.NoError(t, store.Update(makeTestPod("foo", 200))) clock.Step(time.Second) require.NoError(t, store.Delete(makeTestPod("foo", 300))) t.Log("Test cache on rev 100") _, found = store.snapshots.GetLessOrEqual(99) assert.False(t, found, "Expected store to not include rev 99") lister, found := store.snapshots.GetLessOrEqual(100) assert.True(t, found, "Expected store to not include rev 100") elements := lister.ListPrefix("", "") assert.Len(t, elements, 1) assert.Equal(t, makeTestPod("foo", 100), elements[0].(*storeElement).Object) t.Log("Overflow cache to remove rev 100") require.NoError(t, store.Add(makeTestPod("foo", 400))) _, found = store.snapshots.GetLessOrEqual(100) assert.False(t, found, "Expected overfilled cache to delete oldest rev 100") t.Log("Test cache on rev 200") lister, found = store.snapshots.GetLessOrEqual(200) assert.True(t, found, "Expected store to still keep rev 200") elements = lister.ListPrefix("", "") assert.Len(t, elements, 1) assert.Equal(t, makeTestPod("foo", 200), elements[0].(*storeElement).Object) t.Log("Test cache on rev 300") lister, found = store.snapshots.GetLessOrEqual(300) assert.True(t, found, "Expected store to still keep rev 300") elements = lister.ListPrefix("", "") assert.Empty(t, elements) t.Log("Test cache on rev 400") lister, found = store.snapshots.GetLessOrEqual(400) assert.True(t, found, "Expected store to still keep rev 400") elements = lister.ListPrefix("", "") assert.Len(t, elements, 1) assert.Equal(t, makeTestPod("foo", 400), elements[0].(*storeElement).Object) t.Log("Add event outside the event fresh window to force cache capacity downsize") assert.Equal(t, 3, store.capacity) clock.Step(DefaultEventFreshDuration + 1) require.NoError(t, store.Update(makeTestPod("foo", 500))) assert.Equal(t, 1, store.capacity) assert.Equal(t, 1, store.snapshots.Len()) _, found = store.snapshots.GetLessOrEqual(499) assert.False(t, found, "Expected overfilled cache to delete events below 500") t.Log("Test cache on rev 500") lister, found = store.snapshots.GetLessOrEqual(500) assert.True(t, found, "Expected store to still keep rev 500") elements = lister.ListPrefix("", "") assert.Len(t, elements, 1) assert.Equal(t, makeTestPod("foo", 500), elements[0].(*storeElement).Object) t.Log("Add event to force capacity upsize") require.NoError(t, store.Update(makeTestPod("foo", 600))) assert.Equal(t, 2, store.capacity) assert.Equal(t, 2, store.snapshots.Len()) t.Log("Test cache on rev 600") lister, found = store.snapshots.GetLessOrEqual(600) assert.True(t, found, "Expected replace to be snapshotted") elements = lister.ListPrefix("", "") assert.Len(t, elements, 1) assert.Equal(t, makeTestPod("foo", 600), elements[0].(*storeElement).Object) t.Log("Replace cache to remove history") _, found = store.snapshots.GetLessOrEqual(500) assert.True(t, found, "Confirm that cache stores history before replace") err := store.Replace([]interface{}{makeTestPod("foo", 600)}, "700") require.NoError(t, err) _, found = store.snapshots.GetLessOrEqual(500) assert.False(t, found, "Expected replace to remove history") _, found = store.snapshots.GetLessOrEqual(600) assert.False(t, found, "Expected replace to remove history") t.Log("Test cache on rev 700") lister, found = store.snapshots.GetLessOrEqual(700) assert.True(t, found, "Expected replace to be snapshotted") elements = lister.ListPrefix("", "") assert.Len(t, elements, 1) assert.Equal(t, makeTestPod("foo", 600), elements[0].(*storeElement).Object) }