Use dynamic size watch-cache.

If all cached events occur inside eventFreshDuration, increase cache capacity by 2x.
Decrease cache capacity by 2x when the event at the most recent quarter boundary occurs outside eventFreshDuration.

Kubernetes-commit: 56407b656c7acf6039cead0192070429e53a0c70
This commit is contained in:
louisgong 2020-04-12 17:22:38 +08:00 committed by Kubernetes Publisher
parent e6e5258db0
commit 4c8b97679c
6 changed files with 426 additions and 14 deletions

View File

@ -315,10 +315,11 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
}
clock := clock.RealClock{}
objType := reflect.TypeOf(obj)
cacher := &Cacher{
ready: newReady(),
storage: config.Storage,
objectType: reflect.TypeOf(obj),
objectType: objType,
versioner: config.Versioner,
newFunc: config.NewFunc,
indexedTrigger: indexedTrigger,
@ -349,7 +350,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
}
watchCache := newWatchCache(
config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers)
config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, objType)
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix

View File

@ -1,5 +1,5 @@
/*
Copyright 2019 The Kubernetes Authors.
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -33,7 +33,25 @@ var (
initCounter = metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "apiserver_init_events_total",
Help: "Counter of init events processed in watchcache broken by resource type",
Help: "Counter of init events processed in watchcache broken by resource type.",
StabilityLevel: metrics.ALPHA,
},
[]string{"resource"},
)
watchCacheCapacityIncreaseTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "watch_cache_capacity_increase_total",
Help: "Total number of watch cache capacity increase events broken by resource type.",
StabilityLevel: metrics.ALPHA,
},
[]string{"resource"},
)
watchCacheCapacityDecreaseTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "watch_cache_capacity_decrease_total",
Help: "Total number of watch cache capacity decrease events broken by resource type.",
StabilityLevel: metrics.ALPHA,
},
[]string{"resource"},
@ -42,4 +60,15 @@ var (
func init() {
legacyregistry.MustRegister(initCounter)
legacyregistry.MustRegister(watchCacheCapacityIncreaseTotal)
legacyregistry.MustRegister(watchCacheCapacityDecreaseTotal)
}
// recordsWatchCacheCapacityChange records a watchCache capacity resize
// (increase or decrease) in the corresponding counter metric, labeled by
// resource type. Callers are expected to invoke it only when the capacity
// actually changed (see doCacheResizeLocked); equal values are counted as a
// decrease.
func recordsWatchCacheCapacityChange(objType string, oldCap, newCap int) {
	// Renamed params: `new` shadowed the Go builtin of the same name.
	if oldCap < newCap {
		watchCacheCapacityIncreaseTotal.WithLabelValues(objType).Inc()
		return
	}
	watchCacheCapacityDecreaseTotal.WithLabelValues(objType).Inc()
}

View File

@ -44,3 +44,17 @@ func hasPathPrefix(s, pathPrefix string) bool {
}
return false
}
// max returns the larger of the two ints.
func max(a, b int) int {
	if a < b {
		return b
	}
	return a
}
// min returns the smaller of the two ints.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}

View File

@ -18,6 +18,7 @@ package cacher
import (
"fmt"
"reflect"
"sort"
"sync"
"time"
@ -44,6 +45,20 @@ const (
// resourceVersionTooHighRetrySeconds is the seconds before an operation should be retried by the client
// after receiving a 'too high resource version' error.
resourceVersionTooHighRetrySeconds = 1
// eventFreshDuration is time duration of events we want to keep.
eventFreshDuration = 5 * time.Minute
// defaultLowerBoundCapacity is a default value for event cache capacity's lower bound.
// 100 is minimum in NewHeuristicWatchCacheSizes.
// TODO: Figure out to what value we can decrease it.
defaultLowerBoundCapacity = 100
// defaultUpperBoundCapacity should be able to keep eventFreshDuration of history.
// With the current 102400 value though, it's not enough for leases in a 5k-node cluster,
// but that is a conscious decision.
// TODO: Validate if the current value is high enough for large scale clusters.
defaultUpperBoundCapacity = 100 * 1024
)
// watchCacheEvent is a single "watch event" that is send to users of
@ -60,6 +75,7 @@ type watchCacheEvent struct {
PrevObjFields fields.Set
Key string
ResourceVersion uint64
RecordTime time.Time
}
// Computing a key of an object is generally non-trivial (it performs
@ -126,6 +142,12 @@ type watchCache struct {
// Maximum size of history window.
capacity int
// upper bound of capacity since event cache has a dynamic size.
upperBoundCapacity int
// lower bound of capacity since event cache has a dynamic size.
lowerBoundCapacity int
// keyFunc is used to get a key in the underlying storage for a given object.
keyFunc func(runtime.Object) (string, error)
@ -165,6 +187,9 @@ type watchCache struct {
// An underlying storage.Versioner.
versioner storage.Versioner
// cacher's objectType.
objectType reflect.Type
}
func newWatchCache(
@ -173,12 +198,16 @@ func newWatchCache(
eventHandler func(*watchCacheEvent),
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
versioner storage.Versioner,
indexers *cache.Indexers) *watchCache {
indexers *cache.Indexers,
objectType reflect.Type) *watchCache {
wc := &watchCache{
capacity: capacity,
keyFunc: keyFunc,
getAttrsFunc: getAttrsFunc,
cache: make([]*watchCacheEvent, capacity),
capacity: capacity,
keyFunc: keyFunc,
getAttrsFunc: getAttrsFunc,
cache: make([]*watchCacheEvent, capacity),
// TODO get rid of them once we stop passing capacity as a parameter to watch cache.
lowerBoundCapacity: min(capacity, defaultLowerBoundCapacity),
upperBoundCapacity: max(capacity, defaultUpperBoundCapacity),
startIndex: 0,
endIndex: 0,
store: cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)),
@ -187,6 +216,7 @@ func newWatchCache(
eventHandler: eventHandler,
clock: clock.RealClock{},
versioner: versioner,
objectType: objectType,
}
wc.cond = sync.NewCond(wc.RLocker())
return wc
@ -260,6 +290,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
ObjFields: elem.Fields,
Key: key,
ResourceVersion: resourceVersion,
RecordTime: w.clock.Now(),
}
if err := func() error {
@ -301,7 +332,8 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
// Assumes that lock is already held for write.
func (w *watchCache) updateCache(event *watchCacheEvent) {
if w.endIndex == w.startIndex+w.capacity {
w.resizeCacheLocked(event.RecordTime)
if w.isCacheFullLocked() {
// Cache is full - remove the oldest element.
w.startIndex++
}
@ -309,6 +341,48 @@ func (w *watchCache) updateCache(event *watchCacheEvent) {
w.endIndex++
}
// resizeCacheLocked resizes the cache if necessary:
// - increases capacity by 2x if cache is full and all cached events occurred within last eventFreshDuration.
// - decreases capacity by 2x when the event at the recent-quarter boundary occurred outside of eventFreshDuration (protects watchCache from capacity flapping).
// eventTime is the RecordTime of the event being added.
// Assumes that lock is already held for write.
func (w *watchCache) resizeCacheLocked(eventTime time.Time) {
	// Expand: the cache is full yet even its oldest retained event is still
	// fresher than eventFreshDuration, i.e. we are evicting history too early.
	// Capacity is capped at upperBoundCapacity.
	if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration {
		capacity := min(w.capacity*2, w.upperBoundCapacity)
		if capacity > w.capacity {
			w.doCacheResizeLocked(capacity)
		}
		return
	}
	// Shrink: the event one quarter from the newest end is already older than
	// eventFreshDuration, so at least the oldest 3/4 of the buffer is stale.
	// Capacity is floored at lowerBoundCapacity. Using the quarter boundary
	// (rather than the newest event) avoids flapping between sizes.
	if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration {
		capacity := max(w.capacity/2, w.lowerBoundCapacity)
		if capacity < w.capacity {
			w.doCacheResizeLocked(capacity)
		}
		return
	}
}
// isCacheFullLocked reports whether the event ring buffer currently holds
// exactly w.capacity events, i.e. the next append would overwrite the oldest.
// Assumes that lock is already held for write.
func (w *watchCache) isCacheFullLocked() bool {
	return w.endIndex-w.startIndex == w.capacity
}
// doCacheResizeLocked replaces the event ring buffer with a new array of the
// given capacity, copies the retained events into their new modular slots, and
// records the resize in metrics.
// NOTE(review): when shrinking, the only caller (resizeCacheLocked) has already
// checked that the cache is full, so endIndex-capacity never precedes the old
// startIndex.
// Assumes that lock is already held for write.
func (w *watchCache) doCacheResizeLocked(capacity int) {
	newCache := make([]*watchCacheEvent, capacity)
	if capacity < w.capacity {
		// Shrinking: keep only the newest `capacity` events by advancing
		// startIndex; older entries are dropped.
		w.startIndex = w.endIndex - capacity
	}
	// Re-home each retained event: slot is index modulo the (new) capacity.
	for i := w.startIndex; i < w.endIndex; i++ {
		newCache[i%capacity] = w.cache[i%w.capacity]
	}
	w.cache = newCache
	recordsWatchCacheCapacityChange(w.objectType.String(), w.capacity, capacity)
	w.capacity = capacity
}
// List returns list of pointers to <storeElement> objects.
func (w *watchCache) List() []interface{} {
return w.store.List()

View File

@ -18,6 +18,7 @@ package cacher
import (
"fmt"
"reflect"
"strconv"
"strings"
"testing"
@ -33,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/client-go/tools/cache"
@ -79,7 +81,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache {
}
versioner := etcd3.APIObjectVersioner{}
mockHandler := func(*watchCacheEvent) {}
wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner, indexers)
wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner, indexers, reflect.TypeOf(&example.Pod{}))
wc.clock = clock.NewFakeClock(time.Now())
return wc
}
@ -164,6 +166,10 @@ func TestWatchCacheBasic(t *testing.T) {
func TestEvents(t *testing.T) {
store := newTestWatchCache(5, &cache.Indexers{})
// no dynamic-size cache to fit old tests.
store.lowerBoundCapacity = 5
store.upperBoundCapacity = 5
store.Add(makeTestPod("pod", 3))
// Test for Added event.
@ -501,3 +507,292 @@ func TestReflectorForWatchCache(t *testing.T) {
}
}
}
// TestDynamicCache verifies resizeCacheLocked against a table of scenarios:
// expanding when all cached events are fresh, leaving capacity alone when the
// freshness condition is not met, and shrinking when the recent-quarter
// boundary event is stale — each also exercised against the configured
// lower/upper capacity bounds. Cases cover capacities that are and are not
// multiples of 4, and ring buffers whose startIndex is non-zero (wrapped).
func TestDynamicCache(t *testing.T) {
	tests := []struct {
		name          string
		eventCount    int
		cacheCapacity int
		startIndex    int
		lowerBoundCapacity int
		upperBoundCapacity int
		// interval is time duration between adjacent events.
		interval         time.Duration
		expectCapacity   int
		expectStartIndex int
	}{
		{
			name:               "[capacity not equals 4*n] events inside eventFreshDuration cause cache expanding",
			eventCount:         5,
			cacheCapacity:      5,
			lowerBoundCapacity: 5 / 2,
			upperBoundCapacity: 5 * 2,
			interval:           eventFreshDuration / 6,
			expectCapacity:     10,
			expectStartIndex:   0,
		},
		{
			name:               "[capacity not equals 4*n] events outside eventFreshDuration without change cache capacity",
			eventCount:         5,
			cacheCapacity:      5,
			lowerBoundCapacity: 5 / 2,
			upperBoundCapacity: 5 * 2,
			interval:           eventFreshDuration / 4,
			expectCapacity:     5,
			expectStartIndex:   0,
		},
		{
			name:               "[capacity not equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking",
			eventCount:         5,
			cacheCapacity:      5,
			lowerBoundCapacity: 5 / 2,
			upperBoundCapacity: 5 * 2,
			interval:           eventFreshDuration + time.Second,
			expectCapacity:     2,
			expectStartIndex:   3,
		},
		{
			name:               "[capacity not equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
			eventCount:         5,
			cacheCapacity:      5,
			lowerBoundCapacity: 3,
			upperBoundCapacity: 5 * 2,
			interval:           eventFreshDuration + time.Second,
			expectCapacity:     3,
			expectStartIndex:   2,
		},
		{
			name:               "[capacity not equals 4*n] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
			eventCount:         5,
			cacheCapacity:      5,
			lowerBoundCapacity: 5 / 2,
			upperBoundCapacity: 8,
			interval:           eventFreshDuration / 6,
			expectCapacity:     8,
			expectStartIndex:   0,
		},
		{
			name:               "[capacity not equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding",
			eventCount:         5,
			cacheCapacity:      5,
			startIndex:         3,
			lowerBoundCapacity: 5 / 2,
			upperBoundCapacity: 5 * 2,
			interval:           eventFreshDuration / 6,
			expectCapacity:     10,
			expectStartIndex:   3,
		},
		{
			name:               "[capacity not equals 4*n] [startIndex not equal 0] events outside eventFreshDuration without change cache capacity",
			eventCount:         5,
			cacheCapacity:      5,
			startIndex:         3,
			lowerBoundCapacity: 5 / 2,
			upperBoundCapacity: 5 * 2,
			interval:           eventFreshDuration / 4,
			expectCapacity:     5,
			expectStartIndex:   3,
		},
		{
			name:               "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking",
			eventCount:         5,
			cacheCapacity:      5,
			startIndex:         3,
			lowerBoundCapacity: 5 / 2,
			upperBoundCapacity: 5 * 2,
			interval:           eventFreshDuration + time.Second,
			expectCapacity:     2,
			expectStartIndex:   6,
		},
		{
			name:               "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
			eventCount:         5,
			cacheCapacity:      5,
			startIndex:         3,
			lowerBoundCapacity: 3,
			upperBoundCapacity: 5 * 2,
			interval:           eventFreshDuration + time.Second,
			expectCapacity:     3,
			expectStartIndex:   5,
		},
		{
			name:               "[capacity not equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
			eventCount:         5,
			cacheCapacity:      5,
			startIndex:         3,
			lowerBoundCapacity: 5 / 2,
			upperBoundCapacity: 8,
			interval:           eventFreshDuration / 6,
			expectCapacity:     8,
			expectStartIndex:   3,
		},
		{
			name:               "[capacity equals 4*n] events inside eventFreshDuration cause cache expanding",
			eventCount:         8,
			cacheCapacity:      8,
			lowerBoundCapacity: 8 / 2,
			upperBoundCapacity: 8 * 2,
			interval:           eventFreshDuration / 9,
			expectCapacity:     16,
			expectStartIndex:   0,
		},
		{
			name:               "[capacity equals 4*n] events outside eventFreshDuration without change cache capacity",
			eventCount:         8,
			cacheCapacity:      8,
			lowerBoundCapacity: 8 / 2,
			upperBoundCapacity: 8 * 2,
			interval:           eventFreshDuration / 8,
			expectCapacity:     8,
			expectStartIndex:   0,
		},
		{
			name:               "[capacity equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking",
			eventCount:         8,
			cacheCapacity:      8,
			lowerBoundCapacity: 8 / 2,
			upperBoundCapacity: 8 * 2,
			interval:           eventFreshDuration/2 + time.Second,
			expectCapacity:     4,
			expectStartIndex:   4,
		},
		{
			name:               "[capacity equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
			eventCount:         8,
			cacheCapacity:      8,
			lowerBoundCapacity: 7,
			upperBoundCapacity: 8 * 2,
			interval:           eventFreshDuration/2 + time.Second,
			expectCapacity:     7,
			expectStartIndex:   1,
		},
		{
			name:               "[capacity equals 4*n] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
			eventCount:         8,
			cacheCapacity:      8,
			lowerBoundCapacity: 8 / 2,
			upperBoundCapacity: 10,
			interval:           eventFreshDuration / 9,
			expectCapacity:     10,
			expectStartIndex:   0,
		},
		{
			name:               "[capacity equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding",
			eventCount:         8,
			cacheCapacity:      8,
			startIndex:         3,
			lowerBoundCapacity: 8 / 2,
			upperBoundCapacity: 8 * 2,
			interval:           eventFreshDuration / 9,
			expectCapacity:     16,
			expectStartIndex:   3,
		},
		{
			name:               "[capacity equals 4*n] [startIndex not equal 0] events outside eventFreshDuration without change cache capacity",
			eventCount:         8,
			cacheCapacity:      8,
			startIndex:         3,
			lowerBoundCapacity: 8 / 2,
			upperBoundCapacity: 8 * 2,
			interval:           eventFreshDuration / 8,
			expectCapacity:     8,
			expectStartIndex:   3,
		},
		{
			name:               "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking",
			eventCount:         8,
			cacheCapacity:      8,
			startIndex:         3,
			lowerBoundCapacity: 8 / 2,
			upperBoundCapacity: 8 * 2,
			interval:           eventFreshDuration/2 + time.Second,
			expectCapacity:     4,
			expectStartIndex:   7,
		},
		{
			name:               "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
			eventCount:         8,
			cacheCapacity:      8,
			startIndex:         3,
			lowerBoundCapacity: 7,
			upperBoundCapacity: 8 * 2,
			interval:           eventFreshDuration/2 + time.Second,
			expectCapacity:     7,
			expectStartIndex:   4,
		},
		{
			name:               "[capacity equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
			eventCount:         8,
			cacheCapacity:      8,
			startIndex:         3,
			lowerBoundCapacity: 8 / 2,
			upperBoundCapacity: 10,
			interval:           eventFreshDuration / 9,
			expectCapacity:     10,
			expectStartIndex:   3,
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			// Build a cache with the scenario's geometry and bounds, then
			// pre-load it with evenly-spaced synthetic events.
			store := newTestWatchCache(test.cacheCapacity, &cache.Indexers{})
			store.cache = make([]*watchCacheEvent, test.cacheCapacity)
			store.startIndex = test.startIndex
			store.lowerBoundCapacity = test.lowerBoundCapacity
			store.upperBoundCapacity = test.upperBoundCapacity
			loadEventWithDuration(store, test.eventCount, test.interval)
			// nextInterval is the RecordTime the next (triggering) event
			// would carry.
			nextInterval := store.clock.Now().Add(time.Duration(test.interval.Nanoseconds() * int64(test.eventCount)))
			store.resizeCacheLocked(nextInterval)
			if store.capacity != test.expectCapacity {
				t.Errorf("expect capacity %d, but get %d", test.expectCapacity, store.capacity)
			}
			// check cache's startIndex, endIndex and all elements.
			if store.startIndex != test.expectStartIndex {
				t.Errorf("expect startIndex %d, but get %d", test.expectStartIndex, store.startIndex)
			}
			if store.endIndex != test.startIndex+test.eventCount {
				t.Errorf("expect endIndex %d get %d", test.startIndex+test.eventCount, store.endIndex)
			}
			if !checkCacheElements(store) {
				t.Errorf("some elements locations in cache is wrong")
			}
		})
	}
}
// loadEventWithDuration pre-populates the watch cache ring buffer with count
// synthetic events, spaced interval apart starting at the cache clock's "now",
// and advances endIndex accordingly. Events are placed at their modular slots
// starting from the current startIndex.
// The parameter was renamed from `cache`, which shadowed the imported
// client-go `cache` package used elsewhere in this file.
func loadEventWithDuration(wc *watchCache, count int, interval time.Duration) {
	for i := 0; i < count; i++ {
		event := &watchCacheEvent{
			Key:        fmt.Sprintf("event-%d", i+wc.startIndex),
			RecordTime: wc.clock.Now().Add(time.Duration(i) * interval),
		}
		wc.cache[(i+wc.startIndex)%wc.capacity] = event
	}
	wc.endIndex = wc.startIndex + count
}
// checkCacheElements reports whether every event between startIndex and
// endIndex sits at its expected ring-buffer slot with the key loadEventWithDuration
// would have assigned to that index.
// The parameter was renamed from `cache`, which shadowed the imported
// client-go `cache` package used elsewhere in this file.
func checkCacheElements(wc *watchCache) bool {
	for i := wc.startIndex; i < wc.endIndex; i++ {
		if wc.cache[i%wc.capacity].Key != fmt.Sprintf("event-%d", i) {
			return false
		}
	}
	return true
}
// BenchmarkWatchCache_updateCache measures updateCache on a cache that is
// already full at its upper capacity bound, so every call evicts the oldest
// element without triggering a resize.
// Fix: the original `store.cache = store.cache[:0]` truncated the slice to
// length 0, making loadEventWithDuration's indexed writes panic with
// "index out of range"; newTestWatchCache already allocates the buffer at
// defaultUpperBoundCapacity, so the truncation is simply dropped.
func BenchmarkWatchCache_updateCache(b *testing.B) {
	store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{})
	store.upperBoundCapacity = defaultUpperBoundCapacity
	loadEventWithDuration(store, defaultUpperBoundCapacity, 0)
	add := &watchCacheEvent{
		Key:        fmt.Sprintf("event-%d", defaultUpperBoundCapacity),
		RecordTime: store.clock.Now(),
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		store.updateCache(add)
	}
}

View File

@ -443,9 +443,8 @@ func TestWatch(t *testing.T) {
t.Fatalf("Expected no direct error, got %v", err)
}
defer tooOldWatcher.Stop()
// Ensure we get a "Gone" error
expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
// Events happen within eventFreshDuration, so the cache expands and no "Gone" error is emitted.
verifyWatchEvent(t, tooOldWatcher, watch.Added, podFoo)
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)
if err != nil {