From 4aa6bc019e52f58aa8d1a57bf8b89c3af2ad8d51 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Wed, 14 Aug 2019 16:23:03 +0200 Subject: [PATCH 1/5] Implement Encoder.Identifier() method Kubernetes-commit: cd4215ad8b95773a793f69fc2e8bf93c7ede97aa --- pkg/endpoints/discovery/util.go | 32 ++++++++++++++++++- .../handlers/responsewriters/writers_test.go | 4 +++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/pkg/endpoints/discovery/util.go b/pkg/endpoints/discovery/util.go index 81a13d64a..fa63b19d2 100644 --- a/pkg/endpoints/discovery/util.go +++ b/pkg/endpoints/discovery/util.go @@ -18,10 +18,12 @@ package discovery import ( "bytes" + "encoding/json" "fmt" "io" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog" ) const APIGroupPrefix = "/apis" @@ -36,6 +38,29 @@ func keepUnversioned(group string) bool { type stripVersionEncoder struct { encoder runtime.Encoder serializer runtime.Serializer + identifier runtime.Identifier +} + +func newStripVersionEncoder(e runtime.Encoder, s runtime.Serializer) runtime.Encoder { + return stripVersionEncoder{ + encoder: e, + serializer: s, + identifier: identifier(e), + } +} + +func identifier(e runtime.Encoder) runtime.Identifier { + result := map[string]string{ + "name": "stripVersion", + } + if e != nil { + result["encoder"] = string(e.Identifier()) + } + identifier, err := json.Marshal(result) + if err != nil { + klog.Fatalf("Failed marshaling identifier for stripVersionEncoder: %v", err) + } + return runtime.Identifier(identifier) } func (c stripVersionEncoder) Encode(obj runtime.Object, w io.Writer) error { @@ -54,6 +79,11 @@ func (c stripVersionEncoder) Encode(obj runtime.Object, w io.Writer) error { return c.serializer.Encode(roundTrippedObj, w) } +// Identifier implements runtime.Encoder interface. +func (c stripVersionEncoder) Identifier() runtime.Identifier { + return c.identifier +} + // stripVersionNegotiatedSerializer will return stripVersionEncoder when // EncoderForVersion is called. See comments for stripVersionEncoder. type stripVersionNegotiatedSerializer struct { @@ -69,5 +99,5 @@ func (n stripVersionNegotiatedSerializer) EncoderForVersion(encoder runtime.Enco panic(fmt.Sprintf("Unable to extract serializer from %#v", encoder)) } versioned := n.NegotiatedSerializer.EncoderForVersion(encoder, gv) - return stripVersionEncoder{versioned, serializer} + return newStripVersionEncoder(versioned, serializer) } diff --git a/pkg/endpoints/handlers/responsewriters/writers_test.go b/pkg/endpoints/handlers/responsewriters/writers_test.go index 31bea58c0..624b6becd 100644 --- a/pkg/endpoints/handlers/responsewriters/writers_test.go +++ b/pkg/endpoints/handlers/responsewriters/writers_test.go @@ -287,6 +287,10 @@ func (e *fakeEncoder) Encode(obj runtime.Object, w io.Writer) error { return err } +func (e *fakeEncoder) Identifier() runtime.Identifier { + return runtime.Identifier("fake") +} + func gzipContent(data []byte, level int) []byte { buf := &bytes.Buffer{} gw, err := gzip.NewWriterLevel(buf, level) From f186d58ea0800e0b6a7dd84f9e00a096059971fb Mon Sep 17 00:00:00 2001 From: wojtekt Date: Mon, 19 Aug 2019 09:55:49 +0200 Subject: [PATCH 2/5] Fix transformObject to work with CacheableObject. Kubernetes-commit: 1dd43724ce97b610aae7f9e0f3842f0798ec9d7b --- pkg/endpoints/handlers/response.go | 18 +++ pkg/endpoints/handlers/response_test.go | 173 ++++++++++++++++++++++++ 2 files changed, 191 insertions(+) create mode 100644 pkg/endpoints/handlers/response_test.go diff --git a/pkg/endpoints/handlers/response.go b/pkg/endpoints/handlers/response.go index c84d8634b..d2782178f 100644 --- a/pkg/endpoints/handlers/response.go +++ b/pkg/endpoints/handlers/response.go @@ -38,6 +38,24 @@ import ( // the client's desired form, as well as ensuring any API level fields like self-link // are properly set. func transformObject(ctx context.Context, obj runtime.Object, opts interface{}, mediaType negotiation.MediaTypeOptions, scope *RequestScope, req *http.Request) (runtime.Object, error) { + if co, ok := obj.(runtime.CacheableObject); ok { + if mediaType.Convert != nil { + // Non-nil mediaType.Convert means that some conversion of the object + // has to happen. Currently conversion may potentially modify the + // object or assume something about it (e.g. asTable operates on + // reflection, which won't work for any wrapper). + // To ensure it will work correctly, let's operate on base objects + // and not cache it for now. + // + // TODO: Long-term, transformObject should be changed so that it + // implements runtime.Encoder interface. + return doTransformObject(ctx, co.GetObject(), opts, mediaType, scope, req) + } + } + return doTransformObject(ctx, obj, opts, mediaType, scope, req) +} + +func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{}, mediaType negotiation.MediaTypeOptions, scope *RequestScope, req *http.Request) (runtime.Object, error) { if _, ok := obj.(*metav1.Status); ok { return obj, nil } diff --git a/pkg/endpoints/handlers/response_test.go b/pkg/endpoints/handlers/response_test.go new file mode 100644 index 000000000..d47a81e0c --- /dev/null +++ b/pkg/endpoints/handlers/response_test.go @@ -0,0 +1,173 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handlers + +import ( + "context" + "fmt" + "io" + "net/http" + "reflect" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + examplev1 "k8s.io/apiserver/pkg/apis/example/v1" + "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" +) + +var _ runtime.CacheableObject = &mockCacheableObject{} + +type mockCacheableObject struct { + gvk schema.GroupVersionKind + obj runtime.Object +} + +// DeepCopyObject implements runtime.Object interface. +func (m *mockCacheableObject) DeepCopyObject() runtime.Object { + panic("DeepCopy unimplemented for mockCacheableObject") +} + +// GetObjectKind implements runtime.Object interface. +func (m *mockCacheableObject) GetObjectKind() schema.ObjectKind { + return m +} + +// GroupVersionKind implements schema.ObjectKind interface. +func (m *mockCacheableObject) GroupVersionKind() schema.GroupVersionKind { + return m.gvk +} + +// SetGroupVersionKind implements schema.ObjectKind interface. +func (m *mockCacheableObject) SetGroupVersionKind(gvk schema.GroupVersionKind) { + m.gvk = gvk +} + +// CacheEncode implements runtime.CacheableObject interface. +func (m *mockCacheableObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error { + return fmt.Errorf("unimplemented") +} + +// GetObject implements runtime.CacheableObject interface. +func (m *mockCacheableObject) GetObject() runtime.Object { + return m.obj +} + +type mockNamer struct{} + +func (*mockNamer) Namespace(_ *http.Request) (string, error) { return "", nil } +func (*mockNamer) Name(_ *http.Request) (string, string, error) { return "", "", nil } +func (*mockNamer) ObjectName(_ runtime.Object) (string, string, error) { return "", "", nil } +func (*mockNamer) SetSelfLink(_ runtime.Object, _ string) error { return nil } +func (*mockNamer) GenerateLink(_ *request.RequestInfo, _ runtime.Object) (string, error) { + return "", nil +} +func (*mockNamer) GenerateListLink(_ *http.Request) (string, error) { return "", nil } + +func TestCacheableObject(t *testing.T) { + pomGVK := metav1.SchemeGroupVersion.WithKind("PartialObjectMetadata") + tableGVK := metav1.SchemeGroupVersion.WithKind("Table") + + status := &metav1.Status{Status: "status"} + pod := &examplev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + } + podMeta := &metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + } + podMeta.GetObjectKind().SetGroupVersionKind(pomGVK) + podTable := &metav1.Table{ + Rows: []metav1.TableRow{ + { + Cells: []interface{}{pod.Name, pod.CreationTimestamp.Time.UTC().Format(time.RFC3339)}, + }, + }, + } + + tableConvertor := rest.NewDefaultTableConvertor(examplev1.Resource("Pod")) + + testCases := []struct { + desc string + object runtime.Object + opts *metav1beta1.TableOptions + mediaType negotiation.MediaTypeOptions + + expectedUnwrap bool + expectedObj runtime.Object + expectedErr error + }{ + { + desc: "metav1.Status", + object: status, + expectedObj: status, + expectedErr: nil, + }, + { + desc: "cacheableObject nil convert", + object: &mockCacheableObject{obj: pod}, + mediaType: negotiation.MediaTypeOptions{}, + expectedObj: &mockCacheableObject{obj: pod}, + expectedErr: nil, + }, + { + desc: "cacheableObject as PartialObjectMeta", + object: &mockCacheableObject{obj: pod}, + mediaType: negotiation.MediaTypeOptions{Convert: &pomGVK}, + expectedObj: podMeta, + expectedErr: nil, + }, + { + desc: "cacheableObject as Table", + object: &mockCacheableObject{obj: pod}, + opts: &metav1beta1.TableOptions{NoHeaders: true, IncludeObject: metav1.IncludeNone}, + mediaType: negotiation.MediaTypeOptions{Convert: &tableGVK}, + expectedObj: podTable, + expectedErr: nil, + }, + } + + for _, test := range testCases { + t.Run(test.desc, func(t *testing.T) { + result, err := transformObject( + request.WithRequestInfo(context.TODO(), &request.RequestInfo{}), + test.object, test.opts, test.mediaType, + &RequestScope{ + Namer: &mockNamer{}, + TableConvertor: tableConvertor, + }, + nil) + + if err != test.expectedErr { + t.Errorf("unexpected error: %v, expected: %v", err, test.expectedErr) + } + if a, e := result, test.expectedObj; !reflect.DeepEqual(a, e) { + t.Errorf("unexpected result: %v, expected: %v", a, e) + } + }) + } +} From 94a2664cd38b0f2a5c283599439dae9acb8cb4bd Mon Sep 17 00:00:00 2001 From: wojtekt Date: Thu, 15 Aug 2019 22:02:33 +0200 Subject: [PATCH 3/5] Implement support for CacheableObject Kubernetes-commit: 970f103e2c079da98743db35e38fd411a64e2e04 --- pkg/endpoints/discovery/util.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/endpoints/discovery/util.go b/pkg/endpoints/discovery/util.go index fa63b19d2..2411a780d 100644 --- a/pkg/endpoints/discovery/util.go +++ b/pkg/endpoints/discovery/util.go @@ -64,6 +64,13 @@ func identifier(e runtime.Encoder) runtime.Identifier { } func (c stripVersionEncoder) Encode(obj runtime.Object, w io.Writer) error { + if co, ok := obj.(runtime.CacheableObject); ok { + return co.CacheEncode(c.Identifier(), c.doEncode, w) + } + return c.doEncode(obj, w) +} + +func (c stripVersionEncoder) doEncode(obj runtime.Object, w io.Writer) error { buf := bytes.NewBuffer([]byte{}) err := c.encoder.Encode(obj, buf) if err != nil { From 4b6162fe6b29e47338ca549d3db1d5b8688c348c Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Sun, 25 Aug 2019 06:16:49 +0200 Subject: [PATCH 4/5] CachingObject Kubernetes-commit: 4cd81549f9a6a325183e6c43c831ecf79f434b8e --- pkg/storage/cacher/cacher_whitebox_test.go | 2 +- pkg/storage/cacher/caching_object.go | 397 +++++++++++++++++++++ pkg/storage/cacher/caching_object_test.go | 160 +++++++++ 3 files changed, 558 insertions(+), 1 deletion(-) create mode 100644 pkg/storage/cacher/caching_object.go create mode 100644 pkg/storage/cacher/caching_object_test.go diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index 8663e8ccc..845b62381 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -703,7 +703,7 @@ func testCacherSendBookmarkEvents(t *testing.T, watchCacheEnabled, allowWatchBoo } rv, err := cacher.versioner.ObjectResourceVersion(event.Object) if err != nil { - t.Errorf("failed to parse resource version from %#v", event.Object) + t.Errorf("failed to parse resource version from %#v: %v", event.Object, err) } if event.Type == watch.Bookmark { if !expectedBookmarks { diff --git a/pkg/storage/cacher/caching_object.go b/pkg/storage/cacher/caching_object.go new file mode 100644 index 000000000..9e7a46393 --- /dev/null +++ b/pkg/storage/cacher/caching_object.go @@ -0,0 +1,397 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "bytes" + "fmt" + "io" + "reflect" + "runtime/debug" + "sync" + "sync/atomic" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog" +) + +var _ runtime.CacheableObject = &cachingObject{} + +// metaRuntimeInterface implements runtime.Object and +// metav1.Object interfaces. +type metaRuntimeInterface interface { + runtime.Object + metav1.Object +} + +// serializationResult captures a result of serialization. +type serializationResult struct { + // once should be used to ensure serialization is computed once. + once sync.Once + + // raw is serialized object. + raw []byte + // err is error from serialization. + err error +} + +// serializationsCache is a type for caching serialization results. +type serializationsCache map[runtime.Identifier]*serializationResult + +// cachingObject is an object that is able to cache its serializations +// so that each of those is computed exactly once. +// +// cachingObject implements the metav1.Object interface (accessors for +// all metadata fields). However, setters for all fields except from +// SelfLink (which is set lately in the path) are ignored. +type cachingObject struct { + lock sync.RWMutex + + // Object for which serializations are cached. + object metaRuntimeInterface + + // serializations is a cache containing object`s serializations. + // The value stored in atomic.Value is of type serializationsCache. + // The atomic.Value type is used to allow fast-path. + serializations atomic.Value +} + +// newCachingObject performs a deep copy of the given object and wraps it +// into a cachingObject. +// An error is returned if it's not possible to cast the object to +// metav1.Object type. +func newCachingObject(object runtime.Object) (*cachingObject, error) { + if obj, ok := object.(metaRuntimeInterface); ok { + result := &cachingObject{object: obj.DeepCopyObject().(metaRuntimeInterface)} + result.serializations.Store(make(serializationsCache)) + return result, nil + } + return nil, fmt.Errorf("can't cast object to metav1.Object: %#v", object) +} + +func (o *cachingObject) getSerializationResult(id runtime.Identifier) *serializationResult { + // Fast-path for getting from cache. + serializations := o.serializations.Load().(serializationsCache) + if result, exists := serializations[id]; exists { + return result + } + + // Slow-path (that may require insert). + o.lock.Lock() + defer o.lock.Unlock() + + serializations = o.serializations.Load().(serializationsCache) + // Check if in the meantime it wasn't inserted. + if result, exists := serializations[id]; exists { + return result + } + + // Insert an entry for . This requires copy of existing map. + newSerializations := make(serializationsCache) + for k, v := range serializations { + newSerializations[k] = v + } + result := &serializationResult{} + newSerializations[id] = result + o.serializations.Store(newSerializations) + return result +} + +// CacheEncode implements runtime.CacheableObject interface. +// It serializes the object and writes the result to given io.Writer trying +// to first use the already cached result and falls back to a given encode +// function in case of cache miss. +// It assumes that for a given identifier, the encode function always encodes +// each input object into the same output format. +func (o *cachingObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error { + result := o.getSerializationResult(id) + result.once.Do(func() { + buffer := bytes.NewBuffer(nil) + result.err = encode(o.GetObject(), buffer) + result.raw = buffer.Bytes() + }) + // Once invoked, fields of serialization will not change. + if result.err != nil { + return result.err + } + _, err := w.Write(result.raw) + return err +} + +// GetObject implements runtime.CacheableObject interface. +// It returns deep-copy of the wrapped object to return ownership of it +// to the called according to the contract of the interface. +func (o *cachingObject) GetObject() runtime.Object { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.DeepCopyObject().(metaRuntimeInterface) +} + +// GetObjectKind implements runtime.Object interface. +func (o *cachingObject) GetObjectKind() schema.ObjectKind { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetObjectKind() +} + +// DeepCopyObject implements runtime.Object interface. +func (o *cachingObject) DeepCopyObject() runtime.Object { + // DeepCopyObject on cachingObject is not expected to be called anywhere. + // However, to be on the safe-side, we implement it, though given the + // cache is only an optimization we ignore copying it. + result := &cachingObject{} + result.serializations.Store(make(serializationsCache)) + + o.lock.RLock() + defer o.lock.RUnlock() + result.object = o.object.DeepCopyObject().(metaRuntimeInterface) + return result +} + +var ( + invalidationCacheTimestampLock sync.Mutex + invalidationCacheTimestamp time.Time +) + +// shouldLogCacheInvalidation allows for logging cache-invalidation +// at most once per second (to avoid spamming logs in case of issues). +func shouldLogCacheInvalidation(now time.Time) bool { + invalidationCacheTimestampLock.Lock() + defer invalidationCacheTimestampLock.Unlock() + if invalidationCacheTimestamp.Add(time.Second).Before(now) { + invalidationCacheTimestamp = now + return true + } + return false +} + +func (o *cachingObject) invalidateCacheLocked() { + if cache, ok := o.serializations.Load().(serializationsCache); ok && len(cache) == 0 { + return + } + // We don't expect cache invalidation to happen - so we want + // to log the stacktrace to allow debugging if that will happen. + // OTOH, we don't want to spam logs with it. + // So we try to log it at most once per second. + if shouldLogCacheInvalidation(time.Now()) { + klog.Warningf("Unexpected cache invalidation for %#v\n%s", o.object, string(debug.Stack())) + } + o.serializations.Store(make(serializationsCache)) +} + +// The following functions implement metav1.Object interface: +// - getters simply delegate for the underlying object +// - setters check if operations isn't noop and if so, +// invalidate the cache and delegate for the underlying object + +func (o *cachingObject) conditionalSet(isNoop func() bool, set func()) { + if fastPath := func() bool { + o.lock.RLock() + defer o.lock.RUnlock() + return isNoop() + }(); fastPath { + return + } + o.lock.Lock() + defer o.lock.Unlock() + if isNoop() { + return + } + o.invalidateCacheLocked() + set() +} + +func (o *cachingObject) GetNamespace() string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetNamespace() +} +func (o *cachingObject) SetNamespace(namespace string) { + o.conditionalSet( + func() bool { return o.object.GetNamespace() == namespace }, + func() { o.object.SetNamespace(namespace) }, + ) +} +func (o *cachingObject) GetName() string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetName() +} +func (o *cachingObject) SetName(name string) { + o.conditionalSet( + func() bool { return o.object.GetName() == name }, + func() { o.object.SetName(name) }, + ) +} +func (o *cachingObject) GetGenerateName() string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetGenerateName() +} +func (o *cachingObject) SetGenerateName(name string) { + o.conditionalSet( + func() bool { return o.object.GetGenerateName() == name }, + func() { o.object.SetGenerateName(name) }, + ) +} +func (o *cachingObject) GetUID() types.UID { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetUID() +} +func (o *cachingObject) SetUID(uid types.UID) { + o.conditionalSet( + func() bool { return o.object.GetUID() == uid }, + func() { o.object.SetUID(uid) }, + ) +} +func (o *cachingObject) GetResourceVersion() string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetResourceVersion() +} +func (o *cachingObject) SetResourceVersion(version string) { + o.conditionalSet( + func() bool { return o.object.GetResourceVersion() == version }, + func() { o.object.SetResourceVersion(version) }, + ) +} +func (o *cachingObject) GetGeneration() int64 { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetGeneration() +} +func (o *cachingObject) SetGeneration(generation int64) { + o.conditionalSet( + func() bool { return o.object.GetGeneration() == generation }, + func() { o.object.SetGeneration(generation) }, + ) +} +func (o *cachingObject) GetSelfLink() string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetSelfLink() +} +func (o *cachingObject) SetSelfLink(selfLink string) { + o.conditionalSet( + func() bool { return o.object.GetSelfLink() == selfLink }, + func() { o.object.SetSelfLink(selfLink) }, + ) +} +func (o *cachingObject) GetCreationTimestamp() metav1.Time { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetCreationTimestamp() +} +func (o *cachingObject) SetCreationTimestamp(timestamp metav1.Time) { + o.conditionalSet( + func() bool { return o.object.GetCreationTimestamp() == timestamp }, + func() { o.object.SetCreationTimestamp(timestamp) }, + ) +} +func (o *cachingObject) GetDeletionTimestamp() *metav1.Time { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetDeletionTimestamp() +} +func (o *cachingObject) SetDeletionTimestamp(timestamp *metav1.Time) { + o.conditionalSet( + func() bool { return o.object.GetDeletionTimestamp() == timestamp }, + func() { o.object.SetDeletionTimestamp(timestamp) }, + ) +} +func (o *cachingObject) GetDeletionGracePeriodSeconds() *int64 { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetDeletionGracePeriodSeconds() +} +func (o *cachingObject) SetDeletionGracePeriodSeconds(gracePeriodSeconds *int64) { + o.conditionalSet( + func() bool { return o.object.GetDeletionGracePeriodSeconds() == gracePeriodSeconds }, + func() { o.object.SetDeletionGracePeriodSeconds(gracePeriodSeconds) }, + ) +} +func (o *cachingObject) GetLabels() map[string]string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetLabels() +} +func (o *cachingObject) SetLabels(labels map[string]string) { + o.conditionalSet( + func() bool { return reflect.DeepEqual(o.object.GetLabels(), labels) }, + func() { o.object.SetLabels(labels) }, + ) +} +func (o *cachingObject) GetAnnotations() map[string]string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetAnnotations() +} +func (o *cachingObject) SetAnnotations(annotations map[string]string) { + o.conditionalSet( + func() bool { return reflect.DeepEqual(o.object.GetAnnotations(), annotations) }, + func() { o.object.SetAnnotations(annotations) }, + ) +} +func (o *cachingObject) GetFinalizers() []string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetFinalizers() +} +func (o *cachingObject) SetFinalizers(finalizers []string) { + o.conditionalSet( + func() bool { return reflect.DeepEqual(o.object.GetFinalizers(), finalizers) }, + func() { o.object.SetFinalizers(finalizers) }, + ) +} +func (o *cachingObject) GetOwnerReferences() []metav1.OwnerReference { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetOwnerReferences() +} +func (o *cachingObject) SetOwnerReferences(references []metav1.OwnerReference) { + o.conditionalSet( + func() bool { return reflect.DeepEqual(o.object.GetOwnerReferences(), references) }, + func() { o.object.SetOwnerReferences(references) }, + ) +} +func (o *cachingObject) GetClusterName() string { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetClusterName() +} +func (o *cachingObject) SetClusterName(clusterName string) { + o.conditionalSet( + func() bool { return o.object.GetClusterName() == clusterName }, + func() { o.object.SetClusterName(clusterName) }, + ) +} +func (o *cachingObject) GetManagedFields() []metav1.ManagedFieldsEntry { + o.lock.RLock() + defer o.lock.RUnlock() + return o.object.GetManagedFields() +} +func (o *cachingObject) SetManagedFields(managedFields []metav1.ManagedFieldsEntry) { + o.conditionalSet( + func() bool { return reflect.DeepEqual(o.object.GetManagedFields(), managedFields) }, + func() { o.object.SetManagedFields(managedFields) }, + ) +} diff --git a/pkg/storage/cacher/caching_object_test.go b/pkg/storage/cacher/caching_object_test.go new file mode 100644 index 000000000..ca8a50946 --- /dev/null +++ b/pkg/storage/cacher/caching_object_test.go @@ -0,0 +1,160 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "bytes" + "fmt" + "io" + "sync" + "sync/atomic" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" +) + +type mockEncoder struct { + identifier runtime.Identifier + expectedResult string + expectedError error + + callsNumber int32 +} + +func newMockEncoder(id, result string, err error) *mockEncoder { + return &mockEncoder{ + identifier: runtime.Identifier(id), + expectedResult: result, + expectedError: err, + } +} + +func (e *mockEncoder) encode(_ runtime.Object, w io.Writer) error { + atomic.AddInt32(&e.callsNumber, 1) + if e.expectedError != nil { + return e.expectedError + } + _, err := w.Write([]byte(e.expectedResult)) + return err +} + +func TestCachingObject(t *testing.T) { + object, err := newCachingObject(&v1.Pod{}) + if err != nil { + t.Fatalf("couldn't create cachingObject: %v", err) + } + + encoders := []*mockEncoder{ + newMockEncoder("1", "result1", nil), + newMockEncoder("2", "", fmt.Errorf("mock error")), + newMockEncoder("3", "result3", nil), + } + + for _, encoder := range encoders { + buffer := bytes.NewBuffer(nil) + err := object.CacheEncode(encoder.identifier, encoder.encode, buffer) + if a, e := err, encoder.expectedError; e != a { + t.Errorf("%s: unexpected error: %v, expected: %v", encoder.identifier, a, e) + } + if a, e := buffer.String(), encoder.expectedResult; e != a { + t.Errorf("%s: unexpected result: %s, expected: %s", encoder.identifier, a, e) + } + } + for _, encoder := range encoders { + if encoder.callsNumber != 1 { + t.Errorf("%s: unexpected number of encode() calls: %d", encoder.identifier, encoder.callsNumber) + } + } +} + +func TestSelfLink(t *testing.T) { + object, err := newCachingObject(&v1.Pod{}) + if err != nil { + t.Fatalf("couldn't create cachingObject: %v", err) + } + selfLink := "selfLink" + object.SetSelfLink(selfLink) + + encodeSelfLink := func(obj runtime.Object, w io.Writer) error { + accessor, err := meta.Accessor(obj) + if err != nil { + t.Fatalf("failed to get accessor for %#v: %v", obj, err) + } + _, err = w.Write([]byte(accessor.GetSelfLink())) + return err + } + buffer := bytes.NewBuffer(nil) + if err := object.CacheEncode("", encodeSelfLink, buffer); err != nil { + t.Errorf("unexpected error: %v", err) + } + if a, e := buffer.String(), selfLink; a != e { + t.Errorf("unexpected serialization: %s, expected: %s", a, e) + } + + // GetObject should also set selfLink. + buffer.Reset() + if err := encodeSelfLink(object.GetObject(), buffer); err != nil { + t.Errorf("unexpected error: %v", err) + } + if a, e := buffer.String(), selfLink; a != e { + t.Errorf("unexpected serialization: %s, expected: %s", a, e) + } +} + +func TestCachingObjectRaces(t *testing.T) { + object, err := newCachingObject(&v1.Pod{}) + if err != nil { + t.Fatalf("couldn't create cachingObject: %v", err) + } + + encoders := []*mockEncoder{} + for i := 0; i < 10; i++ { + encoder := newMockEncoder(fmt.Sprintf("%d", i), "result", nil) + encoders = append(encoders, encoder) + } + + numWorkers := 1000 + wg := &sync.WaitGroup{} + wg.Add(numWorkers) + + for i := 0; i < numWorkers; i++ { + go func() { + defer wg.Done() + object.SetSelfLink("selfLink") + buffer := bytes.NewBuffer(nil) + for _, encoder := range encoders { + buffer.Reset() + if err := object.CacheEncode(encoder.identifier, encoder.encode, buffer); err != nil { + t.Errorf("unexpected error: %v", err) + } + if callsNumber := atomic.LoadInt32(&encoder.callsNumber); callsNumber != 1 { + t.Errorf("unexpected number of serializations: %d", callsNumber) + } + } + accessor, err := meta.Accessor(object.GetObject()) + if err != nil { + t.Fatalf("failed to get accessor: %v", err) + } + if selfLink := accessor.GetSelfLink(); selfLink != "selfLink" { + t.Errorf("unexpected selfLink: %s", selfLink) + } + }() + } + wg.Wait() +} From 20ecceebd3e29b00174917868ab6056ef3d407dd Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Sun, 25 Aug 2019 09:55:52 +0200 Subject: [PATCH 5/5] Cache serializations Kubernetes-commit: 25a728ae5eb0ae067d21679fad915c0555242470 --- pkg/storage/cacher/cacher.go | 50 +++++++--- pkg/storage/cacher/cacher_whitebox_test.go | 110 ++++++++++++++++++++- pkg/storage/cacher/watch_cache.go | 54 ++++++++-- pkg/storage/cacher/watch_cache_test.go | 51 ++++++++++ pkg/storage/tests/cacher_test.go | 20 +++- 5 files changed, 262 insertions(+), 23 deletions(-) diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index 69a9b236d..696999bf9 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -754,17 +754,25 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) { return c.storage.Count(pathPrefix) } -func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) { +// baseObjectThreadUnsafe omits locking for cachingObject. +func baseObjectThreadUnsafe(object runtime.Object) runtime.Object { + if co, ok := object.(*cachingObject); ok { + return co.object + } + return object +} + +func (c *Cacher) triggerValuesThreadUnsafe(event *watchCacheEvent) ([]string, bool) { if c.indexedTrigger == nil { return nil, false } result := make([]string, 0, 2) - result = append(result, c.indexedTrigger.indexerFunc(event.Object)) + result = append(result, c.indexedTrigger.indexerFunc(baseObjectThreadUnsafe(event.Object))) if event.PrevObject == nil { return result, true } - prevTriggerValue := c.indexedTrigger.indexerFunc(event.PrevObject) + prevTriggerValue := c.indexedTrigger.indexerFunc(baseObjectThreadUnsafe(event.PrevObject)) if result[0] != prevTriggerValue { result = append(result, prevTriggerValue) } @@ -892,7 +900,10 @@ func (c *Cacher) startDispatchingBookmarkEvents() { // startDispatching chooses watchers potentially interested in a given event // a marks dispatching as true. func (c *Cacher) startDispatching(event *watchCacheEvent) { - triggerValues, supported := c.triggerValues(event) + // It is safe to call triggerValuesThreadUnsafe here, because at this + // point only this thread can access this event (we create a separate + // watchCacheEvent for every dispatch). + triggerValues, supported := c.triggerValuesThreadUnsafe(event) c.Lock() defer c.Unlock() @@ -1165,7 +1176,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { // This means that we couldn't send event to that watcher. // Since we don't want to block on it infinitely, // we simply terminate it. - klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", reflect.TypeOf(event.Object).String()) + klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String()) c.forget() } @@ -1193,6 +1204,25 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) { return c.deadline.Add(-2 * time.Second), true } +func getEventObject(object runtime.Object) runtime.Object { + if _, ok := object.(runtime.CacheableObject); ok { + // It is safe to return without deep-copy, because the underlying + // object was already deep-copied during construction. + return object + } + return object.DeepCopyObject() +} + +func updateResourceVersionIfNeeded(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) { + if _, ok := object.(*cachingObject); ok { + // We assume that for cachingObject resourceVersion was already propagated before. + return + } + if err := versioner.UpdateObject(object, resourceVersion); err != nil { + utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err)) + } +} + func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event { if event.Type == watch.Bookmark { return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()} @@ -1210,15 +1240,13 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event switch { case curObjPasses && !oldObjPasses: - return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()} + return &watch.Event{Type: watch.Added, Object: getEventObject(event.Object)} case curObjPasses && oldObjPasses: - return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()} + return &watch.Event{Type: watch.Modified, Object: getEventObject(event.Object)} case !curObjPasses && oldObjPasses: // return a delete event with the previous object content, but with the event's resource version - oldObj := event.PrevObject.DeepCopyObject() - if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil { - utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err)) - } + oldObj := getEventObject(event.PrevObject) + updateResourceVersionIfNeeded(oldObj, c.versioner, event.ResourceVersion) return &watch.Event{Type: watch.Deleted, Object: oldObj} } diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index 845b62381..e73e81ce0 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -20,12 +20,14 @@ import ( "context" "fmt" "reflect" + goruntime "runtime" "strconv" "sync" "testing" "time" v1 "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -265,7 +267,7 @@ func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner, er Versioner: testVersioner{}, ResourcePrefix: prefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, - GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, error) { return nil, nil, nil }, + GetAttrsFunc: storage.DefaultNamespaceScopedAttr, NewFunc: func() runtime.Object { return &example.Pod{} }, NewListFunc: func() runtime.Object { return &example.PodList{} }, Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), @@ -452,7 +454,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) { shouldContinue = false break } - rv, err := testVersioner{}.ParseResourceVersion(event.Object.(*examplev1.Pod).ResourceVersion) + rv, err := testVersioner{}.ParseResourceVersion(event.Object.(metaRuntimeInterface).GetResourceVersion()) if err != nil { t.Errorf("unexpected parsing error: %v", err) } else { @@ -906,3 +908,107 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { t.Errorf("watcher is blocked by slower one (count: %d)", eventsCount) } } + +func verifyEvents(t *testing.T, w watch.Interface, events []watch.Event) { + _, _, line, _ := goruntime.Caller(1) + for _, expectedEvent := range events { + select { + case event := <-w.ResultChan(): + if e, a := expectedEvent.Type, event.Type; e != a { + t.Logf("(called from line %d)", line) + t.Errorf("Expected: %s, got: %s", e, a) + } + object := event.Object + if co, ok := object.(runtime.CacheableObject); ok { + object = co.GetObject() + } + if e, a := expectedEvent.Object, object; !apiequality.Semantic.DeepEqual(e, a) { + t.Logf("(called from line %d)", line) + t.Errorf("Expected: %#v, got: %#v", e, a) + } + case <-time.After(wait.ForeverTestTimeout): + t.Logf("(called from line %d)", line) + t.Errorf("Timed out waiting for an event") + } + } +} + +func TestCachingDeleteEvents(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage, 1000) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + // Wait until cacher is initialized. + cacher.ready.wait() + + fooPredicate := storage.SelectionPredicate{ + Label: labels.SelectorFromSet(map[string]string{"foo": "true"}), + Field: fields.Everything(), + } + barPredicate := storage.SelectionPredicate{ + Label: labels.SelectorFromSet(map[string]string{"bar": "true"}), + Field: fields.Everything(), + } + + createWatch := func(pred storage.SelectionPredicate) watch.Interface { + w, err := cacher.Watch(context.TODO(), "pods/ns", "999", pred) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + return w + } + + allEventsWatcher := createWatch(storage.Everything) + defer allEventsWatcher.Stop() + fooEventsWatcher := createWatch(fooPredicate) + defer fooEventsWatcher.Stop() + barEventsWatcher := createWatch(barPredicate) + defer barEventsWatcher.Stop() + + makePod := func(labels map[string]string, rv string) *examplev1.Pod { + return &examplev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "ns", + Labels: labels, + ResourceVersion: rv, + }, + } + } + pod1 := makePod(map[string]string{"foo": "true", "bar": "true"}, "1001") + pod2 := makePod(map[string]string{"foo": "true"}, "1002") + pod3 := makePod(map[string]string{}, "1003") + pod4 := makePod(map[string]string{}, "1004") + pod1DeletedAt2 := pod1.DeepCopyObject().(*examplev1.Pod) + pod1DeletedAt2.ResourceVersion = "1002" + pod2DeletedAt3 := pod2.DeepCopyObject().(*examplev1.Pod) + pod2DeletedAt3.ResourceVersion = "1003" + + allEvents := []watch.Event{ + {Type: watch.Added, Object: pod1.DeepCopy()}, + {Type: watch.Modified, Object: pod2.DeepCopy()}, + {Type: watch.Modified, Object: pod3.DeepCopy()}, + {Type: watch.Deleted, Object: pod4.DeepCopy()}, + } + fooEvents := []watch.Event{ + {Type: watch.Added, Object: pod1.DeepCopy()}, + {Type: watch.Modified, Object: pod2.DeepCopy()}, + {Type: watch.Deleted, Object: pod2DeletedAt3.DeepCopy()}, + } + barEvents := []watch.Event{ + {Type: watch.Added, Object: pod1.DeepCopy()}, + {Type: watch.Deleted, Object: pod1DeletedAt2.DeepCopy()}, + } + + cacher.watchCache.Add(pod1) + cacher.watchCache.Update(pod2) + cacher.watchCache.Update(pod3) + cacher.watchCache.Delete(pod4) + + verifyEvents(t, allEventsWatcher, allEvents) + verifyEvents(t, fooEventsWatcher, fooEvents) + verifyEvents(t, barEventsWatcher, barEvents) +} diff --git a/pkg/storage/cacher/watch_cache.go b/pkg/storage/cacher/watch_cache.go index 332aacd98..709e352f5 100644 --- a/pkg/storage/cacher/watch_cache.go +++ b/pkg/storage/cacher/watch_cache.go @@ -206,6 +206,37 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob return object, resourceVersion, nil } +func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) { + switch event.Type { + case watch.Added, watch.Modified: + if object, err := newCachingObject(event.Object); err == nil { + event.Object = object + } else { + klog.Errorf("couldn't create cachingObject from: %#v", event.Object) + } + // Don't wrap PrevObject for update event (for create events it is nil). + // We only encode those to deliver DELETE watch events, so if + // event.Object is not nil it can be used only for watchers for which + // selector was satisfied for its previous version and is no longer + // satisfied for the current version. + // This is rare enough that it doesn't justify making deep-copy of the + // object (done by newCachingObject) every time. + case watch.Deleted: + // Don't wrap Object for delete events - these are not to deliver any + // events. Only wrap PrevObject. + if object, err := newCachingObject(event.PrevObject); err == nil { + // Update resource version of the underlying object. + // event.PrevObject is used to deliver DELETE watch events and + // for them, we set resourceVersion to instead of + // the resourceVersion of the last modification of the object. + updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion) + event.PrevObject = object + } else { + klog.Errorf("couldn't create cachingObject from: %#v", event.Object) + } + } +} + // processEvent is safe as long as there is at most one call to it in flight // at any point in time. func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error { @@ -219,7 +250,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd return err } - watchCacheEvent := &watchCacheEvent{ + wcEvent := &watchCacheEvent{ Type: event.Type, Object: elem.Object, ObjLabels: elem.Labels, @@ -242,12 +273,12 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd } if exists { previousElem := previous.(*storeElement) - watchCacheEvent.PrevObject = previousElem.Object - watchCacheEvent.PrevObjLabels = previousElem.Labels - watchCacheEvent.PrevObjFields = previousElem.Fields + wcEvent.PrevObject = previousElem.Object + wcEvent.PrevObjLabels = previousElem.Labels + wcEvent.PrevObjFields = previousElem.Fields } - w.updateCache(watchCacheEvent) + w.updateCache(wcEvent) w.resourceVersion = resourceVersion defer w.cond.Broadcast() @@ -260,7 +291,18 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd // This is safe as long as there is at most one call to processEvent in flight // at any point in time. if w.eventHandler != nil { - w.eventHandler(watchCacheEvent) + // Set up caching of object serializations only for dispatching this event. + // + // Storing serializations in memory would result in increased memory usage, + // but it would help for caching encodings for watches started from old + // versions. However, we still don't have a convincing data that the gain + // from it justifies increased memory usage, so for now we drop the cached + // serializations after dispatching this event. + + // Make a shallow copy to allow overwriting Object and PrevObject. + wce := *wcEvent + setCachingObjects(&wce, w.versioner) + w.eventHandler(&wce) } return nil } diff --git a/pkg/storage/cacher/watch_cache_test.go b/pkg/storage/cacher/watch_cache_test.go index e1f30de83..9883def73 100644 --- a/pkg/storage/cacher/watch_cache_test.go +++ b/pkg/storage/cacher/watch_cache_test.go @@ -18,6 +18,7 @@ package cacher import ( "fmt" + "reflect" "strconv" "strings" "testing" @@ -432,3 +433,53 @@ func TestReflectorForWatchCache(t *testing.T) { } } } + +func TestCachingObjects(t *testing.T) { + store := newTestWatchCache(5) + + index := 0 + store.eventHandler = func(event *watchCacheEvent) { + switch event.Type { + case watch.Added, watch.Modified: + if _, ok := event.Object.(runtime.CacheableObject); !ok { + t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object) + } + if _, ok := event.PrevObject.(runtime.CacheableObject); ok { + t.Fatalf("PrevObject in %s event should not support caching: %#v", event.Type, event.Object) + } + case watch.Deleted: + if _, ok := event.Object.(runtime.CacheableObject); ok { + t.Fatalf("Object in %s event should not support caching: %#v", event.Type, event.Object) + } + if _, ok := event.PrevObject.(runtime.CacheableObject); !ok { + t.Fatalf("PrevObject in %s event should support caching: %#v", event.Type, event.Object) + } + } + + // Verify that delivered event is the same as cached one modulo Object/PrevObject. + switch event.Type { + case watch.Added, watch.Modified: + event.Object = event.Object.(runtime.CacheableObject).GetObject() + case watch.Deleted: + event.PrevObject = event.PrevObject.(runtime.CacheableObject).GetObject() + // In events store in watchcache, we also don't update ResourceVersion. + // So we need to ensure that we don't fail on it. + resourceVersion, err := store.versioner.ObjectResourceVersion(store.cache[index].PrevObject) + if err != nil { + t.Fatalf("Failed to parse resource version: %v", err) + } + updateResourceVersionIfNeeded(event.PrevObject, store.versioner, resourceVersion) + } + if a, e := event, store.cache[index]; !reflect.DeepEqual(a, e) { + t.Errorf("watchCacheEvent messed up: %#v, expected: %#v", a, e) + } + index++ + } + + pod1 := makeTestPod("pod", 1) + pod2 := makeTestPod("pod", 2) + pod3 := makeTestPod("pod", 3) + store.Add(pod1) + store.Update(pod2) + store.Delete(pod3) +} diff --git a/pkg/storage/tests/cacher_test.go b/pkg/storage/tests/cacher_test.go index 8c2cb0611..80eeea09e 100644 --- a/pkg/storage/tests/cacher_test.go +++ b/pkg/storage/tests/cacher_test.go @@ -357,7 +357,11 @@ func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType t.Logf("(called from line %d)", line) t.Errorf("Expected: %s, got: %s", eventType, event.Type) } - if e, a := eventObject, event.Object; !apiequality.Semantic.DeepDerivative(e, a) { + object := event.Object + if co, ok := object.(runtime.CacheableObject); ok { + object = co.GetObject() + } + if e, a := eventObject, object; !apiequality.Semantic.DeepDerivative(e, a) { t.Logf("(called from line %d)", line) t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a) } @@ -606,7 +610,11 @@ func TestStartingResourceVersion(t *testing.T) { select { case e := <-watcher.ResultChan(): - pod := e.Object.(*example.Pod) + object := e.Object + if co, ok := object.(runtime.CacheableObject); ok { + object = co.GetObject() + } + pod := object.(*example.Pod) podRV, err := v.ParseResourceVersion(pod.ResourceVersion) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -725,7 +733,11 @@ func TestRandomWatchDeliver(t *testing.T) { if !ok { break } - if a, e := event.Object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a { + object := event.Object + if co, ok := object.(runtime.CacheableObject); ok { + object = co.GetObject() + } + if a, e := object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a { t.Errorf("Unexpected object watched: %s, expected %s", a, e) } watched++ @@ -911,7 +923,7 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) { pod := fmt.Sprintf("foo-%d", i) err := createPod(etcdStorage, makeTestPod(pod)) if err != nil { - t.Fatalf("failed to create pod %v", pod) + t.Fatalf("failed to create pod %v: %v", pod, err) } time.Sleep(time.Second / 100) }