Add watch cache capacity upper bound adjusting logic

Kubernetes-commit: 2173a0fafd448e55b7738b88fbbab392125dd975
This commit is contained in:
Antoni Zawodny 2025-02-21 15:07:01 +01:00 committed by Kubernetes Publisher
parent 710763dd43
commit efe7a1b26d
2 changed files with 68 additions and 1 deletions

View File

@ -169,7 +169,7 @@ func newWatchCache(
getAttrsFunc: getAttrsFunc,
cache: make([]*watchCacheEvent, defaultLowerBoundCapacity),
lowerBoundCapacity: defaultLowerBoundCapacity,
upperBoundCapacity: defaultUpperBoundCapacity,
upperBoundCapacity: capacityUpperBound(eventFreshDuration),
startIndex: 0,
endIndex: 0,
store: newStoreIndexer(indexers),
@ -189,6 +189,30 @@ func newWatchCache(
return wc
}
// capacityUpperBound denotes the maximum possible capacity of the watch cache
// to which it can resize.
func capacityUpperBound(eventFreshDuration time.Duration) int {
if eventFreshDuration <= DefaultEventFreshDuration {
return defaultUpperBoundCapacity
}
// eventFreshDuration determines how long the watch events are supposed
// to be stored in the watch cache.
// In very high churn situations, there is a need to store more events
// in the watch cache, hence it would have to be upsized accordingly.
// Because of that, for larger values of eventFreshDuration, we set the
// upper bound of the watch cache's capacity proportionally to the ratio
// between eventFreshDuration and DefaultEventFreshDuration.
// Given that the watch cache size can only double, we round up that
// proportion to the next power of two.
exponent := int(math.Ceil((math.Log2(eventFreshDuration.Seconds() / DefaultEventFreshDuration.Seconds()))))
if maxExponent := int(math.Floor((math.Log2(math.MaxInt32 / defaultUpperBoundCapacity)))); exponent > maxExponent {
// Making sure that the capacity's upper bound fits in a 32-bit integer.
exponent = maxExponent
klog.Warningf("Capping watch cache capacity upper bound to %v", defaultUpperBoundCapacity<<exponent)
}
return defaultUpperBoundCapacity << exponent
}
// Add takes runtime.Object as an argument.
func (w *watchCache) Add(obj interface{}) error {
object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)

View File

@ -1163,6 +1163,49 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
}
}
func TestCapacityUpperBound(t *testing.T) {
testCases := []struct {
name string
eventFreshDuration time.Duration
expected int
}{
{
name: "default eventFreshDuration",
eventFreshDuration: DefaultEventFreshDuration, // 75s
expected: defaultUpperBoundCapacity, // 100 * 1024
},
{
name: "lower eventFreshDuration, capacity limit unchanged",
eventFreshDuration: 45 * time.Second, // 45s
expected: defaultUpperBoundCapacity, // 100 * 1024
},
{
name: "higher eventFreshDuration, capacity limit scaled up",
eventFreshDuration: 4 * DefaultEventFreshDuration, // 4 * 75s
expected: 4 * defaultUpperBoundCapacity, // 4 * 100 * 1024
},
{
name: "higher eventFreshDuration, capacity limit scaled and rounded up",
eventFreshDuration: 3 * DefaultEventFreshDuration, // 3 * 75s
expected: 4 * defaultUpperBoundCapacity, // 4 * 100 * 1024
},
{
name: "higher eventFreshDuration, capacity limit scaled up and capped",
eventFreshDuration: DefaultEventFreshDuration << 20, // 2^20 * 75s
expected: defaultUpperBoundCapacity << 14, // 2^14 * 100 * 1024
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
capacity := capacityUpperBound(test.eventFreshDuration)
if test.expected != capacity {
t.Errorf("expected %v, got %v", test.expected, capacity)
}
})
}
}
func BenchmarkWatchCache_updateCache(b *testing.B) {
store := newTestWatchCache(defaultUpperBoundCapacity, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()