diff --git a/go.mod b/go.mod index 735fb9764..17b259f20 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,7 @@ require ( gopkg.in/square/go-jose.v2 v2.6.0 k8s.io/api v0.0.0-20241214014715-eac45518d7fe k8s.io/apimachinery v0.0.0-20241214014415-767f17a6afea - k8s.io/client-go v0.0.0-20241214015128-61ee2c5802c7 + k8s.io/client-go v0.0.0-20241215015103-67da6d1a4174 k8s.io/component-base v0.0.0-20241214020124-b7fbd0d55e44 k8s.io/klog/v2 v2.130.1 k8s.io/kms v0.0.0-20241213100418-8cb606989fcf diff --git a/go.sum b/go.sum index 0b07469d0..f801c2a9b 100644 --- a/go.sum +++ b/go.sum @@ -369,8 +369,8 @@ k8s.io/api v0.0.0-20241214014715-eac45518d7fe h1:3brWXwMKrWloyi0Qqv6SqRIGC/yS1wf k8s.io/api v0.0.0-20241214014715-eac45518d7fe/go.mod h1:TeD+e60UFfC0xfnP9/tT92lG7sSnSs+ebTPX1oCNrDU= k8s.io/apimachinery v0.0.0-20241214014415-767f17a6afea h1:ZUHj/k511rdZLx69atS9F5P+PDDQarX9DmI8/3TQ15Y= k8s.io/apimachinery v0.0.0-20241214014415-767f17a6afea/go.mod h1:vmecNW2HWfNZboIXS3Vg/3qp+T42YyW6jCpcdhnas9s= -k8s.io/client-go v0.0.0-20241214015128-61ee2c5802c7 h1:W2NxcoEN3y/P48IBQdJgBj2nWlU+dyXvRQKjRYdV6y8= -k8s.io/client-go v0.0.0-20241214015128-61ee2c5802c7/go.mod h1:9DOj9Eg/2wdCibOBBR8J+SamkzoU+TVr9bk5B7KAbgM= +k8s.io/client-go v0.0.0-20241215015103-67da6d1a4174 h1:NcD7ZRs38+ChK6qTJN5ahkVq7MWuNaZZiO1AspZLwck= +k8s.io/client-go v0.0.0-20241215015103-67da6d1a4174/go.mod h1:9DOj9Eg/2wdCibOBBR8J+SamkzoU+TVr9bk5B7KAbgM= k8s.io/component-base v0.0.0-20241214020124-b7fbd0d55e44 h1:UGwTOasY4f/2bmwNlInBJjkhZw3lWGy+gk+w5tQOAvc= k8s.io/component-base v0.0.0-20241214020124-b7fbd0d55e44/go.mod h1:u4F22/ZnzBZX8Sw10wXHts7D9Ak81nBSrbdckaDcvjE= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= diff --git a/pkg/registry/generic/registry/storage_factory.go b/pkg/registry/generic/registry/storage_factory.go index 3c974f398..63fb20bcb 100644 --- a/pkg/registry/generic/registry/storage_factory.go +++ b/pkg/registry/generic/registry/storage_factory.go @@ -54,17 +54,18 @@ func StorageWithCacher() generic.StorageDecorator { } cacherConfig := cacherstorage.Config{ - Storage: s, - Versioner: storage.APIObjectVersioner{}, - GroupResource: storageConfig.GroupResource, - ResourcePrefix: resourcePrefix, - KeyFunc: keyFunc, - NewFunc: newFunc, - NewListFunc: newListFunc, - GetAttrsFunc: getAttrsFunc, - IndexerFuncs: triggerFuncs, - Indexers: indexers, - Codec: storageConfig.Codec, + Storage: s, + Versioner: storage.APIObjectVersioner{}, + GroupResource: storageConfig.GroupResource, + EventsHistoryWindow: storageConfig.EventsHistoryWindow, + ResourcePrefix: resourcePrefix, + KeyFunc: keyFunc, + NewFunc: newFunc, + NewListFunc: newListFunc, + GetAttrsFunc: getAttrsFunc, + IndexerFuncs: triggerFuncs, + Indexers: indexers, + Codec: storageConfig.Codec, } cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig) if err != nil { diff --git a/pkg/registry/generic/registry/store_test.go b/pkg/registry/generic/registry/store_test.go index e11cf8d3d..4127342e3 100644 --- a/pkg/registry/generic/registry/store_test.go +++ b/pkg/registry/generic/registry/store_test.go @@ -2435,15 +2435,16 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE } if hasCacheEnabled { config := cacherstorage.Config{ - Storage: s, - Versioner: storage.APIObjectVersioner{}, - GroupResource: schema.GroupResource{Resource: "pods"}, - ResourcePrefix: podPrefix, - KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) }, - GetAttrsFunc: getPodAttrs, - NewFunc: newFunc, - NewListFunc: newListFunc, - Codec: sc.Codec, + Storage: s, + Versioner: storage.APIObjectVersioner{}, + GroupResource: schema.GroupResource{Resource: "pods"}, + EventsHistoryWindow: cacherstorage.DefaultEventFreshDuration, + ResourcePrefix: podPrefix, + KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) }, + GetAttrsFunc: getPodAttrs, + NewFunc: newFunc, + NewListFunc: newListFunc, + Codec: sc.Codec, } cacher, err := cacherstorage.NewCacherFromConfig(config) if err != nil { diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index a5b5506dc..e6f01384c 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -61,10 +61,16 @@ const ( // storageWatchListPageSize is the cacher's request chunk size of // initial and resync watch lists to storage. storageWatchListPageSize = int64(10000) + + // DefaultEventFreshDuration is the default time duration of events + // we want to keep. + // We set it to defaultBookmarkFrequency plus epsilon to maximize + // chances that last bookmark was sent within kept history, at the + // same time, minimizing the needed memory usage. + DefaultEventFreshDuration = defaultBookmarkFrequency + 15*time.Second + // defaultBookmarkFrequency defines how frequently watch bookmarks should be send // in addition to sending a bookmark right before watch deadline. - // - // NOTE: Update `eventFreshDuration` when changing this value. defaultBookmarkFrequency = time.Minute ) @@ -80,6 +86,10 @@ type Config struct { // and metrics. GroupResource schema.GroupResource + // EventsHistoryWindow specifies minimum history duration that storage is keeping. + // If lower than DefaultEventFreshDuration, the cache creation will fail. + EventsHistoryWindow time.Duration + // The Cache will be caching objects of a given Type and assumes that they // are all stored under ResourcePrefix directory in the underlying database. ResourcePrefix string @@ -409,9 +419,15 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { contextMetadata = metadata.New(map[string]string{"source": "cache"}) } + eventFreshDuration := config.EventsHistoryWindow + if eventFreshDuration < DefaultEventFreshDuration { + return nil, fmt.Errorf("config.EventsHistoryWindow (%v) must not be below %v", eventFreshDuration, DefaultEventFreshDuration) + } + progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata) watchCache := newWatchCache( - config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester) + config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, + config.Clock, eventFreshDuration, config.GroupResource, progressRequester) listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc, contextMetadata) reflectorName := "storage/cacher.go:" + config.ResourcePrefix diff --git a/pkg/storage/cacher/cacher_test.go b/pkg/storage/cacher/cacher_test.go index 4168a1b4f..811d3ea24 100644 --- a/pkg/storage/cacher/cacher_test.go +++ b/pkg/storage/cacher/cacher_test.go @@ -470,18 +470,19 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context } config := Config{ - Storage: wrappedStorage, - Versioner: storage.APIObjectVersioner{}, - GroupResource: schema.GroupResource{Resource: "pods"}, - ResourcePrefix: setupOpts.resourcePrefix, - KeyFunc: setupOpts.keyFunc, - GetAttrsFunc: GetPodAttrs, - NewFunc: newPod, - NewListFunc: newPodList, - IndexerFuncs: setupOpts.indexerFuncs, - Indexers: &setupOpts.indexers, - Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), - Clock: setupOpts.clock, + Storage: wrappedStorage, + Versioner: storage.APIObjectVersioner{}, + GroupResource: schema.GroupResource{Resource: "pods"}, + EventsHistoryWindow: DefaultEventFreshDuration, + ResourcePrefix: setupOpts.resourcePrefix, + KeyFunc: setupOpts.keyFunc, + GetAttrsFunc: GetPodAttrs, + NewFunc: newPod, + NewListFunc: newPodList, + IndexerFuncs: setupOpts.indexerFuncs, + Indexers: &setupOpts.indexers, + Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), + Clock: setupOpts.clock, } cacher, err := NewCacherFromConfig(config) if err != nil { diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index f01c2bcc9..14c22015f 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -62,11 +62,12 @@ import ( func newTestCacherWithoutSyncing(s storage.Interface) (*Cacher, storage.Versioner, error) { prefix := "pods" config := Config{ - Storage: s, - Versioner: storage.APIObjectVersioner{}, - GroupResource: schema.GroupResource{Resource: "pods"}, - ResourcePrefix: prefix, - KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, + Storage: s, + Versioner: storage.APIObjectVersioner{}, + GroupResource: schema.GroupResource{Resource: "pods"}, + EventsHistoryWindow: DefaultEventFreshDuration, + ResourcePrefix: prefix, + KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, error) { pod, ok := obj.(*example.Pod) if !ok { @@ -2722,17 +2723,18 @@ func TestWatchStreamSeparation(t *testing.T) { setupOpts := &setupOptions{} withDefaults(setupOpts) config := Config{ - Storage: etcdStorage, - Versioner: storage.APIObjectVersioner{}, - GroupResource: schema.GroupResource{Resource: "pods"}, - ResourcePrefix: setupOpts.resourcePrefix, - KeyFunc: setupOpts.keyFunc, - GetAttrsFunc: GetPodAttrs, - NewFunc: newPod, - NewListFunc: newPodList, - IndexerFuncs: setupOpts.indexerFuncs, - Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), - Clock: setupOpts.clock, + Storage: etcdStorage, + Versioner: storage.APIObjectVersioner{}, + GroupResource: schema.GroupResource{Resource: "pods"}, + EventsHistoryWindow: DefaultEventFreshDuration, + ResourcePrefix: setupOpts.resourcePrefix, + KeyFunc: setupOpts.keyFunc, + GetAttrsFunc: GetPodAttrs, + NewFunc: newPod, + NewListFunc: newPodList, + IndexerFuncs: setupOpts.indexerFuncs, + Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), + Clock: setupOpts.clock, } tcs := []struct { name string diff --git a/pkg/storage/cacher/watch_cache.go b/pkg/storage/cacher/watch_cache.go index 541988b31..ae1506d03 100644 --- a/pkg/storage/cacher/watch_cache.go +++ b/pkg/storage/cacher/watch_cache.go @@ -52,17 +52,11 @@ const ( // after receiving a 'too high resource version' error. resourceVersionTooHighRetrySeconds = 1 - // eventFreshDuration is time duration of events we want to keep. - // We set it to `defaultBookmarkFrequency` plus epsilon to maximize - // chances that last bookmark was sent within kept history, at the - // same time, minimizing the needed memory usage. - eventFreshDuration = 75 * time.Second - // defaultLowerBoundCapacity is a default value for event cache capacity's lower bound. // TODO: Figure out, to what value we can decreased it. defaultLowerBoundCapacity = 100 - // defaultUpperBoundCapacity should be able to keep eventFreshDuration of history. + // defaultUpperBoundCapacity should be able to keep the required history. defaultUpperBoundCapacity = 100 * 1024 ) @@ -142,6 +136,9 @@ type watchCache struct { // for testing timeouts. clock clock.Clock + // eventFreshDuration defines the minimum watch history watchcache will store. + eventFreshDuration time.Duration + // An underlying storage.Versioner. versioner storage.Versioner @@ -163,6 +160,7 @@ func newWatchCache( versioner storage.Versioner, indexers *cache.Indexers, clock clock.WithTicker, + eventFreshDuration time.Duration, groupResource schema.GroupResource, progressRequester *conditionalProgressRequester) *watchCache { wc := &watchCache{ @@ -179,6 +177,7 @@ func newWatchCache( listResourceVersion: 0, eventHandler: eventHandler, clock: clock, + eventFreshDuration: eventFreshDuration, versioner: versioner, groupResource: groupResource, waitingUntilFresh: progressRequester, @@ -319,14 +318,14 @@ func (w *watchCache) updateCache(event *watchCacheEvent) { // - increases capacity by 2x if cache is full and all cached events occurred within last eventFreshDuration. // - decreases capacity by 2x when recent quarter of events occurred outside of eventFreshDuration(protect watchCache from flapping). func (w *watchCache) resizeCacheLocked(eventTime time.Time) { - if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration { + if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < w.eventFreshDuration { capacity := min(w.capacity*2, w.upperBoundCapacity) if capacity > w.capacity { w.doCacheResizeLocked(capacity) } return } - if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration { + if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > w.eventFreshDuration { capacity := max(w.capacity/2, w.lowerBoundCapacity) if capacity < w.capacity { w.doCacheResizeLocked(capacity) @@ -660,7 +659,7 @@ func (w *watchCache) suggestedWatchChannelSize(indexExists, triggerUsed bool) in // We don't have an exact data, but given we store updates from // the last , we approach it by dividing the // capacity by the length of the history window. - chanSize := int(math.Ceil(float64(w.currentCapacity()) / eventFreshDuration.Seconds())) + chanSize := int(math.Ceil(float64(w.currentCapacity()) / w.eventFreshDuration.Seconds())) // Finally we adjust the size to avoid ending with too low or // to large values. diff --git a/pkg/storage/cacher/watch_cache_interval_test.go b/pkg/storage/cacher/watch_cache_interval_test.go index 487a5ac1e..3a8730533 100644 --- a/pkg/storage/cacher/watch_cache_interval_test.go +++ b/pkg/storage/cacher/watch_cache_interval_test.go @@ -286,7 +286,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - wc := newTestWatchCache(capacity, &cache.Indexers{}) + wc := newTestWatchCache(capacity, DefaultEventFreshDuration, &cache.Indexers{}) defer wc.Stop() for i := 0; i < c.eventsAddedToWatchcache; i++ { wc.Add(makeTestPod(fmt.Sprintf("pod%d", i), uint64(i))) diff --git a/pkg/storage/cacher/watch_cache_test.go b/pkg/storage/cacher/watch_cache_test.go index 27f695014..221767e63 100644 --- a/pkg/storage/cacher/watch_cache_test.go +++ b/pkg/storage/cacher/watch_cache_test.go @@ -109,7 +109,7 @@ func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64, opts } // newTestWatchCache just adds a fake clock. -func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache { +func newTestWatchCache(capacity int, eventFreshDuration time.Duration, indexers *cache.Indexers) *testWatchCache { keyFunc := func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc("prefix", obj) } @@ -127,7 +127,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache { wc.stopCh = make(chan struct{}) pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil) go pr.Run(wc.stopCh) - wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"}, pr) + wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), eventFreshDuration, schema.GroupResource{Resource: "pods"}, pr) // To preserve behavior of tests that assume a given capacity, // resize it to th expected size. wc.capacity = capacity @@ -194,7 +194,7 @@ func (w *testWatchCache) Stop() { } func TestWatchCacheBasic(t *testing.T) { - store := newTestWatchCache(2, &cache.Indexers{}) + store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // Test Add/Update/Delete. @@ -272,7 +272,7 @@ func TestWatchCacheBasic(t *testing.T) { } func TestEvents(t *testing.T) { - store := newTestWatchCache(5, &cache.Indexers{}) + store := newTestWatchCache(5, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // no dynamic-size cache to fit old tests. @@ -397,7 +397,7 @@ func TestEvents(t *testing.T) { } func TestMarker(t *testing.T) { - store := newTestWatchCache(3, &cache.Indexers{}) + store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // First thing that is called when propagated from storage is Replace. @@ -434,7 +434,7 @@ func TestMarker(t *testing.T) { func TestWaitUntilFreshAndList(t *testing.T) { ctx := context.Background() - store := newTestWatchCache(3, &cache.Indexers{ + store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{ "l:label": func(obj interface{}) ([]string, error) { pod, ok := obj.(*v1.Pod) if !ok { @@ -537,7 +537,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true) forceRequestWatchProgressSupport(t) ctx := context.Background() - store := newTestWatchCache(3, &cache.Indexers{}) + store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // In background, update the store. go func() { @@ -563,7 +563,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) { func TestWaitUntilFreshAndGet(t *testing.T) { ctx := context.Background() - store := newTestWatchCache(3, &cache.Indexers{}) + store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // In background, update the store. @@ -606,7 +606,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) { t.Run(tc.name, func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, tc.ConsistentListFromCache) ctx := context.Background() - store := newTestWatchCache(3, &cache.Indexers{}) + store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() fc := store.clock.(*testingclock.FakeClock) @@ -651,7 +651,7 @@ func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) { func TestReflectorForWatchCache(t *testing.T) { ctx := context.Background() - store := newTestWatchCache(5, &cache.Indexers{}) + store := newTestWatchCache(5, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() { @@ -702,212 +702,212 @@ func TestDynamicCache(t *testing.T) { expectStartIndex int }{ { - name: "[capacity not equals 4*n] events inside eventFreshDuration cause cache expanding", + name: "[capacity not equals 4*n] events inside DefaultEventFreshDuration cause cache expanding", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration / 6, + interval: DefaultEventFreshDuration / 6, expectCapacity: 10, expectStartIndex: 0, }, { - name: "[capacity not equals 4*n] events outside eventFreshDuration without change cache capacity", + name: "[capacity not equals 4*n] events outside DefaultEventFreshDuration without change cache capacity", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration / 4, + interval: DefaultEventFreshDuration / 4, expectCapacity: 5, expectStartIndex: 0, }, { - name: "[capacity not equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking", + name: "[capacity not equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration + time.Second, + interval: DefaultEventFreshDuration + time.Second, expectCapacity: 2, expectStartIndex: 3, }, { - name: "[capacity not equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity", + name: "[capacity not equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 3, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration + time.Second, + interval: DefaultEventFreshDuration + time.Second, expectCapacity: 3, expectStartIndex: 2, }, { - name: "[capacity not equals 4*n] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity", + name: "[capacity not equals 4*n] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 8, - interval: eventFreshDuration / 6, + interval: DefaultEventFreshDuration / 6, expectCapacity: 8, expectStartIndex: 0, }, { - name: "[capacity not equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding", + name: "[capacity not equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration / 6, + interval: DefaultEventFreshDuration / 6, expectCapacity: 10, expectStartIndex: 3, }, { - name: "[capacity not equals 4*n] [startIndex not equal 0] events outside eventFreshDuration without change cache capacity", + name: "[capacity not equals 4*n] [startIndex not equal 0] events outside DefaultEventFreshDuration without change cache capacity", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration / 4, + interval: DefaultEventFreshDuration / 4, expectCapacity: 5, expectStartIndex: 3, }, { - name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking", + name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration + time.Second, + interval: DefaultEventFreshDuration + time.Second, expectCapacity: 2, expectStartIndex: 6, }, { - name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity", + name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 3, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration + time.Second, + interval: DefaultEventFreshDuration + time.Second, expectCapacity: 3, expectStartIndex: 5, }, { - name: "[capacity not equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity", + name: "[capacity not equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 8, - interval: eventFreshDuration / 6, + interval: DefaultEventFreshDuration / 6, expectCapacity: 8, expectStartIndex: 3, }, { - name: "[capacity equals 4*n] events inside eventFreshDuration cause cache expanding", + name: "[capacity equals 4*n] events inside DefaultEventFreshDuration cause cache expanding", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration / 9, + interval: DefaultEventFreshDuration / 9, expectCapacity: 16, expectStartIndex: 0, }, { - name: "[capacity equals 4*n] events outside eventFreshDuration without change cache capacity", + name: "[capacity equals 4*n] events outside DefaultEventFreshDuration without change cache capacity", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration / 8, + interval: DefaultEventFreshDuration / 8, expectCapacity: 8, expectStartIndex: 0, }, { - name: "[capacity equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking", + name: "[capacity equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration/2 + time.Second, + interval: DefaultEventFreshDuration/2 + time.Second, expectCapacity: 4, expectStartIndex: 4, }, { - name: "[capacity equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity", + name: "[capacity equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 7, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration/2 + time.Second, + interval: DefaultEventFreshDuration/2 + time.Second, expectCapacity: 7, expectStartIndex: 1, }, { - name: "[capacity equals 4*n] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity", + name: "[capacity equals 4*n] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 10, - interval: eventFreshDuration / 9, + interval: DefaultEventFreshDuration / 9, expectCapacity: 10, expectStartIndex: 0, }, { - name: "[capacity equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding", + name: "[capacity equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration / 9, + interval: DefaultEventFreshDuration / 9, expectCapacity: 16, expectStartIndex: 3, }, { - name: "[capacity equals 4*n] [startIndex not equal 0] events outside eventFreshDuration without change cache capacity", + name: "[capacity equals 4*n] [startIndex not equal 0] events outside DefaultEventFreshDuration without change cache capacity", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration / 8, + interval: DefaultEventFreshDuration / 8, expectCapacity: 8, expectStartIndex: 3, }, { - name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking", + name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration/2 + time.Second, + interval: DefaultEventFreshDuration/2 + time.Second, expectCapacity: 4, expectStartIndex: 7, }, { - name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity", + name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 7, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration/2 + time.Second, + interval: DefaultEventFreshDuration/2 + time.Second, expectCapacity: 7, expectStartIndex: 4, }, { - name: "[capacity equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity", + name: "[capacity equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 10, - interval: eventFreshDuration / 9, + interval: DefaultEventFreshDuration / 9, expectCapacity: 10, expectStartIndex: 3, }, @@ -915,7 +915,7 @@ func TestDynamicCache(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - store := newTestWatchCache(test.cacheCapacity, &cache.Indexers{}) + store := newTestWatchCache(test.cacheCapacity, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() store.cache = make([]*watchCacheEvent, test.cacheCapacity) store.startIndex = test.startIndex @@ -964,7 +964,7 @@ func checkCacheElements(cache *testWatchCache) bool { } func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) { - store := newTestWatchCache(2, &cache.Indexers{}) + store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() now := store.clock.Now() @@ -983,7 +983,7 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) { addEvent("key1", 20, now) // Force "key1" to rotate our of cache. - later := now.Add(2 * eventFreshDuration) + later := now.Add(2 * DefaultEventFreshDuration) addEvent("key2", 30, later) addEvent("key3", 40, later) @@ -998,122 +998,162 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) { func TestSuggestedWatchChannelSize(t *testing.T) { testCases := []struct { - name string - capacity int - indexExists bool - triggerUsed bool - expected int + name string + capacity int + indexExists bool + triggerUsed bool + eventsFreshDuration time.Duration + expected int }{ { - name: "capacity=100, indexExists, triggerUsed", - capacity: 100, - indexExists: true, - triggerUsed: true, - expected: 10, + name: "capacity=100, indexExists, triggerUsed", + capacity: 100, + indexExists: true, + triggerUsed: true, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=100, indexExists, !triggerUsed", - capacity: 100, - indexExists: true, - triggerUsed: false, - expected: 10, + name: "capacity=100, indexExists, !triggerUsed", + capacity: 100, + indexExists: true, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=100, !indexExists", - capacity: 100, - indexExists: false, - triggerUsed: false, - expected: 10, + name: "capacity=100, !indexExists", + capacity: 100, + indexExists: false, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=750, indexExists, triggerUsed", - capacity: 750, - indexExists: true, - triggerUsed: true, - expected: 10, + name: "capacity=750, indexExists, triggerUsed", + capacity: 750, + indexExists: true, + triggerUsed: true, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=750, indexExists, !triggerUsed", - capacity: 750, - indexExists: true, - triggerUsed: false, - expected: 10, + name: "capacity=750, indexExists, !triggerUsed", + capacity: 750, + indexExists: true, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=750, !indexExists", - capacity: 750, - indexExists: false, - triggerUsed: false, - expected: 10, + name: "capacity=750, !indexExists", + capacity: 750, + indexExists: false, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=7500, indexExists, triggerUsed", - capacity: 7500, - indexExists: true, - triggerUsed: true, - expected: 10, + name: "capacity=7500, indexExists, triggerUsed", + capacity: 7500, + indexExists: true, + triggerUsed: true, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=7500, indexExists, !triggerUsed", - capacity: 7500, - indexExists: true, - triggerUsed: false, - expected: 100, + name: "capacity=7500, indexExists, !triggerUsed", + capacity: 7500, + indexExists: true, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 100, }, { - name: "capacity=7500, !indexExists", - capacity: 7500, - indexExists: false, - triggerUsed: false, - expected: 100, + name: "capacity=7500, indexExists, !triggerUsed, eventsFreshDuration=2m30s", + capacity: 7500, + indexExists: true, + triggerUsed: false, + eventsFreshDuration: 2 * DefaultEventFreshDuration, + expected: 50, }, { - name: "capacity=75000, indexExists, triggerUsed", - capacity: 75000, - indexExists: true, - triggerUsed: true, - expected: 10, + name: "capacity=7500, !indexExists", + capacity: 7500, + indexExists: false, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 100, }, { - name: "capacity=75000, indexExists, !triggerUsed", - capacity: 75000, - indexExists: true, - triggerUsed: false, - expected: 1000, + name: "capacity=7500, !indexExists, eventsFreshDuration=2m30s", + capacity: 7500, + indexExists: false, + triggerUsed: false, + eventsFreshDuration: 2 * DefaultEventFreshDuration, + expected: 50, }, { - name: "capacity=75000, !indexExists", - capacity: 75000, - indexExists: false, - triggerUsed: false, - expected: 100, + name: "capacity=75000, indexExists, triggerUsed", + capacity: 75000, + indexExists: true, + triggerUsed: true, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=750000, indexExists, triggerUsed", - capacity: 750000, - indexExists: true, - triggerUsed: true, - expected: 10, + name: "capacity=75000, indexExists, !triggerUsed", + capacity: 75000, + indexExists: true, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 1000, }, { - name: "capacity=750000, indexExists, !triggerUsed", - capacity: 750000, - indexExists: true, - triggerUsed: false, - expected: 1000, + name: "capacity=75000, indexExists, !triggerUsed, eventsFreshDuration=2m30s", + capacity: 75000, + indexExists: true, + triggerUsed: false, + eventsFreshDuration: 2 * DefaultEventFreshDuration, + expected: 500, }, { - name: "capacity=750000, !indexExists", - capacity: 750000, - indexExists: false, - triggerUsed: false, - expected: 100, + name: "capacity=75000, !indexExists", + capacity: 75000, + indexExists: false, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 100, + }, + { + name: "capacity=750000, indexExists, triggerUsed", + capacity: 750000, + indexExists: true, + triggerUsed: true, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, + }, + { + name: "capacity=750000, indexExists, !triggerUsed", + capacity: 750000, + indexExists: true, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 1000, + }, + { + name: "capacity=750000, !indexExists", + capacity: 750000, + indexExists: false, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 100, }, } for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - store := newTestWatchCache(test.capacity, &cache.Indexers{}) + store := newTestWatchCache(test.capacity, test.eventsFreshDuration, &cache.Indexers{}) defer store.Stop() got := store.suggestedWatchChannelSize(test.indexExists, test.triggerUsed) if got != test.expected { @@ -1124,7 +1164,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) { } func BenchmarkWatchCache_updateCache(b *testing.B) { - store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{}) + store := newTestWatchCache(defaultUpperBoundCapacity, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() store.cache = store.cache[:0] store.upperBoundCapacity = defaultUpperBoundCapacity @@ -1146,7 +1186,7 @@ func TestHistogramCacheReadWait(t *testing.T) { } ctx := context.Background() testedMetrics := "apiserver_watch_cache_read_wait_seconds" - store := newTestWatchCache(2, &cache.Indexers{}) + store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // In background, update the store. diff --git a/pkg/storage/storagebackend/config.go b/pkg/storage/storagebackend/config.go index 822470778..c948d6411 100644 --- a/pkg/storage/storagebackend/config.go +++ b/pkg/storage/storagebackend/config.go @@ -37,6 +37,7 @@ const ( DefaultCompactInterval = 5 * time.Minute DefaultDBMetricPollInterval = 30 * time.Second + DefaultEventsHistoryWindow = 75 * time.Second DefaultHealthcheckTimeout = 2 * time.Second DefaultReadinessTimeout = 2 * time.Second ) @@ -80,6 +81,8 @@ type Config struct { CountMetricPollPeriod time.Duration // DBMetricPollInterval specifies how often should storage backend metric be updated. DBMetricPollInterval time.Duration + // EventsHistoryWindow specifies minimum history duration that storage is keeping. + EventsHistoryWindow time.Duration // HealthcheckTimeout specifies the timeout used when checking health HealthcheckTimeout time.Duration // ReadycheckTimeout specifies the timeout used when checking readiness @@ -115,6 +118,7 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config { Codec: codec, CompactionInterval: DefaultCompactInterval, DBMetricPollInterval: DefaultDBMetricPollInterval, + EventsHistoryWindow: DefaultEventsHistoryWindow, HealthcheckTimeout: DefaultHealthcheckTimeout, ReadycheckTimeout: DefaultReadinessTimeout, LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(),