Tweak use of caching objects

Kubernetes-commit: c5170dfb66749b60a4a1611dc7b4a39e64b33e2e
wojtekt 2019-10-16 09:05:13 +02:00 committed by Kubernetes Publisher
parent ced80a6097
commit a76d249b96
4 changed files with 150 additions and 94 deletions

@@ -823,6 +823,37 @@ func (c *Cacher) dispatchEvents() {
}
}
func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
switch event.Type {
case watch.Added, watch.Modified:
if object, err := newCachingObject(event.Object); err == nil {
event.Object = object
} else {
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
}
// Don't wrap PrevObject for update events (for create events it is nil).
// PrevObject is only ever encoded to deliver DELETE watch events, and for
// an update that happens only for watchers whose selector matched the
// previous version of the object but no longer matches the current one.
// That case is rare enough that it doesn't justify the deep copy of the
// object that newCachingObject makes every time.
case watch.Deleted:
// Don't wrap Object for delete events - it is not used to deliver any
// watch events. Only wrap PrevObject.
if object, err := newCachingObject(event.PrevObject); err == nil {
// Update resource version of the underlying object.
// event.PrevObject is used to deliver DELETE watch events and
// for them, we set resourceVersion to <current> instead of
// the resourceVersion of the last modification of the object.
updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
event.PrevObject = object
} else {
klog.Errorf("couldn't create cachingObject from: %#v", event.PrevObject)
}
}
}
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
c.startDispatching(event)
defer c.finishDispatching()
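newCachingObject itself is not part of this diff; from the tests below, the wrapper it returns is known to implement runtime.CacheableObject and to expose the wrapped object through GetObject(). A minimal sketch of the underlying idea - deep-copy once, then memoize a single serialization so every watcher the event fans out to can reuse it - using hypothetical names (memoizingObject, encodeOnce) rather than the real cachingObject implementation:

package cacher

import (
	"bytes"
	"io"
	"sync"

	"k8s.io/apimachinery/pkg/runtime"
)

// memoizingObject is an illustrative stand-in for cachingObject: it carries a
// deep copy of the event's object and caches the result of encoding it once.
type memoizingObject struct {
	object runtime.Object

	once    sync.Once
	encoded []byte
	err     error
}

// GetObject mirrors the runtime.CacheableObject accessor used by the tests:
// it hands out a copy so callers cannot mutate the cached object.
func (m *memoizingObject) GetObject() runtime.Object {
	return m.object.DeepCopyObject()
}

// encodeOnce runs encode at most once and replays the cached bytes for every
// subsequent watcher. The real cachingObject keys its cache by encoder
// identity; a single slot is enough for this sketch.
func (m *memoizingObject) encodeOnce(encode func(runtime.Object, io.Writer) error, w io.Writer) error {
	m.once.Do(func() {
		var buf bytes.Buffer
		m.err = encode(m.object, &buf)
		m.encoded = buf.Bytes()
	})
	if m.err != nil {
		return m.err
	}
	_, err := w.Write(m.encoded)
	return err
}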
@@ -836,6 +867,23 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
watcher.nonblockingAdd(event)
}
} else {
// Set up caching of object serializations only for dispatching this event.
//
// Storing serializations in memory would increase memory usage, but it
// would also help by reusing cached encodings for watches started from
// older resource versions. However, we don't yet have convincing data
// that the gain justifies the increased memory usage, so for now we drop
// the cached serializations after dispatching this event.
//
// Given the deep copies needed to create cachingObjects, we cache
// serializations only if there are at least 3 watchers.
if len(c.watchersBuffer) >= 3 {
// Make a shallow copy to allow overwriting Object and PrevObject.
wcEvent := *event
setCachingObjects(&wcEvent, c.versioner)
event = &wcEvent
}
c.blockedWatchers = c.blockedWatchers[:0]
for _, watcher := range c.watchersBuffer {
if !watcher.nonblockingAdd(event) {

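For reference, the comment in setCachingObjects about PrevObject of update events refers to the per-watcher filtering step: a Modified event is delivered as DELETED (built from PrevObject) only to watchers whose selector matched the previous version of the object but no longer matches the current one. A rough sketch of that decision, with a hypothetical matches predicate standing in for the actual cacheWatcher filtering:

package cacher

import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
)

// eventTypeFor illustrates how one watchCacheEvent fans out as different watch
// event types depending on a watcher's selector. matches is a hypothetical
// per-watcher predicate; the real code filters on precomputed labels/fields.
func eventTypeFor(event *watchCacheEvent, matches func(runtime.Object) bool) (watch.EventType, runtime.Object, bool) {
	curPasses := event.Type != watch.Deleted && matches(event.Object)
	prevPasses := event.PrevObject != nil && matches(event.PrevObject)

	switch {
	case curPasses && !prevPasses:
		return watch.Added, event.Object, true
	case curPasses && prevPasses:
		return watch.Modified, event.Object, true
	case !curPasses && prevPasses:
		// The only case that encodes PrevObject - rare for updates, which is
		// why setCachingObjects doesn't wrap PrevObject for them.
		return watch.Deleted, event.PrevObject, true
	default:
		// The watcher never sees this event.
		return "", nil, false
	}
}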
@@ -998,3 +998,104 @@ func TestCachingDeleteEvents(t *testing.T) {
verifyEvents(t, fooEventsWatcher, fooEvents)
verifyEvents(t, barEventsWatcher, barEvents)
}
func testCachingObjects(t *testing.T, watchersCount int) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage, 10)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
dispatchedEvents := []*watchCacheEvent{}
cacher.watchCache.eventHandler = func(event *watchCacheEvent) {
dispatchedEvents = append(dispatchedEvents, event)
cacher.processEvent(event)
}
watchers := make([]watch.Interface, 0, watchersCount)
for i := 0; i < watchersCount; i++ {
w, err := cacher.Watch(context.TODO(), "pods/ns", "1000", storage.Everything)
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
defer w.Stop()
watchers = append(watchers, w)
}
makePod := func(name, rv string) *examplev1.Pod {
return &examplev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "ns",
ResourceVersion: rv,
},
}
}
pod1 := makePod("pod", "1001")
pod2 := makePod("pod", "1002")
pod3 := makePod("pod", "1003")
cacher.watchCache.Add(pod1)
cacher.watchCache.Update(pod2)
cacher.watchCache.Delete(pod3)
// At this point, we already have dispatchedEvents fully propagated.
verifyEvents := func(w watch.Interface) {
var event watch.Event
for index := range dispatchedEvents {
select {
case event = <-w.ResultChan():
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout watiching for the event")
}
var object runtime.Object
if watchersCount >= 3 {
if _, ok := event.Object.(runtime.CacheableObject); !ok {
t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object)
}
object = event.Object.(runtime.CacheableObject).GetObject()
} else {
if _, ok := event.Object.(runtime.CacheableObject); ok {
t.Fatalf("Object in %s event should not support caching: %#v", event.Type, event.Object)
}
object = event.Object.DeepCopyObject()
}
if event.Type == watch.Deleted {
resourceVersion, err := cacher.versioner.ObjectResourceVersion(cacher.watchCache.cache[index].PrevObject)
if err != nil {
t.Fatalf("Failed to parse resource version: %v", err)
}
updateResourceVersionIfNeeded(object, cacher.versioner, resourceVersion)
}
var e runtime.Object
switch event.Type {
case watch.Added, watch.Modified:
e = cacher.watchCache.cache[index].Object
case watch.Deleted:
e = cacher.watchCache.cache[index].PrevObject
default:
t.Errorf("unexpected watch event: %#v", event)
}
if a := object; !reflect.DeepEqual(a, e) {
t.Errorf("event object messed up for %s: %#v, expected: %#v", event.Type, a, e)
}
}
}
for i := range watchers {
verifyEvents(watchers[i])
}
}
func TestCachingObjects(t *testing.T) {
t.Run("single watcher", func(t *testing.T) { testCachingObjects(t, 1) })
t.Run("many watcher", func(t *testing.T) { testCachingObjects(t, 3) })
}
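The new test exercises the full Cacher path: it replaces the watchCache's eventHandler with a wrapper that records every dispatched watchCacheEvent before forwarding it to Cacher.processEvent, then checks that watchers receive runtime.CacheableObject wrappers only once the 3-watcher threshold is reached. Assuming the usual layout of the apiserver repository (the file paths are not shown in this view), the caching tests can be run on their own with something like:

go test -v -run 'TestCachingObjects|TestCachingDeleteEvents' ./pkg/storage/cacher/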

@@ -210,37 +210,6 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob
return object, resourceVersion, nil
}
func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
switch event.Type {
case watch.Added, watch.Modified:
if object, err := newCachingObject(event.Object); err == nil {
event.Object = object
} else {
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
}
// Don't wrap PrevObject for update events (for create events it is nil).
// PrevObject is only ever encoded to deliver DELETE watch events, and for
// an update that happens only for watchers whose selector matched the
// previous version of the object but no longer matches the current one.
// That case is rare enough that it doesn't justify the deep copy of the
// object that newCachingObject makes every time.
case watch.Deleted:
// Don't wrap Object for delete events - it is not used to deliver any
// watch events. Only wrap PrevObject.
if object, err := newCachingObject(event.PrevObject); err == nil {
// Update resource version of the underlying object.
// event.PrevObject is used to deliver DELETE watch events and
// for them, we set resourceVersion to <current> instead of
// the resourceVersion of the last modification of the object.
updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
event.PrevObject = object
} else {
klog.Errorf("couldn't create cachingObject from: %#v", event.PrevObject)
}
}
}
// processEvent is safe as long as there is at most one call to it in flight
// at any point in time.
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
@@ -295,18 +264,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
// This is safe as long as there is at most one call to processEvent in flight
// at any point in time.
if w.eventHandler != nil {
// Set up caching of object serializations only for dispatching this event.
//
// Storing serializations in memory would increase memory usage, but it
// would also help by reusing cached encodings for watches started from
// older resource versions. However, we don't yet have convincing data
// that the gain justifies the increased memory usage, so for now we drop
// the cached serializations after dispatching this event.
// Make a shallow copy to allow overwriting Object and PrevObject.
wce := *wcEvent
setCachingObjects(&wce, w.versioner)
w.eventHandler(&wce)
w.eventHandler(wcEvent)
}
return nil
}
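updateResourceVersionIfNeeded, called from setCachingObjects above and from the tests, is also outside this diff. It is assumed to simply stamp the given resource version onto the object via the storage.Versioner, so that a delivered DELETE event carries the resourceVersion at which the deletion was observed rather than the version of the object's last modification. A sketch of that assumed shape, under a hypothetical name and with best-effort error handling:

package cacher

import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apiserver/pkg/storage"
	"k8s.io/klog"
)

// setResourceVersion writes resourceVersion into the object's metadata using
// the versioner; failures are only logged, mirroring how the caller treats
// the update as best-effort.
func setResourceVersion(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
	if err := versioner.UpdateObject(object, resourceVersion); err != nil {
		klog.Errorf("failed to update resourceVersion of %#v to %d: %v", object, resourceVersion, err)
	}
}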

@@ -18,7 +18,6 @@ package cacher
import (
"fmt"
"reflect"
"strconv"
"strings"
"testing"
@@ -436,53 +435,3 @@ func TestReflectorForWatchCache(t *testing.T) {
}
}
}
func TestCachingObjects(t *testing.T) {
store := newTestWatchCache(5)
index := 0
store.eventHandler = func(event *watchCacheEvent) {
switch event.Type {
case watch.Added, watch.Modified:
if _, ok := event.Object.(runtime.CacheableObject); !ok {
t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object)
}
if _, ok := event.PrevObject.(runtime.CacheableObject); ok {
t.Fatalf("PrevObject in %s event should not support caching: %#v", event.Type, event.Object)
}
case watch.Deleted:
if _, ok := event.Object.(runtime.CacheableObject); ok {
t.Fatalf("Object in %s event should not support caching: %#v", event.Type, event.Object)
}
if _, ok := event.PrevObject.(runtime.CacheableObject); !ok {
t.Fatalf("PrevObject in %s event should support caching: %#v", event.Type, event.Object)
}
}
// Verify that delivered event is the same as cached one modulo Object/PrevObject.
switch event.Type {
case watch.Added, watch.Modified:
event.Object = event.Object.(runtime.CacheableObject).GetObject()
case watch.Deleted:
event.PrevObject = event.PrevObject.(runtime.CacheableObject).GetObject()
// For events stored in the watchCache, the ResourceVersion of PrevObject is
// not updated, so reset the delivered PrevObject back to it before comparing.
resourceVersion, err := store.versioner.ObjectResourceVersion(store.cache[index].PrevObject)
if err != nil {
t.Fatalf("Failed to parse resource version: %v", err)
}
updateResourceVersionIfNeeded(event.PrevObject, store.versioner, resourceVersion)
}
if a, e := event, store.cache[index]; !reflect.DeepEqual(a, e) {
t.Errorf("watchCacheEvent messed up: %#v, expected: %#v", a, e)
}
index++
}
pod1 := makeTestPod("pod", 1)
pod2 := makeTestPod("pod", 2)
pod3 := makeTestPod("pod", 3)
store.Add(pod1)
store.Update(pod2)
store.Delete(pod3)
}