From ecf3a5737404a63f8e64e6a810eed22efe98e2e7 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Thu, 11 Aug 2022 15:45:04 -0400 Subject: [PATCH 1/2] watch cache: log GroupResource, not objectType All CustomResources are treated as *unstructured.Unstructured, leading the watch cache to log anything related to CRs as Unstructured. This change uses the schema.GroupResource instead of object type for all type related log messages in the watch cache, resulting in distinct output for each CR type. Signed-off-by: Andy Goldstein Kubernetes-commit: 397533a4c2df9639ff4422c907d06fae195a1835 --- .../generic/registry/storage_factory.go | 1 + pkg/registry/generic/registry/store_test.go | 1 + pkg/storage/cacher/cacher.go | 52 ++++++++++++++----- pkg/storage/cacher/cacher_whitebox_test.go | 16 +++--- pkg/storage/tests/cacher_test.go | 1 + 5 files changed, 50 insertions(+), 21 deletions(-) diff --git a/pkg/registry/generic/registry/storage_factory.go b/pkg/registry/generic/registry/storage_factory.go index da9a870b3..3983c92d0 100644 --- a/pkg/registry/generic/registry/storage_factory.go +++ b/pkg/registry/generic/registry/storage_factory.go @@ -56,6 +56,7 @@ func StorageWithCacher() generic.StorageDecorator { cacherConfig := cacherstorage.Config{ Storage: s, Versioner: storage.APIObjectVersioner{}, + GroupResource: storageConfig.GroupResource, ResourcePrefix: resourcePrefix, KeyFunc: keyFunc, NewFunc: newFunc, diff --git a/pkg/registry/generic/registry/store_test.go b/pkg/registry/generic/registry/store_test.go index de1fdb637..1c0b2cc6a 100644 --- a/pkg/registry/generic/registry/store_test.go +++ b/pkg/registry/generic/registry/store_test.go @@ -2318,6 +2318,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE config := cacherstorage.Config{ Storage: s, Versioner: storage.APIObjectVersioner{}, + GroupResource: schema.GroupResource{Resource: "pods"}, ResourcePrefix: podPrefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) }, GetAttrsFunc: getPodAttrs, diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index 24dd64d9e..745d51dd8 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" @@ -69,6 +70,9 @@ type Config struct { // An underlying storage.Versioner. Versioner storage.Versioner + // The GroupResource the cacher is caching. Used for disambiguating *unstructured.Unstructured (CRDs) in logging. + GroupResource schema.GroupResource + // The Cache will be caching objects of a given Type and assumes that they // are all stored under ResourcePrefix directory in the underlying database. ResourcePrefix string @@ -146,13 +150,13 @@ func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool } } -func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cacheWatcher)) { +func (i *indexedWatchers) terminateAll(groupResource schema.GroupResource, done func(*cacheWatcher)) { // note that we don't have to call setDrainInputBufferLocked method on the watchers // because we take advantage of the default value - stop immediately // also watchers that have had already its draining strategy set // are no longer available (they were removed from the allWatchers and the valueWatchers maps) if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 { - klog.Warningf("Terminating all watchers from cacher %v", objectType) + klog.Warningf("Terminating all watchers from cacher %v", groupResource) } i.allWatchers.terminateAll(done) for _, watchers := range i.valueWatchers { @@ -258,6 +262,8 @@ type Cacher struct { // Expected type of objects in the underlying cache. objectType reflect.Type + // Used for logging, to disambiguate *unstructured.Unstructured (CRDs) + groupResource schema.GroupResource // "sliding window" of recent changes of objects and the current state. watchCache *watchCache @@ -347,6 +353,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { ready: newReady(), storage: config.Storage, objectType: objType, + groupResource: config.GroupResource, versioner: config.Versioner, newFunc: config.NewFunc, indexedTrigger: indexedTrigger, @@ -422,7 +429,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) { c.watchCache.SetOnReplace(func() { successfulList = true c.ready.set(true) - klog.V(1).Infof("cacher (%v): initialized", c.objectType.String()) + klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String()) metrics.WatchCacheInitializations.WithLabelValues(c.objectType.String()).Inc() }) defer func() { @@ -437,7 +444,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) { // Also note that startCaching is called in a loop, so there's no need // to have another loop here. if err := c.reflector.ListAndWatch(stopChannel); err != nil { - klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.objectType.String(), err) + klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err) } } @@ -508,7 +515,16 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // given that memory allocation may trigger GC and block the thread. // Also note that emptyFunc is a placeholder, until we will be able // to compute watcher.forget function (which has to happen under lock). - watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType, identifier) + watcher := newCacheWatcher( + chanSize, + filterWithAttrsFunction(key, pred), + emptyFunc, + c.versioner, + deadline, + pred.AllowWatchBookmarks, + c.groupResource, + identifier, + ) // We explicitly use thread unsafe version and do locking ourself to ensure that // no new events will be processed in the meantime. The watchCache will be unlocked @@ -654,7 +670,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio trace := utiltrace.New("cacher list", utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)}, - utiltrace.Field{Key: "type", Value: c.objectType.String()}) + utiltrace.Field{Key: "type", Value: c.groupResource.String()}) defer trace.LogIfLong(500 * time.Millisecond) if err := c.ready.wait(); err != nil { @@ -757,7 +773,7 @@ func (c *Cacher) triggerValuesThreadUnsafe(event *watchCacheEvent) ([]string, bo func (c *Cacher) processEvent(event *watchCacheEvent) { if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) { // Monitor if this gets backed up, and how much. - klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen) + klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.groupResource.String(), curLen) } c.incoming <- *event } @@ -1008,7 +1024,7 @@ func (c *Cacher) finishDispatching() { func (c *Cacher) terminateAllWatchers() { c.Lock() defer c.Unlock() - c.watchers.terminateAll(c.objectType, c.stopWatcherLocked) + c.watchers.terminateAll(c.groupResource, c.stopWatcherLocked) } func (c *Cacher) stopWatcherLocked(watcher *cacheWatcher) { @@ -1176,8 +1192,7 @@ type cacheWatcher struct { // save it here to send bookmark events before that. deadline time.Time allowWatchBookmarks bool - // Object type of the cache watcher interests - objectType reflect.Type + groupResource schema.GroupResource // human readable identifier that helps assigning cacheWatcher // instance with request @@ -1188,7 +1203,16 @@ type cacheWatcher struct { drainInputBuffer bool } -func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(bool), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type, identifier string) *cacheWatcher { +func newCacheWatcher( + chanSize int, + filter filterWithAttrsFunc, + forget func(bool), + versioner storage.Versioner, + deadline time.Time, + allowWatchBookmarks bool, + groupResource schema.GroupResource, + identifier string, +) *cacheWatcher { return &cacheWatcher{ input: make(chan *watchCacheEvent, chanSize), result: make(chan watch.Event, chanSize), @@ -1199,7 +1223,7 @@ func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(bool) versioner: versioner, deadline: deadline, allowWatchBookmarks: allowWatchBookmarks, - objectType: objectType, + groupResource: groupResource, identifier: identifier, } } @@ -1256,7 +1280,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { // This means that we couldn't send event to that watcher. // Since we don't want to block on it infinitely, // we simply terminate it. - klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.objectType.String(), c.identifier, len(c.input), len(c.result)) + klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result)) metrics.TerminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc() c.forget(false) } @@ -1461,7 +1485,7 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch } processingTime := time.Since(startTime) if processingTime > initProcessThreshold { - klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, objType, c.identifier, processingTime) + klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime) } c.process(ctx, resourceVersion) diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index 883571e37..3450baebf 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/diff" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -74,7 +75,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { } // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. - w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") + w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0) w.Stop() if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { @@ -194,7 +195,7 @@ TestCase: testCase.events[j].ResourceVersion = uint64(j) + 1 } - w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") + w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0) ch := w.ResultChan() @@ -266,6 +267,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) { config := Config{ Storage: s, Versioner: testVersioner{}, + GroupResource: schema.GroupResource{Resource: "pods"}, ResourcePrefix: prefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, GetAttrsFunc: storage.DefaultNamespaceScopedAttr, @@ -537,7 +539,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { // timeout to zero and run the Stop goroutine concurrently. // May sure that the watch will not be blocked on Stop. for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { - w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") + w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") go w.Stop() select { case <-done: @@ -549,7 +551,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { deadline := time.Now().Add(time.Hour) // After that, verifies the cacheWatcher.process goroutine works correctly. for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { - w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "") + w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "") w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)} ctx, cancel := context.WithDeadline(context.Background(), deadline) defer cancel() @@ -674,7 +676,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) { forget := func(bool) {} newWatcher := func(deadline time.Time) *cacheWatcher { - return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, objectType, "") + return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "") } clock := testingclock.NewFakeClock(time.Now()) @@ -1616,7 +1618,7 @@ func TestCacheWatcherDraining(t *testing.T) { makeWatchCacheEvent(5), makeWatchCacheEvent(6), } - w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "") + w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1) if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) { t.Fatal("failed adding an even to the watcher") @@ -1657,7 +1659,7 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) { makeWatchCacheEvent(5), makeWatchCacheEvent(6), } - w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "") + w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1) if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) { t.Fatal("failed adding an even to the watcher") diff --git a/pkg/storage/tests/cacher_test.go b/pkg/storage/tests/cacher_test.go index 0b036c368..0966d234a 100644 --- a/pkg/storage/tests/cacher_test.go +++ b/pkg/storage/tests/cacher_test.go @@ -121,6 +121,7 @@ func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstor config := cacherstorage.Config{ Storage: s, Versioner: v, + GroupResource: schema.GroupResource{Resource: "pods"}, ResourcePrefix: prefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, GetAttrsFunc: GetAttrs, From 7eb011f596ecab3d949eee731555c2ff41bda72b Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Thu, 11 Aug 2022 15:51:21 -0400 Subject: [PATCH 2/2] watch cache: metrics: objectType -> group resource Use the group resource instead of objectType in watch cache metrics, because all CustomResources are grouped together as *unstructured.Unstructured, instead of 1 entry per type. Signed-off-by: Andy Goldstein Kubernetes-commit: d08b69e8d35a5aa73a178c508f9b0e1ad74b882d --- pkg/storage/cacher/cacher.go | 14 +++++++------- pkg/storage/cacher/cacher_whitebox_test.go | 4 ---- pkg/storage/cacher/watch_cache.go | 15 +++++++-------- pkg/storage/cacher/watch_cache_test.go | 5 ++--- 4 files changed, 16 insertions(+), 22 deletions(-) diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index 745d51dd8..ac5563ece 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -70,7 +70,8 @@ type Config struct { // An underlying storage.Versioner. Versioner storage.Versioner - // The GroupResource the cacher is caching. Used for disambiguating *unstructured.Unstructured (CRDs) in logging. + // The GroupResource the cacher is caching. Used for disambiguating *unstructured.Unstructured (CRDs) in logging + // and metrics. GroupResource schema.GroupResource // The Cache will be caching objects of a given Type and assumes that they @@ -384,7 +385,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { } watchCache := newWatchCache( - config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType) + config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource) listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) reflectorName := "storage/cacher.go:" + config.ResourcePrefix @@ -430,7 +431,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) { successfulList = true c.ready.set(true) klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String()) - metrics.WatchCacheInitializations.WithLabelValues(c.objectType.String()).Inc() + metrics.WatchCacheInitializations.WithLabelValues(c.groupResource.String()).Inc() }) defer func() { if successfulList { @@ -804,7 +805,7 @@ func (c *Cacher) dispatchEvents() { c.dispatchEvent(&event) } lastProcessedResourceVersion = event.ResourceVersion - metrics.EventsCounter.WithLabelValues(c.objectType.String()).Inc() + metrics.EventsCounter.WithLabelValues(c.groupResource.String()).Inc() case <-bookmarkTimer.C(): bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25)) // Never send a bookmark event if we did not see an event here, this is fine @@ -1281,7 +1282,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { // Since we don't want to block on it infinitely, // we simply terminate it. klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result)) - metrics.TerminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc() + metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc() c.forget(false) } @@ -1479,9 +1480,8 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch initEventCount++ } - objType := c.objectType.String() if initEventCount > 0 { - metrics.InitCounter.WithLabelValues(objType).Add(float64(initEventCount)) + metrics.InitCounter.WithLabelValues(c.groupResource.String()).Add(float64(initEventCount)) } processingTime := time.Since(startTime) if processingTime > initProcessThreshold { diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index 3450baebf..c15ff5e86 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -48,10 +48,6 @@ import ( testingclock "k8s.io/utils/clock/testing" ) -var ( - objectType = reflect.TypeOf(&v1.Pod{}) -) - // verifies the cacheWatcher.process goroutine is properly cleaned up even if // the writes to cacheWatcher.result channel is blocked. func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { diff --git a/pkg/storage/cacher/watch_cache.go b/pkg/storage/cacher/watch_cache.go index 9deff4070..2bd4284e5 100644 --- a/pkg/storage/cacher/watch_cache.go +++ b/pkg/storage/cacher/watch_cache.go @@ -19,7 +19,6 @@ package cacher import ( "fmt" "math" - "reflect" "sort" "sync" "time" @@ -28,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/cacher/metrics" @@ -189,8 +189,8 @@ type watchCache struct { // An underlying storage.Versioner. versioner storage.Versioner - // cacher's objectType. - objectType reflect.Type + // cacher's group resource + groupResource schema.GroupResource // For testing cache interval invalidation. indexValidator indexValidator @@ -203,7 +203,7 @@ func newWatchCache( versioner storage.Versioner, indexers *cache.Indexers, clock clock.Clock, - objectType reflect.Type) *watchCache { + groupResource schema.GroupResource) *watchCache { wc := &watchCache{ capacity: defaultLowerBoundCapacity, keyFunc: keyFunc, @@ -219,10 +219,9 @@ func newWatchCache( eventHandler: eventHandler, clock: clock, versioner: versioner, - objectType: objectType, + groupResource: groupResource, } - objType := objectType.String() - metrics.WatchCacheCapacity.WithLabelValues(objType).Set(float64(wc.capacity)) + metrics.WatchCacheCapacity.WithLabelValues(groupResource.String()).Set(float64(wc.capacity)) wc.cond = sync.NewCond(wc.RLocker()) wc.indexValidator = wc.isIndexValidLocked @@ -387,7 +386,7 @@ func (w *watchCache) doCacheResizeLocked(capacity int) { newCache[i%capacity] = w.cache[i%w.capacity] } w.cache = newCache - metrics.RecordsWatchCacheCapacityChange(w.objectType.String(), w.capacity, capacity) + metrics.RecordsWatchCacheCapacityChange(w.groupResource.String(), w.capacity, capacity) w.capacity = capacity } diff --git a/pkg/storage/cacher/watch_cache_test.go b/pkg/storage/cacher/watch_cache_test.go index df6f9c008..6200fc6e5 100644 --- a/pkg/storage/cacher/watch_cache_test.go +++ b/pkg/storage/cacher/watch_cache_test.go @@ -18,7 +18,6 @@ package cacher import ( "fmt" - "reflect" "strconv" "strings" "testing" @@ -31,9 +30,9 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" - "k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/storage" "k8s.io/client-go/tools/cache" testingclock "k8s.io/utils/clock/testing" @@ -112,7 +111,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache { } versioner := storage.APIObjectVersioner{} mockHandler := func(*watchCacheEvent) {} - wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), reflect.TypeOf(&example.Pod{})) + wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"}) // To preserve behavior of tests that assume a given capacity, // resize it to th expected size. wc.capacity = capacity