Merge pull request #129205 from tosi3k/wc-configurable-retention

Configure watch cache history window based on request timeout

Kubernetes-commit: 107be8feccfce7259b402ed75415309268744a24
Kubernetes Publisher 2024-12-17 20:50:58 +01:00
commit b3597c01bb
11 changed files with 265 additions and 201 deletions
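The commit title ties the new watch cache history window to the API server's request timeout. That derivation is not shown in the hunks below, which only plumb the window through the storage configuration; a heavily hedged sketch of the idea (the helper name and the max-with-default rule are assumptions, not taken from this change) could look like:

package main

import (
	"fmt"
	"time"
)

// defaultEventsHistoryWindow mirrors DefaultEventsHistoryWindow (75s) introduced below.
const defaultEventsHistoryWindow = 75 * time.Second

// historyWindowForRequestTimeout is a hypothetical helper: keep at least the
// default window, but grow it with the configured request timeout so that a
// watch opened at the start of a long request can still resume from the cache.
func historyWindowForRequestTimeout(requestTimeout time.Duration) time.Duration {
	if requestTimeout > defaultEventsHistoryWindow {
		return requestTimeout
	}
	return defaultEventsHistoryWindow
}

func main() {
	fmt.Println(historyWindowForRequestTimeout(time.Minute))     // 1m15s (default wins)
	fmt.Println(historyWindowForRequestTimeout(4 * time.Minute)) // 4m0s
}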

go.mod

@ -54,7 +54,7 @@ require (
gopkg.in/square/go-jose.v2 v2.6.0
k8s.io/api v0.0.0-20241214014715-eac45518d7fe
k8s.io/apimachinery v0.0.0-20241214014415-767f17a6afea
k8s.io/client-go v0.0.0-20241214015128-61ee2c5802c7
k8s.io/client-go v0.0.0-20241215015103-67da6d1a4174
k8s.io/component-base v0.0.0-20241214020124-b7fbd0d55e44
k8s.io/klog/v2 v2.130.1
k8s.io/kms v0.0.0-20241213100418-8cb606989fcf

go.sum

@ -369,8 +369,8 @@ k8s.io/api v0.0.0-20241214014715-eac45518d7fe h1:3brWXwMKrWloyi0Qqv6SqRIGC/yS1wf
k8s.io/api v0.0.0-20241214014715-eac45518d7fe/go.mod h1:TeD+e60UFfC0xfnP9/tT92lG7sSnSs+ebTPX1oCNrDU=
k8s.io/apimachinery v0.0.0-20241214014415-767f17a6afea h1:ZUHj/k511rdZLx69atS9F5P+PDDQarX9DmI8/3TQ15Y=
k8s.io/apimachinery v0.0.0-20241214014415-767f17a6afea/go.mod h1:vmecNW2HWfNZboIXS3Vg/3qp+T42YyW6jCpcdhnas9s=
k8s.io/client-go v0.0.0-20241214015128-61ee2c5802c7 h1:W2NxcoEN3y/P48IBQdJgBj2nWlU+dyXvRQKjRYdV6y8=
k8s.io/client-go v0.0.0-20241214015128-61ee2c5802c7/go.mod h1:9DOj9Eg/2wdCibOBBR8J+SamkzoU+TVr9bk5B7KAbgM=
k8s.io/client-go v0.0.0-20241215015103-67da6d1a4174 h1:NcD7ZRs38+ChK6qTJN5ahkVq7MWuNaZZiO1AspZLwck=
k8s.io/client-go v0.0.0-20241215015103-67da6d1a4174/go.mod h1:9DOj9Eg/2wdCibOBBR8J+SamkzoU+TVr9bk5B7KAbgM=
k8s.io/component-base v0.0.0-20241214020124-b7fbd0d55e44 h1:UGwTOasY4f/2bmwNlInBJjkhZw3lWGy+gk+w5tQOAvc=
k8s.io/component-base v0.0.0-20241214020124-b7fbd0d55e44/go.mod h1:u4F22/ZnzBZX8Sw10wXHts7D9Ak81nBSrbdckaDcvjE=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=


@ -57,6 +57,7 @@ func StorageWithCacher() generic.StorageDecorator {
Storage: s,
Versioner: storage.APIObjectVersioner{},
GroupResource: storageConfig.GroupResource,
EventsHistoryWindow: storageConfig.EventsHistoryWindow,
ResourcePrefix: resourcePrefix,
KeyFunc: keyFunc,
NewFunc: newFunc,


@ -2438,6 +2438,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
Storage: s,
Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
EventsHistoryWindow: cacherstorage.DefaultEventFreshDuration,
ResourcePrefix: podPrefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
GetAttrsFunc: getPodAttrs,


@ -61,10 +61,16 @@ const (
// storageWatchListPageSize is the cacher's request chunk size of
// initial and resync watch lists to storage.
storageWatchListPageSize = int64(10000)
// DefaultEventFreshDuration is the default time duration of events
// we want to keep.
// We set it to defaultBookmarkFrequency plus epsilon to maximize the
// chances that the last bookmark was sent within the kept history while
// minimizing the needed memory usage.
DefaultEventFreshDuration = defaultBookmarkFrequency + 15*time.Second
// defaultBookmarkFrequency defines how frequently watch bookmarks should be sent
// in addition to sending a bookmark right before watch deadline.
//
// NOTE: Update `DefaultEventFreshDuration` when changing this value.
defaultBookmarkFrequency = time.Minute
)
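For the concrete values in this diff, that works out to DefaultEventFreshDuration = time.Minute + 15*time.Second = 75 seconds, i.e. the same 75s that the removed eventFreshDuration constant used and that DefaultEventsHistoryWindow (added to the storagebackend config at the end of this change) defaults to.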
@ -80,6 +86,10 @@ type Config struct {
// and metrics.
GroupResource schema.GroupResource
// EventsHistoryWindow specifies the minimum history duration that the storage keeps.
// If lower than DefaultEventFreshDuration, the cache creation will fail.
EventsHistoryWindow time.Duration
// The Cache will be caching objects of a given Type and assumes that they
// are all stored under ResourcePrefix directory in the underlying database.
ResourcePrefix string
@ -409,9 +419,15 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
contextMetadata = metadata.New(map[string]string{"source": "cache"})
}
eventFreshDuration := config.EventsHistoryWindow
if eventFreshDuration < DefaultEventFreshDuration {
return nil, fmt.Errorf("config.EventsHistoryWindow (%v) must not be below %v", eventFreshDuration, DefaultEventFreshDuration)
}
progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers,
config.Clock, eventFreshDuration, config.GroupResource, progressRequester)
listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc, contextMetadata)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
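The effect of the new guard can be seen in isolation; the following standalone sketch only mirrors the comparison added to NewCacherFromConfig (it does not call the real package, and validateHistoryWindow is a made-up name for illustration):

package main

import (
	"fmt"
	"time"
)

// defaultEventFreshDuration mirrors DefaultEventFreshDuration above: 1m + 15s.
const defaultEventFreshDuration = time.Minute + 15*time.Second

// validateHistoryWindow mirrors the guard in NewCacherFromConfig: windows
// shorter than the default are rejected, anything equal or longer is kept.
func validateHistoryWindow(window time.Duration) (time.Duration, error) {
	if window < defaultEventFreshDuration {
		return 0, fmt.Errorf("config.EventsHistoryWindow (%v) must not be below %v", window, defaultEventFreshDuration)
	}
	return window, nil
}

func main() {
	if _, err := validateHistoryWindow(30 * time.Second); err != nil {
		fmt.Println(err) // config.EventsHistoryWindow (30s) must not be below 1m15s
	}
	if w, err := validateHistoryWindow(5 * time.Minute); err == nil {
		fmt.Println(w) // 5m0s
	}
}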


@ -473,6 +473,7 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
Storage: wrappedStorage,
Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
EventsHistoryWindow: DefaultEventFreshDuration,
ResourcePrefix: setupOpts.resourcePrefix,
KeyFunc: setupOpts.keyFunc,
GetAttrsFunc: GetPodAttrs,


@ -65,6 +65,7 @@ func newTestCacherWithoutSyncing(s storage.Interface) (*Cacher, storage.Versione
Storage: s,
Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
EventsHistoryWindow: DefaultEventFreshDuration,
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, error) {
@ -2725,6 +2726,7 @@ func TestWatchStreamSeparation(t *testing.T) {
Storage: etcdStorage,
Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
EventsHistoryWindow: DefaultEventFreshDuration,
ResourcePrefix: setupOpts.resourcePrefix,
KeyFunc: setupOpts.keyFunc,
GetAttrsFunc: GetPodAttrs,


@ -52,17 +52,11 @@ const (
// after receiving a 'too high resource version' error.
resourceVersionTooHighRetrySeconds = 1
// eventFreshDuration is time duration of events we want to keep.
// We set it to `defaultBookmarkFrequency` plus epsilon to maximize
// chances that last bookmark was sent within kept history, at the
// same time, minimizing the needed memory usage.
eventFreshDuration = 75 * time.Second
// defaultLowerBoundCapacity is a default value for event cache capacity's lower bound.
// TODO: Figure out to what value we can decrease it.
defaultLowerBoundCapacity = 100
// defaultUpperBoundCapacity should be able to keep eventFreshDuration of history.
// defaultUpperBoundCapacity should be able to keep the required history.
defaultUpperBoundCapacity = 100 * 1024
)
@ -142,6 +136,9 @@ type watchCache struct {
// for testing timeouts.
clock clock.Clock
// eventFreshDuration defines the minimum watch history the watchCache will store.
eventFreshDuration time.Duration
// An underlying storage.Versioner.
versioner storage.Versioner
@ -163,6 +160,7 @@ func newWatchCache(
versioner storage.Versioner,
indexers *cache.Indexers,
clock clock.WithTicker,
eventFreshDuration time.Duration,
groupResource schema.GroupResource,
progressRequester *conditionalProgressRequester) *watchCache {
wc := &watchCache{
@ -179,6 +177,7 @@ func newWatchCache(
listResourceVersion: 0,
eventHandler: eventHandler,
clock: clock,
eventFreshDuration: eventFreshDuration,
versioner: versioner,
groupResource: groupResource,
waitingUntilFresh: progressRequester,
@ -319,14 +318,14 @@ func (w *watchCache) updateCache(event *watchCacheEvent) {
// - increases capacity by 2x if the cache is full and all cached events occurred within the last eventFreshDuration.
// - decreases capacity by 2x when the recent quarter of events occurred outside of eventFreshDuration (protects watchCache from flapping).
func (w *watchCache) resizeCacheLocked(eventTime time.Time) {
if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration {
if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < w.eventFreshDuration {
capacity := min(w.capacity*2, w.upperBoundCapacity)
if capacity > w.capacity {
w.doCacheResizeLocked(capacity)
}
return
}
if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration {
if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > w.eventFreshDuration {
capacity := max(w.capacity/2, w.lowerBoundCapacity)
if capacity < w.capacity {
w.doCacheResizeLocked(capacity)
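To make the resize rules above concrete, here is a standalone sketch of just the two time comparisons, with the lower/upper capacity bounds and the ring-buffer bookkeeping left out (resizeDecision and its arguments are illustrative, not part of the real code):

package main

import (
	"fmt"
	"time"
)

// resizeDecision mirrors resizeCacheLocked for a full cache: grow 2x if even
// the oldest cached event is still within the fresh window, shrink 2x if the
// event starting the most recent quarter has already aged out of it.
func resizeDecision(capacity int, oldestEventAge, quarterBoundaryAge, freshWindow time.Duration) int {
	if oldestEventAge < freshWindow {
		return capacity * 2
	}
	if quarterBoundaryAge > freshWindow {
		return capacity / 2
	}
	return capacity
}

func main() {
	window := 75 * time.Second
	fmt.Println(resizeDecision(100, 60*time.Second, 20*time.Second, window)) // 200: everything still fresh
	fmt.Println(resizeDecision(100, 90*time.Second, 80*time.Second, window)) // 50: recent quarter already stale
	fmt.Println(resizeDecision(100, 90*time.Second, 40*time.Second, window)) // 100: unchanged
}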
@ -660,7 +659,7 @@ func (w *watchCache) suggestedWatchChannelSize(indexExists, triggerUsed bool) in
// We don't have exact data, but given that we store updates from
// the last <eventFreshDuration>, we approximate it by dividing the
// capacity by the length of the history window.
chanSize := int(math.Ceil(float64(w.currentCapacity()) / eventFreshDuration.Seconds()))
chanSize := int(math.Ceil(float64(w.currentCapacity()) / w.eventFreshDuration.Seconds()))
// Finally we adjust the size to avoid ending with too low or
// too large values.
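With the default 75-second window, a capacity of 7500 yields ceil(7500 / 75) = 100, and doubling the window to 150 seconds halves that to 50 — exactly the expectations added to TestSuggestedWatchChannelSize further below.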


@ -286,7 +286,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
wc := newTestWatchCache(capacity, &cache.Indexers{})
wc := newTestWatchCache(capacity, DefaultEventFreshDuration, &cache.Indexers{})
defer wc.Stop()
for i := 0; i < c.eventsAddedToWatchcache; i++ {
wc.Add(makeTestPod(fmt.Sprintf("pod%d", i), uint64(i)))


@ -109,7 +109,7 @@ func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64, opts
}
// newTestWatchCache just adds a fake clock.
func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
func newTestWatchCache(capacity int, eventFreshDuration time.Duration, indexers *cache.Indexers) *testWatchCache {
keyFunc := func(obj runtime.Object) (string, error) {
return storage.NamespaceKeyFunc("prefix", obj)
}
@ -127,7 +127,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
wc.stopCh = make(chan struct{})
pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil)
go pr.Run(wc.stopCh)
wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"}, pr)
wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), eventFreshDuration, schema.GroupResource{Resource: "pods"}, pr)
// To preserve behavior of tests that assume a given capacity,
// resize it to the expected size.
wc.capacity = capacity
@ -194,7 +194,7 @@ func (w *testWatchCache) Stop() {
}
func TestWatchCacheBasic(t *testing.T) {
store := newTestWatchCache(2, &cache.Indexers{})
store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
// Test Add/Update/Delete.
@ -272,7 +272,7 @@ func TestWatchCacheBasic(t *testing.T) {
}
func TestEvents(t *testing.T) {
store := newTestWatchCache(5, &cache.Indexers{})
store := newTestWatchCache(5, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
// no dynamic-size cache to fit old tests.
@ -397,7 +397,7 @@ func TestEvents(t *testing.T) {
}
func TestMarker(t *testing.T) {
store := newTestWatchCache(3, &cache.Indexers{})
store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
// First thing that is called when propagated from storage is Replace.
@ -434,7 +434,7 @@ func TestMarker(t *testing.T) {
func TestWaitUntilFreshAndList(t *testing.T) {
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{
store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{
"l:label": func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
@ -537,7 +537,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
forceRequestWatchProgressSupport(t)
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{})
store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
// In background, update the store.
go func() {
@ -563,7 +563,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) {
func TestWaitUntilFreshAndGet(t *testing.T) {
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{})
store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
// In background, update the store.
@ -606,7 +606,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, tc.ConsistentListFromCache)
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{})
store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
fc := store.clock.(*testingclock.FakeClock)
@ -651,7 +651,7 @@ func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
func TestReflectorForWatchCache(t *testing.T) {
ctx := context.Background()
store := newTestWatchCache(5, &cache.Indexers{})
store := newTestWatchCache(5, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
{
@ -702,212 +702,212 @@ func TestDynamicCache(t *testing.T) {
expectStartIndex int
}{
{
name: "[capacity not equals 4*n] events inside eventFreshDuration cause cache expanding",
name: "[capacity not equals 4*n] events inside DefaultEventFreshDuration cause cache expanding",
eventCount: 5,
cacheCapacity: 5,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration / 6,
interval: DefaultEventFreshDuration / 6,
expectCapacity: 10,
expectStartIndex: 0,
},
{
name: "[capacity not equals 4*n] events outside eventFreshDuration without change cache capacity",
name: "[capacity not equals 4*n] events outside DefaultEventFreshDuration without change cache capacity",
eventCount: 5,
cacheCapacity: 5,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration / 4,
interval: DefaultEventFreshDuration / 4,
expectCapacity: 5,
expectStartIndex: 0,
},
{
name: "[capacity not equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking",
name: "[capacity not equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking",
eventCount: 5,
cacheCapacity: 5,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration + time.Second,
interval: DefaultEventFreshDuration + time.Second,
expectCapacity: 2,
expectStartIndex: 3,
},
{
name: "[capacity not equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
name: "[capacity not equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity",
eventCount: 5,
cacheCapacity: 5,
lowerBoundCapacity: 3,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration + time.Second,
interval: DefaultEventFreshDuration + time.Second,
expectCapacity: 3,
expectStartIndex: 2,
},
{
name: "[capacity not equals 4*n] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
name: "[capacity not equals 4*n] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity",
eventCount: 5,
cacheCapacity: 5,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 8,
interval: eventFreshDuration / 6,
interval: DefaultEventFreshDuration / 6,
expectCapacity: 8,
expectStartIndex: 0,
},
{
name: "[capacity not equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding",
name: "[capacity not equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding",
eventCount: 5,
cacheCapacity: 5,
startIndex: 3,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration / 6,
interval: DefaultEventFreshDuration / 6,
expectCapacity: 10,
expectStartIndex: 3,
},
{
name: "[capacity not equals 4*n] [startIndex not equal 0] events outside eventFreshDuration without change cache capacity",
name: "[capacity not equals 4*n] [startIndex not equal 0] events outside DefaultEventFreshDuration without change cache capacity",
eventCount: 5,
cacheCapacity: 5,
startIndex: 3,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration / 4,
interval: DefaultEventFreshDuration / 4,
expectCapacity: 5,
expectStartIndex: 3,
},
{
name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking",
name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking",
eventCount: 5,
cacheCapacity: 5,
startIndex: 3,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration + time.Second,
interval: DefaultEventFreshDuration + time.Second,
expectCapacity: 2,
expectStartIndex: 6,
},
{
name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity",
eventCount: 5,
cacheCapacity: 5,
startIndex: 3,
lowerBoundCapacity: 3,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration + time.Second,
interval: DefaultEventFreshDuration + time.Second,
expectCapacity: 3,
expectStartIndex: 5,
},
{
name: "[capacity not equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
name: "[capacity not equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity",
eventCount: 5,
cacheCapacity: 5,
startIndex: 3,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 8,
interval: eventFreshDuration / 6,
interval: DefaultEventFreshDuration / 6,
expectCapacity: 8,
expectStartIndex: 3,
},
{
name: "[capacity equals 4*n] events inside eventFreshDuration cause cache expanding",
name: "[capacity equals 4*n] events inside DefaultEventFreshDuration cause cache expanding",
eventCount: 8,
cacheCapacity: 8,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration / 9,
interval: DefaultEventFreshDuration / 9,
expectCapacity: 16,
expectStartIndex: 0,
},
{
name: "[capacity equals 4*n] events outside eventFreshDuration without change cache capacity",
name: "[capacity equals 4*n] events outside DefaultEventFreshDuration without change cache capacity",
eventCount: 8,
cacheCapacity: 8,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration / 8,
interval: DefaultEventFreshDuration / 8,
expectCapacity: 8,
expectStartIndex: 0,
},
{
name: "[capacity equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking",
name: "[capacity equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking",
eventCount: 8,
cacheCapacity: 8,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration/2 + time.Second,
interval: DefaultEventFreshDuration/2 + time.Second,
expectCapacity: 4,
expectStartIndex: 4,
},
{
name: "[capacity equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
name: "[capacity equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity",
eventCount: 8,
cacheCapacity: 8,
lowerBoundCapacity: 7,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration/2 + time.Second,
interval: DefaultEventFreshDuration/2 + time.Second,
expectCapacity: 7,
expectStartIndex: 1,
},
{
name: "[capacity equals 4*n] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
name: "[capacity equals 4*n] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity",
eventCount: 8,
cacheCapacity: 8,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 10,
interval: eventFreshDuration / 9,
interval: DefaultEventFreshDuration / 9,
expectCapacity: 10,
expectStartIndex: 0,
},
{
name: "[capacity equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding",
name: "[capacity equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding",
eventCount: 8,
cacheCapacity: 8,
startIndex: 3,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration / 9,
interval: DefaultEventFreshDuration / 9,
expectCapacity: 16,
expectStartIndex: 3,
},
{
name: "[capacity equals 4*n] [startIndex not equal 0] events outside eventFreshDuration without change cache capacity",
name: "[capacity equals 4*n] [startIndex not equal 0] events outside DefaultEventFreshDuration without change cache capacity",
eventCount: 8,
cacheCapacity: 8,
startIndex: 3,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration / 8,
interval: DefaultEventFreshDuration / 8,
expectCapacity: 8,
expectStartIndex: 3,
},
{
name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking",
name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking",
eventCount: 8,
cacheCapacity: 8,
startIndex: 3,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration/2 + time.Second,
interval: DefaultEventFreshDuration/2 + time.Second,
expectCapacity: 4,
expectStartIndex: 7,
},
{
name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity",
eventCount: 8,
cacheCapacity: 8,
startIndex: 3,
lowerBoundCapacity: 7,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration/2 + time.Second,
interval: DefaultEventFreshDuration/2 + time.Second,
expectCapacity: 7,
expectStartIndex: 4,
},
{
name: "[capacity equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
name: "[capacity equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity",
eventCount: 8,
cacheCapacity: 8,
startIndex: 3,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 10,
interval: eventFreshDuration / 9,
interval: DefaultEventFreshDuration / 9,
expectCapacity: 10,
expectStartIndex: 3,
},
@ -915,7 +915,7 @@ func TestDynamicCache(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := newTestWatchCache(test.cacheCapacity, &cache.Indexers{})
store := newTestWatchCache(test.cacheCapacity, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
store.cache = make([]*watchCacheEvent, test.cacheCapacity)
store.startIndex = test.startIndex
@ -964,7 +964,7 @@ func checkCacheElements(cache *testWatchCache) bool {
}
func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
store := newTestWatchCache(2, &cache.Indexers{})
store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
now := store.clock.Now()
@ -983,7 +983,7 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
addEvent("key1", 20, now)
// Force "key1" to rotate out of cache.
later := now.Add(2 * eventFreshDuration)
later := now.Add(2 * DefaultEventFreshDuration)
addEvent("key2", 30, later)
addEvent("key3", 40, later)
@ -1002,6 +1002,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
capacity int
indexExists bool
triggerUsed bool
eventsFreshDuration time.Duration
expected int
}{
{
@ -1009,6 +1010,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
capacity: 100,
indexExists: true,
triggerUsed: true,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
@ -1016,6 +1018,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
capacity: 100,
indexExists: true,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
@ -1023,6 +1026,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
capacity: 100,
indexExists: false,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
@ -1030,6 +1034,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
capacity: 750,
indexExists: true,
triggerUsed: true,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
@ -1037,6 +1042,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
capacity: 750,
indexExists: true,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
@ -1044,6 +1050,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
capacity: 750,
indexExists: false,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
@ -1051,6 +1058,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
capacity: 7500,
indexExists: true,
triggerUsed: true,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
@ -1058,20 +1066,39 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
capacity: 7500,
indexExists: true,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 100,
},
{
name: "capacity=7500, indexExists, !triggerUsed, eventsFreshDuration=2m30s",
capacity: 7500,
indexExists: true,
triggerUsed: false,
eventsFreshDuration: 2 * DefaultEventFreshDuration,
expected: 50,
},
{
name: "capacity=7500, !indexExists",
capacity: 7500,
indexExists: false,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 100,
},
{
name: "capacity=7500, !indexExists, eventsFreshDuration=2m30s",
capacity: 7500,
indexExists: false,
triggerUsed: false,
eventsFreshDuration: 2 * DefaultEventFreshDuration,
expected: 50,
},
{
name: "capacity=75000, indexExists, triggerUsed",
capacity: 75000,
indexExists: true,
triggerUsed: true,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
@ -1079,13 +1106,23 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
capacity: 75000,
indexExists: true,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 1000,
},
{
name: "capacity=75000, indexExists, !triggerUsed, eventsFreshDuration=2m30s",
capacity: 75000,
indexExists: true,
triggerUsed: false,
eventsFreshDuration: 2 * DefaultEventFreshDuration,
expected: 500,
},
{
name: "capacity=75000, !indexExists",
capacity: 75000,
indexExists: false,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 100,
},
{
@ -1093,6 +1130,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
capacity: 750000,
indexExists: true,
triggerUsed: true,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
@ -1100,6 +1138,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
capacity: 750000,
indexExists: true,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 1000,
},
{
@ -1107,13 +1146,14 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
capacity: 750000,
indexExists: false,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 100,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
store := newTestWatchCache(test.capacity, &cache.Indexers{})
store := newTestWatchCache(test.capacity, test.eventsFreshDuration, &cache.Indexers{})
defer store.Stop()
got := store.suggestedWatchChannelSize(test.indexExists, test.triggerUsed)
if got != test.expected {
@ -1124,7 +1164,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
}
func BenchmarkWatchCache_updateCache(b *testing.B) {
store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{})
store := newTestWatchCache(defaultUpperBoundCapacity, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
store.cache = store.cache[:0]
store.upperBoundCapacity = defaultUpperBoundCapacity
@ -1146,7 +1186,7 @@ func TestHistogramCacheReadWait(t *testing.T) {
}
ctx := context.Background()
testedMetrics := "apiserver_watch_cache_read_wait_seconds"
store := newTestWatchCache(2, &cache.Indexers{})
store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
// In background, update the store.


@ -37,6 +37,7 @@ const (
DefaultCompactInterval = 5 * time.Minute
DefaultDBMetricPollInterval = 30 * time.Second
DefaultEventsHistoryWindow = 75 * time.Second
DefaultHealthcheckTimeout = 2 * time.Second
DefaultReadinessTimeout = 2 * time.Second
)
@ -80,6 +81,8 @@ type Config struct {
CountMetricPollPeriod time.Duration
// DBMetricPollInterval specifies how often should storage backend metric be updated.
DBMetricPollInterval time.Duration
// EventsHistoryWindow specifies the minimum history duration that the storage keeps.
EventsHistoryWindow time.Duration
// HealthcheckTimeout specifies the timeout used when checking health
HealthcheckTimeout time.Duration
// ReadycheckTimeout specifies the timeout used when checking readiness
@ -115,6 +118,7 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
Codec: codec,
CompactionInterval: DefaultCompactInterval,
DBMetricPollInterval: DefaultDBMetricPollInterval,
EventsHistoryWindow: DefaultEventsHistoryWindow,
HealthcheckTimeout: DefaultHealthcheckTimeout,
ReadycheckTimeout: DefaultReadinessTimeout,
LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(),
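As a hedged usage sketch of the new knob (assuming this is the k8s.io/apiserver staging tree, so the package path is k8s.io/apiserver/pkg/storage/storagebackend; the surrounding main function and the choice of multiplier are purely illustrative):

package main

import (
	"fmt"

	"k8s.io/apiserver/pkg/storage/storagebackend"
)

func main() {
	// NewDefaultConfig pre-populates EventsHistoryWindow with
	// DefaultEventsHistoryWindow (75s), so existing callers keep the previous
	// watch cache behaviour unless they deliberately override the field.
	cfg := storagebackend.NewDefaultConfig("/registry", nil)
	fmt.Println(cfg.EventsHistoryWindow) // 1m15s

	// Only windows at or above the cacher's DefaultEventFreshDuration are
	// useful: NewCacherFromConfig rejects anything smaller.
	cfg.EventsHistoryWindow = 2 * storagebackend.DefaultEventsHistoryWindow
	fmt.Println(cfg.EventsHistoryWindow) // 2m30s
}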