Merge pull request #81914 from wojtek-t/cache_serializations_across_watchers
Cache serializations across watchers Kubernetes-commit: 7878160a9747c0c2d4f2cc16a7401407253d578b
This commit is contained in:
commit
99a0cf54d3
|
|
@ -520,11 +520,11 @@
|
|||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apimachinery",
|
||||
"Rev": "0104e33c351d"
|
||||
"Rev": "082230a5ffdd"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/client-go",
|
||||
"Rev": "ad4f099992b0"
|
||||
"Rev": "0ebb3d5f4902"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/component-base",
|
||||
|
|
|
|||
8
go.mod
8
go.mod
|
|
@ -49,8 +49,8 @@ require (
|
|||
gopkg.in/yaml.v2 v2.2.2
|
||||
gotest.tools v2.2.0+incompatible // indirect
|
||||
k8s.io/api v0.0.0-20190927115716-5d581ce610b0
|
||||
k8s.io/apimachinery v0.0.0-20190927035529-0104e33c351d
|
||||
k8s.io/client-go v0.0.0-20190927200009-ad4f099992b0
|
||||
k8s.io/apimachinery v0.0.0-20191001195453-082230a5ffdd
|
||||
k8s.io/client-go v0.0.0-20191001195819-0ebb3d5f4902
|
||||
k8s.io/component-base v0.0.0-20190926082537-804254d56004
|
||||
k8s.io/klog v1.0.0
|
||||
k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf
|
||||
|
|
@ -68,7 +68,7 @@ replace (
|
|||
golang.org/x/text => golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db
|
||||
golang.org/x/time => golang.org/x/time v0.0.0-20161028155119-f51c12702a4d
|
||||
k8s.io/api => k8s.io/api v0.0.0-20190927115716-5d581ce610b0
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190927035529-0104e33c351d
|
||||
k8s.io/client-go => k8s.io/client-go v0.0.0-20190927200009-ad4f099992b0
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20191001195453-082230a5ffdd
|
||||
k8s.io/client-go => k8s.io/client-go v0.0.0-20191001195819-0ebb3d5f4902
|
||||
k8s.io/component-base => k8s.io/component-base v0.0.0-20190926082537-804254d56004
|
||||
)
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -291,8 +291,8 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
|
|||
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
k8s.io/api v0.0.0-20190927115716-5d581ce610b0/go.mod h1:l2ZHS8QbgqodGx7yrYsOSwIxOR76BpGiW1OywXo9PFI=
|
||||
k8s.io/apimachinery v0.0.0-20190927035529-0104e33c351d/go.mod h1:grJJH0hgilA2pYoUiJcPu2EDUal95NTq1vpxxvMLSu8=
|
||||
k8s.io/client-go v0.0.0-20190927200009-ad4f099992b0/go.mod h1:PS8HHN8WbM/GRT1+0HSEiOa1XMwuhc9ZbMoGJrnpG6M=
|
||||
k8s.io/apimachinery v0.0.0-20191001195453-082230a5ffdd/go.mod h1:grJJH0hgilA2pYoUiJcPu2EDUal95NTq1vpxxvMLSu8=
|
||||
k8s.io/client-go v0.0.0-20191001195819-0ebb3d5f4902/go.mod h1:Ffzajf+CyEz64mn6gHeT33NcKLbBO+z6xZKupz7Q91Y=
|
||||
k8s.io/component-base v0.0.0-20190926082537-804254d56004/go.mod h1:+sedDd0Yj/9lFSZjan8FdX4Jednr2we+Q0ZDeicbKSc=
|
||||
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
|
||||
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||
|
|
|
|||
|
|
@ -18,10 +18,12 @@ package discovery
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
const APIGroupPrefix = "/apis"
|
||||
|
|
@ -36,9 +38,39 @@ func keepUnversioned(group string) bool {
|
|||
type stripVersionEncoder struct {
|
||||
encoder runtime.Encoder
|
||||
serializer runtime.Serializer
|
||||
identifier runtime.Identifier
|
||||
}
|
||||
|
||||
func newStripVersionEncoder(e runtime.Encoder, s runtime.Serializer) runtime.Encoder {
|
||||
return stripVersionEncoder{
|
||||
encoder: e,
|
||||
serializer: s,
|
||||
identifier: identifier(e),
|
||||
}
|
||||
}
|
||||
|
||||
func identifier(e runtime.Encoder) runtime.Identifier {
|
||||
result := map[string]string{
|
||||
"name": "stripVersion",
|
||||
}
|
||||
if e != nil {
|
||||
result["encoder"] = string(e.Identifier())
|
||||
}
|
||||
identifier, err := json.Marshal(result)
|
||||
if err != nil {
|
||||
klog.Fatalf("Failed marshaling identifier for stripVersionEncoder: %v", err)
|
||||
}
|
||||
return runtime.Identifier(identifier)
|
||||
}
|
||||
|
||||
func (c stripVersionEncoder) Encode(obj runtime.Object, w io.Writer) error {
|
||||
if co, ok := obj.(runtime.CacheableObject); ok {
|
||||
return co.CacheEncode(c.Identifier(), c.doEncode, w)
|
||||
}
|
||||
return c.doEncode(obj, w)
|
||||
}
|
||||
|
||||
func (c stripVersionEncoder) doEncode(obj runtime.Object, w io.Writer) error {
|
||||
buf := bytes.NewBuffer([]byte{})
|
||||
err := c.encoder.Encode(obj, buf)
|
||||
if err != nil {
|
||||
|
|
@ -54,6 +86,11 @@ func (c stripVersionEncoder) Encode(obj runtime.Object, w io.Writer) error {
|
|||
return c.serializer.Encode(roundTrippedObj, w)
|
||||
}
|
||||
|
||||
// Identifier implements runtime.Encoder interface.
|
||||
func (c stripVersionEncoder) Identifier() runtime.Identifier {
|
||||
return c.identifier
|
||||
}
|
||||
|
||||
// stripVersionNegotiatedSerializer will return stripVersionEncoder when
|
||||
// EncoderForVersion is called. See comments for stripVersionEncoder.
|
||||
type stripVersionNegotiatedSerializer struct {
|
||||
|
|
@ -69,5 +106,5 @@ func (n stripVersionNegotiatedSerializer) EncoderForVersion(encoder runtime.Enco
|
|||
panic(fmt.Sprintf("Unable to extract serializer from %#v", encoder))
|
||||
}
|
||||
versioned := n.NegotiatedSerializer.EncoderForVersion(encoder, gv)
|
||||
return stripVersionEncoder{versioned, serializer}
|
||||
return newStripVersionEncoder(versioned, serializer)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,6 +38,24 @@ import (
|
|||
// the client's desired form, as well as ensuring any API level fields like self-link
|
||||
// are properly set.
|
||||
func transformObject(ctx context.Context, obj runtime.Object, opts interface{}, mediaType negotiation.MediaTypeOptions, scope *RequestScope, req *http.Request) (runtime.Object, error) {
|
||||
if co, ok := obj.(runtime.CacheableObject); ok {
|
||||
if mediaType.Convert != nil {
|
||||
// Non-nil mediaType.Convert means that some conversion of the object
|
||||
// has to happen. Currently conversion may potentially modify the
|
||||
// object or assume something about it (e.g. asTable operates on
|
||||
// reflection, which won't work for any wrapper).
|
||||
// To ensure it will work correctly, let's operate on base objects
|
||||
// and not cache it for now.
|
||||
//
|
||||
// TODO: Long-term, transformObject should be changed so that it
|
||||
// implements runtime.Encoder interface.
|
||||
return doTransformObject(ctx, co.GetObject(), opts, mediaType, scope, req)
|
||||
}
|
||||
}
|
||||
return doTransformObject(ctx, obj, opts, mediaType, scope, req)
|
||||
}
|
||||
|
||||
func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{}, mediaType negotiation.MediaTypeOptions, scope *RequestScope, req *http.Request) (runtime.Object, error) {
|
||||
if _, ok := obj.(*metav1.Status); ok {
|
||||
return obj, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,173 @@
|
|||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
)
|
||||
|
||||
var _ runtime.CacheableObject = &mockCacheableObject{}
|
||||
|
||||
type mockCacheableObject struct {
|
||||
gvk schema.GroupVersionKind
|
||||
obj runtime.Object
|
||||
}
|
||||
|
||||
// DeepCopyObject implements runtime.Object interface.
|
||||
func (m *mockCacheableObject) DeepCopyObject() runtime.Object {
|
||||
panic("DeepCopy unimplemented for mockCacheableObject")
|
||||
}
|
||||
|
||||
// GetObjectKind implements runtime.Object interface.
|
||||
func (m *mockCacheableObject) GetObjectKind() schema.ObjectKind {
|
||||
return m
|
||||
}
|
||||
|
||||
// GroupVersionKind implements schema.ObjectKind interface.
|
||||
func (m *mockCacheableObject) GroupVersionKind() schema.GroupVersionKind {
|
||||
return m.gvk
|
||||
}
|
||||
|
||||
// SetGroupVersionKind implements schema.ObjectKind interface.
|
||||
func (m *mockCacheableObject) SetGroupVersionKind(gvk schema.GroupVersionKind) {
|
||||
m.gvk = gvk
|
||||
}
|
||||
|
||||
// CacheEncode implements runtime.CacheableObject interface.
|
||||
func (m *mockCacheableObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error {
|
||||
return fmt.Errorf("unimplemented")
|
||||
}
|
||||
|
||||
// GetObject implements runtime.CacheableObject interface.
|
||||
func (m *mockCacheableObject) GetObject() runtime.Object {
|
||||
return m.obj
|
||||
}
|
||||
|
||||
type mockNamer struct{}
|
||||
|
||||
func (*mockNamer) Namespace(_ *http.Request) (string, error) { return "", nil }
|
||||
func (*mockNamer) Name(_ *http.Request) (string, string, error) { return "", "", nil }
|
||||
func (*mockNamer) ObjectName(_ runtime.Object) (string, string, error) { return "", "", nil }
|
||||
func (*mockNamer) SetSelfLink(_ runtime.Object, _ string) error { return nil }
|
||||
func (*mockNamer) GenerateLink(_ *request.RequestInfo, _ runtime.Object) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
func (*mockNamer) GenerateListLink(_ *http.Request) (string, error) { return "", nil }
|
||||
|
||||
func TestCacheableObject(t *testing.T) {
|
||||
pomGVK := metav1.SchemeGroupVersion.WithKind("PartialObjectMetadata")
|
||||
tableGVK := metav1.SchemeGroupVersion.WithKind("Table")
|
||||
|
||||
status := &metav1.Status{Status: "status"}
|
||||
pod := &examplev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "name",
|
||||
Namespace: "namespace",
|
||||
},
|
||||
}
|
||||
podMeta := &metav1.PartialObjectMetadata{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "name",
|
||||
Namespace: "namespace",
|
||||
},
|
||||
}
|
||||
podMeta.GetObjectKind().SetGroupVersionKind(pomGVK)
|
||||
podTable := &metav1.Table{
|
||||
Rows: []metav1.TableRow{
|
||||
{
|
||||
Cells: []interface{}{pod.Name, pod.CreationTimestamp.Time.UTC().Format(time.RFC3339)},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tableConvertor := rest.NewDefaultTableConvertor(examplev1.Resource("Pod"))
|
||||
|
||||
testCases := []struct {
|
||||
desc string
|
||||
object runtime.Object
|
||||
opts *metav1beta1.TableOptions
|
||||
mediaType negotiation.MediaTypeOptions
|
||||
|
||||
expectedUnwrap bool
|
||||
expectedObj runtime.Object
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
desc: "metav1.Status",
|
||||
object: status,
|
||||
expectedObj: status,
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
desc: "cacheableObject nil convert",
|
||||
object: &mockCacheableObject{obj: pod},
|
||||
mediaType: negotiation.MediaTypeOptions{},
|
||||
expectedObj: &mockCacheableObject{obj: pod},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
desc: "cacheableObject as PartialObjectMeta",
|
||||
object: &mockCacheableObject{obj: pod},
|
||||
mediaType: negotiation.MediaTypeOptions{Convert: &pomGVK},
|
||||
expectedObj: podMeta,
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
desc: "cacheableObject as Table",
|
||||
object: &mockCacheableObject{obj: pod},
|
||||
opts: &metav1beta1.TableOptions{NoHeaders: true, IncludeObject: metav1.IncludeNone},
|
||||
mediaType: negotiation.MediaTypeOptions{Convert: &tableGVK},
|
||||
expectedObj: podTable,
|
||||
expectedErr: nil,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
result, err := transformObject(
|
||||
request.WithRequestInfo(context.TODO(), &request.RequestInfo{}),
|
||||
test.object, test.opts, test.mediaType,
|
||||
&RequestScope{
|
||||
Namer: &mockNamer{},
|
||||
TableConvertor: tableConvertor,
|
||||
},
|
||||
nil)
|
||||
|
||||
if err != test.expectedErr {
|
||||
t.Errorf("unexpected error: %v, expected: %v", err, test.expectedErr)
|
||||
}
|
||||
if a, e := result, test.expectedObj; !reflect.DeepEqual(a, e) {
|
||||
t.Errorf("unexpected result: %v, expected: %v", a, e)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -287,6 +287,10 @@ func (e *fakeEncoder) Encode(obj runtime.Object, w io.Writer) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (e *fakeEncoder) Identifier() runtime.Identifier {
|
||||
return runtime.Identifier("fake")
|
||||
}
|
||||
|
||||
func gzipContent(data []byte, level int) []byte {
|
||||
buf := &bytes.Buffer{}
|
||||
gw, err := gzip.NewWriterLevel(buf, level)
|
||||
|
|
|
|||
|
|
@ -754,17 +754,25 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) {
|
|||
return c.storage.Count(pathPrefix)
|
||||
}
|
||||
|
||||
func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
|
||||
// baseObjectThreadUnsafe omits locking for cachingObject.
|
||||
func baseObjectThreadUnsafe(object runtime.Object) runtime.Object {
|
||||
if co, ok := object.(*cachingObject); ok {
|
||||
return co.object
|
||||
}
|
||||
return object
|
||||
}
|
||||
|
||||
func (c *Cacher) triggerValuesThreadUnsafe(event *watchCacheEvent) ([]string, bool) {
|
||||
if c.indexedTrigger == nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
result := make([]string, 0, 2)
|
||||
result = append(result, c.indexedTrigger.indexerFunc(event.Object))
|
||||
result = append(result, c.indexedTrigger.indexerFunc(baseObjectThreadUnsafe(event.Object)))
|
||||
if event.PrevObject == nil {
|
||||
return result, true
|
||||
}
|
||||
prevTriggerValue := c.indexedTrigger.indexerFunc(event.PrevObject)
|
||||
prevTriggerValue := c.indexedTrigger.indexerFunc(baseObjectThreadUnsafe(event.PrevObject))
|
||||
if result[0] != prevTriggerValue {
|
||||
result = append(result, prevTriggerValue)
|
||||
}
|
||||
|
|
@ -892,7 +900,10 @@ func (c *Cacher) startDispatchingBookmarkEvents() {
|
|||
// startDispatching chooses watchers potentially interested in a given event
|
||||
// a marks dispatching as true.
|
||||
func (c *Cacher) startDispatching(event *watchCacheEvent) {
|
||||
triggerValues, supported := c.triggerValues(event)
|
||||
// It is safe to call triggerValuesThreadUnsafe here, because at this
|
||||
// point only this thread can access this event (we create a separate
|
||||
// watchCacheEvent for every dispatch).
|
||||
triggerValues, supported := c.triggerValuesThreadUnsafe(event)
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
|
@ -1165,7 +1176,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
|
|||
// This means that we couldn't send event to that watcher.
|
||||
// Since we don't want to block on it infinitely,
|
||||
// we simply terminate it.
|
||||
klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", reflect.TypeOf(event.Object).String())
|
||||
klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String())
|
||||
c.forget()
|
||||
}
|
||||
|
||||
|
|
@ -1193,6 +1204,25 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) {
|
|||
return c.deadline.Add(-2 * time.Second), true
|
||||
}
|
||||
|
||||
func getEventObject(object runtime.Object) runtime.Object {
|
||||
if _, ok := object.(runtime.CacheableObject); ok {
|
||||
// It is safe to return without deep-copy, because the underlying
|
||||
// object was already deep-copied during construction.
|
||||
return object
|
||||
}
|
||||
return object.DeepCopyObject()
|
||||
}
|
||||
|
||||
func updateResourceVersionIfNeeded(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
|
||||
if _, ok := object.(*cachingObject); ok {
|
||||
// We assume that for cachingObject resourceVersion was already propagated before.
|
||||
return
|
||||
}
|
||||
if err := versioner.UpdateObject(object, resourceVersion); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
|
||||
if event.Type == watch.Bookmark {
|
||||
return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
|
||||
|
|
@ -1210,15 +1240,13 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event
|
|||
|
||||
switch {
|
||||
case curObjPasses && !oldObjPasses:
|
||||
return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
|
||||
return &watch.Event{Type: watch.Added, Object: getEventObject(event.Object)}
|
||||
case curObjPasses && oldObjPasses:
|
||||
return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
|
||||
return &watch.Event{Type: watch.Modified, Object: getEventObject(event.Object)}
|
||||
case !curObjPasses && oldObjPasses:
|
||||
// return a delete event with the previous object content, but with the event's resource version
|
||||
oldObj := event.PrevObject.DeepCopyObject()
|
||||
if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
|
||||
}
|
||||
oldObj := getEventObject(event.PrevObject)
|
||||
updateResourceVersionIfNeeded(oldObj, c.versioner, event.ResourceVersion)
|
||||
return &watch.Event{Type: watch.Deleted, Object: oldObj}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,12 +20,14 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
goruntime "runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
|
|
@ -265,7 +267,7 @@ func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner, er
|
|||
Versioner: testVersioner{},
|
||||
ResourcePrefix: prefix,
|
||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
||||
GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, error) { return nil, nil, nil },
|
||||
GetAttrsFunc: storage.DefaultNamespaceScopedAttr,
|
||||
NewFunc: func() runtime.Object { return &example.Pod{} },
|
||||
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
||||
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
||||
|
|
@ -452,7 +454,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
|
|||
shouldContinue = false
|
||||
break
|
||||
}
|
||||
rv, err := testVersioner{}.ParseResourceVersion(event.Object.(*examplev1.Pod).ResourceVersion)
|
||||
rv, err := testVersioner{}.ParseResourceVersion(event.Object.(metaRuntimeInterface).GetResourceVersion())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected parsing error: %v", err)
|
||||
} else {
|
||||
|
|
@ -703,7 +705,7 @@ func testCacherSendBookmarkEvents(t *testing.T, watchCacheEnabled, allowWatchBoo
|
|||
}
|
||||
rv, err := cacher.versioner.ObjectResourceVersion(event.Object)
|
||||
if err != nil {
|
||||
t.Errorf("failed to parse resource version from %#v", event.Object)
|
||||
t.Errorf("failed to parse resource version from %#v: %v", event.Object, err)
|
||||
}
|
||||
if event.Type == watch.Bookmark {
|
||||
if !expectedBookmarks {
|
||||
|
|
@ -906,3 +908,107 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
|
|||
t.Errorf("watcher is blocked by slower one (count: %d)", eventsCount)
|
||||
}
|
||||
}
|
||||
|
||||
func verifyEvents(t *testing.T, w watch.Interface, events []watch.Event) {
|
||||
_, _, line, _ := goruntime.Caller(1)
|
||||
for _, expectedEvent := range events {
|
||||
select {
|
||||
case event := <-w.ResultChan():
|
||||
if e, a := expectedEvent.Type, event.Type; e != a {
|
||||
t.Logf("(called from line %d)", line)
|
||||
t.Errorf("Expected: %s, got: %s", e, a)
|
||||
}
|
||||
object := event.Object
|
||||
if co, ok := object.(runtime.CacheableObject); ok {
|
||||
object = co.GetObject()
|
||||
}
|
||||
if e, a := expectedEvent.Object, object; !apiequality.Semantic.DeepEqual(e, a) {
|
||||
t.Logf("(called from line %d)", line)
|
||||
t.Errorf("Expected: %#v, got: %#v", e, a)
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Logf("(called from line %d)", line)
|
||||
t.Errorf("Timed out waiting for an event")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCachingDeleteEvents(t *testing.T) {
|
||||
backingStorage := &dummyStorage{}
|
||||
cacher, _, err := newTestCacher(backingStorage, 1000)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
defer cacher.Stop()
|
||||
|
||||
// Wait until cacher is initialized.
|
||||
cacher.ready.wait()
|
||||
|
||||
fooPredicate := storage.SelectionPredicate{
|
||||
Label: labels.SelectorFromSet(map[string]string{"foo": "true"}),
|
||||
Field: fields.Everything(),
|
||||
}
|
||||
barPredicate := storage.SelectionPredicate{
|
||||
Label: labels.SelectorFromSet(map[string]string{"bar": "true"}),
|
||||
Field: fields.Everything(),
|
||||
}
|
||||
|
||||
createWatch := func(pred storage.SelectionPredicate) watch.Interface {
|
||||
w, err := cacher.Watch(context.TODO(), "pods/ns", "999", pred)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create watch: %v", err)
|
||||
}
|
||||
return w
|
||||
}
|
||||
|
||||
allEventsWatcher := createWatch(storage.Everything)
|
||||
defer allEventsWatcher.Stop()
|
||||
fooEventsWatcher := createWatch(fooPredicate)
|
||||
defer fooEventsWatcher.Stop()
|
||||
barEventsWatcher := createWatch(barPredicate)
|
||||
defer barEventsWatcher.Stop()
|
||||
|
||||
makePod := func(labels map[string]string, rv string) *examplev1.Pod {
|
||||
return &examplev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod",
|
||||
Namespace: "ns",
|
||||
Labels: labels,
|
||||
ResourceVersion: rv,
|
||||
},
|
||||
}
|
||||
}
|
||||
pod1 := makePod(map[string]string{"foo": "true", "bar": "true"}, "1001")
|
||||
pod2 := makePod(map[string]string{"foo": "true"}, "1002")
|
||||
pod3 := makePod(map[string]string{}, "1003")
|
||||
pod4 := makePod(map[string]string{}, "1004")
|
||||
pod1DeletedAt2 := pod1.DeepCopyObject().(*examplev1.Pod)
|
||||
pod1DeletedAt2.ResourceVersion = "1002"
|
||||
pod2DeletedAt3 := pod2.DeepCopyObject().(*examplev1.Pod)
|
||||
pod2DeletedAt3.ResourceVersion = "1003"
|
||||
|
||||
allEvents := []watch.Event{
|
||||
{Type: watch.Added, Object: pod1.DeepCopy()},
|
||||
{Type: watch.Modified, Object: pod2.DeepCopy()},
|
||||
{Type: watch.Modified, Object: pod3.DeepCopy()},
|
||||
{Type: watch.Deleted, Object: pod4.DeepCopy()},
|
||||
}
|
||||
fooEvents := []watch.Event{
|
||||
{Type: watch.Added, Object: pod1.DeepCopy()},
|
||||
{Type: watch.Modified, Object: pod2.DeepCopy()},
|
||||
{Type: watch.Deleted, Object: pod2DeletedAt3.DeepCopy()},
|
||||
}
|
||||
barEvents := []watch.Event{
|
||||
{Type: watch.Added, Object: pod1.DeepCopy()},
|
||||
{Type: watch.Deleted, Object: pod1DeletedAt2.DeepCopy()},
|
||||
}
|
||||
|
||||
cacher.watchCache.Add(pod1)
|
||||
cacher.watchCache.Update(pod2)
|
||||
cacher.watchCache.Update(pod3)
|
||||
cacher.watchCache.Delete(pod4)
|
||||
|
||||
verifyEvents(t, allEventsWatcher, allEvents)
|
||||
verifyEvents(t, fooEventsWatcher, fooEvents)
|
||||
verifyEvents(t, barEventsWatcher, barEvents)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,397 @@
|
|||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cacher
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
var _ runtime.CacheableObject = &cachingObject{}
|
||||
|
||||
// metaRuntimeInterface implements runtime.Object and
|
||||
// metav1.Object interfaces.
|
||||
type metaRuntimeInterface interface {
|
||||
runtime.Object
|
||||
metav1.Object
|
||||
}
|
||||
|
||||
// serializationResult captures a result of serialization.
|
||||
type serializationResult struct {
|
||||
// once should be used to ensure serialization is computed once.
|
||||
once sync.Once
|
||||
|
||||
// raw is serialized object.
|
||||
raw []byte
|
||||
// err is error from serialization.
|
||||
err error
|
||||
}
|
||||
|
||||
// serializationsCache is a type for caching serialization results.
|
||||
type serializationsCache map[runtime.Identifier]*serializationResult
|
||||
|
||||
// cachingObject is an object that is able to cache its serializations
|
||||
// so that each of those is computed exactly once.
|
||||
//
|
||||
// cachingObject implements the metav1.Object interface (accessors for
|
||||
// all metadata fields). However, setters for all fields except from
|
||||
// SelfLink (which is set lately in the path) are ignored.
|
||||
type cachingObject struct {
|
||||
lock sync.RWMutex
|
||||
|
||||
// Object for which serializations are cached.
|
||||
object metaRuntimeInterface
|
||||
|
||||
// serializations is a cache containing object`s serializations.
|
||||
// The value stored in atomic.Value is of type serializationsCache.
|
||||
// The atomic.Value type is used to allow fast-path.
|
||||
serializations atomic.Value
|
||||
}
|
||||
|
||||
// newCachingObject performs a deep copy of the given object and wraps it
|
||||
// into a cachingObject.
|
||||
// An error is returned if it's not possible to cast the object to
|
||||
// metav1.Object type.
|
||||
func newCachingObject(object runtime.Object) (*cachingObject, error) {
|
||||
if obj, ok := object.(metaRuntimeInterface); ok {
|
||||
result := &cachingObject{object: obj.DeepCopyObject().(metaRuntimeInterface)}
|
||||
result.serializations.Store(make(serializationsCache))
|
||||
return result, nil
|
||||
}
|
||||
return nil, fmt.Errorf("can't cast object to metav1.Object: %#v", object)
|
||||
}
|
||||
|
||||
func (o *cachingObject) getSerializationResult(id runtime.Identifier) *serializationResult {
|
||||
// Fast-path for getting from cache.
|
||||
serializations := o.serializations.Load().(serializationsCache)
|
||||
if result, exists := serializations[id]; exists {
|
||||
return result
|
||||
}
|
||||
|
||||
// Slow-path (that may require insert).
|
||||
o.lock.Lock()
|
||||
defer o.lock.Unlock()
|
||||
|
||||
serializations = o.serializations.Load().(serializationsCache)
|
||||
// Check if in the meantime it wasn't inserted.
|
||||
if result, exists := serializations[id]; exists {
|
||||
return result
|
||||
}
|
||||
|
||||
// Insert an entry for <id>. This requires copy of existing map.
|
||||
newSerializations := make(serializationsCache)
|
||||
for k, v := range serializations {
|
||||
newSerializations[k] = v
|
||||
}
|
||||
result := &serializationResult{}
|
||||
newSerializations[id] = result
|
||||
o.serializations.Store(newSerializations)
|
||||
return result
|
||||
}
|
||||
|
||||
// CacheEncode implements runtime.CacheableObject interface.
|
||||
// It serializes the object and writes the result to given io.Writer trying
|
||||
// to first use the already cached result and falls back to a given encode
|
||||
// function in case of cache miss.
|
||||
// It assumes that for a given identifier, the encode function always encodes
|
||||
// each input object into the same output format.
|
||||
func (o *cachingObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error {
|
||||
result := o.getSerializationResult(id)
|
||||
result.once.Do(func() {
|
||||
buffer := bytes.NewBuffer(nil)
|
||||
result.err = encode(o.GetObject(), buffer)
|
||||
result.raw = buffer.Bytes()
|
||||
})
|
||||
// Once invoked, fields of serialization will not change.
|
||||
if result.err != nil {
|
||||
return result.err
|
||||
}
|
||||
_, err := w.Write(result.raw)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetObject implements runtime.CacheableObject interface.
|
||||
// It returns deep-copy of the wrapped object to return ownership of it
|
||||
// to the called according to the contract of the interface.
|
||||
func (o *cachingObject) GetObject() runtime.Object {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.DeepCopyObject().(metaRuntimeInterface)
|
||||
}
|
||||
|
||||
// GetObjectKind implements runtime.Object interface.
|
||||
func (o *cachingObject) GetObjectKind() schema.ObjectKind {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetObjectKind()
|
||||
}
|
||||
|
||||
// DeepCopyObject implements runtime.Object interface.
|
||||
func (o *cachingObject) DeepCopyObject() runtime.Object {
|
||||
// DeepCopyObject on cachingObject is not expected to be called anywhere.
|
||||
// However, to be on the safe-side, we implement it, though given the
|
||||
// cache is only an optimization we ignore copying it.
|
||||
result := &cachingObject{}
|
||||
result.serializations.Store(make(serializationsCache))
|
||||
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
result.object = o.object.DeepCopyObject().(metaRuntimeInterface)
|
||||
return result
|
||||
}
|
||||
|
||||
var (
|
||||
invalidationCacheTimestampLock sync.Mutex
|
||||
invalidationCacheTimestamp time.Time
|
||||
)
|
||||
|
||||
// shouldLogCacheInvalidation allows for logging cache-invalidation
|
||||
// at most once per second (to avoid spamming logs in case of issues).
|
||||
func shouldLogCacheInvalidation(now time.Time) bool {
|
||||
invalidationCacheTimestampLock.Lock()
|
||||
defer invalidationCacheTimestampLock.Unlock()
|
||||
if invalidationCacheTimestamp.Add(time.Second).Before(now) {
|
||||
invalidationCacheTimestamp = now
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (o *cachingObject) invalidateCacheLocked() {
|
||||
if cache, ok := o.serializations.Load().(serializationsCache); ok && len(cache) == 0 {
|
||||
return
|
||||
}
|
||||
// We don't expect cache invalidation to happen - so we want
|
||||
// to log the stacktrace to allow debugging if that will happen.
|
||||
// OTOH, we don't want to spam logs with it.
|
||||
// So we try to log it at most once per second.
|
||||
if shouldLogCacheInvalidation(time.Now()) {
|
||||
klog.Warningf("Unexpected cache invalidation for %#v\n%s", o.object, string(debug.Stack()))
|
||||
}
|
||||
o.serializations.Store(make(serializationsCache))
|
||||
}
|
||||
|
||||
// The following functions implement metav1.Object interface:
|
||||
// - getters simply delegate for the underlying object
|
||||
// - setters check if operations isn't noop and if so,
|
||||
// invalidate the cache and delegate for the underlying object
|
||||
|
||||
func (o *cachingObject) conditionalSet(isNoop func() bool, set func()) {
|
||||
if fastPath := func() bool {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return isNoop()
|
||||
}(); fastPath {
|
||||
return
|
||||
}
|
||||
o.lock.Lock()
|
||||
defer o.lock.Unlock()
|
||||
if isNoop() {
|
||||
return
|
||||
}
|
||||
o.invalidateCacheLocked()
|
||||
set()
|
||||
}
|
||||
|
||||
func (o *cachingObject) GetNamespace() string {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetNamespace()
|
||||
}
|
||||
func (o *cachingObject) SetNamespace(namespace string) {
|
||||
o.conditionalSet(
|
||||
func() bool { return o.object.GetNamespace() == namespace },
|
||||
func() { o.object.SetNamespace(namespace) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetName() string {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetName()
|
||||
}
|
||||
func (o *cachingObject) SetName(name string) {
|
||||
o.conditionalSet(
|
||||
func() bool { return o.object.GetName() == name },
|
||||
func() { o.object.SetName(name) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetGenerateName() string {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetGenerateName()
|
||||
}
|
||||
func (o *cachingObject) SetGenerateName(name string) {
|
||||
o.conditionalSet(
|
||||
func() bool { return o.object.GetGenerateName() == name },
|
||||
func() { o.object.SetGenerateName(name) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetUID() types.UID {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetUID()
|
||||
}
|
||||
func (o *cachingObject) SetUID(uid types.UID) {
|
||||
o.conditionalSet(
|
||||
func() bool { return o.object.GetUID() == uid },
|
||||
func() { o.object.SetUID(uid) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetResourceVersion() string {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetResourceVersion()
|
||||
}
|
||||
func (o *cachingObject) SetResourceVersion(version string) {
|
||||
o.conditionalSet(
|
||||
func() bool { return o.object.GetResourceVersion() == version },
|
||||
func() { o.object.SetResourceVersion(version) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetGeneration() int64 {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetGeneration()
|
||||
}
|
||||
func (o *cachingObject) SetGeneration(generation int64) {
|
||||
o.conditionalSet(
|
||||
func() bool { return o.object.GetGeneration() == generation },
|
||||
func() { o.object.SetGeneration(generation) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetSelfLink() string {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetSelfLink()
|
||||
}
|
||||
func (o *cachingObject) SetSelfLink(selfLink string) {
|
||||
o.conditionalSet(
|
||||
func() bool { return o.object.GetSelfLink() == selfLink },
|
||||
func() { o.object.SetSelfLink(selfLink) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetCreationTimestamp() metav1.Time {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetCreationTimestamp()
|
||||
}
|
||||
func (o *cachingObject) SetCreationTimestamp(timestamp metav1.Time) {
|
||||
o.conditionalSet(
|
||||
func() bool { return o.object.GetCreationTimestamp() == timestamp },
|
||||
func() { o.object.SetCreationTimestamp(timestamp) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetDeletionTimestamp() *metav1.Time {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetDeletionTimestamp()
|
||||
}
|
||||
func (o *cachingObject) SetDeletionTimestamp(timestamp *metav1.Time) {
|
||||
o.conditionalSet(
|
||||
func() bool { return o.object.GetDeletionTimestamp() == timestamp },
|
||||
func() { o.object.SetDeletionTimestamp(timestamp) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetDeletionGracePeriodSeconds() *int64 {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetDeletionGracePeriodSeconds()
|
||||
}
|
||||
func (o *cachingObject) SetDeletionGracePeriodSeconds(gracePeriodSeconds *int64) {
|
||||
o.conditionalSet(
|
||||
func() bool { return o.object.GetDeletionGracePeriodSeconds() == gracePeriodSeconds },
|
||||
func() { o.object.SetDeletionGracePeriodSeconds(gracePeriodSeconds) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetLabels() map[string]string {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetLabels()
|
||||
}
|
||||
func (o *cachingObject) SetLabels(labels map[string]string) {
|
||||
o.conditionalSet(
|
||||
func() bool { return reflect.DeepEqual(o.object.GetLabels(), labels) },
|
||||
func() { o.object.SetLabels(labels) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetAnnotations() map[string]string {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetAnnotations()
|
||||
}
|
||||
func (o *cachingObject) SetAnnotations(annotations map[string]string) {
|
||||
o.conditionalSet(
|
||||
func() bool { return reflect.DeepEqual(o.object.GetAnnotations(), annotations) },
|
||||
func() { o.object.SetAnnotations(annotations) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetFinalizers() []string {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetFinalizers()
|
||||
}
|
||||
func (o *cachingObject) SetFinalizers(finalizers []string) {
|
||||
o.conditionalSet(
|
||||
func() bool { return reflect.DeepEqual(o.object.GetFinalizers(), finalizers) },
|
||||
func() { o.object.SetFinalizers(finalizers) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetOwnerReferences() []metav1.OwnerReference {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetOwnerReferences()
|
||||
}
|
||||
func (o *cachingObject) SetOwnerReferences(references []metav1.OwnerReference) {
|
||||
o.conditionalSet(
|
||||
func() bool { return reflect.DeepEqual(o.object.GetOwnerReferences(), references) },
|
||||
func() { o.object.SetOwnerReferences(references) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetClusterName() string {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetClusterName()
|
||||
}
|
||||
func (o *cachingObject) SetClusterName(clusterName string) {
|
||||
o.conditionalSet(
|
||||
func() bool { return o.object.GetClusterName() == clusterName },
|
||||
func() { o.object.SetClusterName(clusterName) },
|
||||
)
|
||||
}
|
||||
func (o *cachingObject) GetManagedFields() []metav1.ManagedFieldsEntry {
|
||||
o.lock.RLock()
|
||||
defer o.lock.RUnlock()
|
||||
return o.object.GetManagedFields()
|
||||
}
|
||||
func (o *cachingObject) SetManagedFields(managedFields []metav1.ManagedFieldsEntry) {
|
||||
o.conditionalSet(
|
||||
func() bool { return reflect.DeepEqual(o.object.GetManagedFields(), managedFields) },
|
||||
func() { o.object.SetManagedFields(managedFields) },
|
||||
)
|
||||
}
|
||||
|
|
@ -0,0 +1,160 @@
|
|||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cacher
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
)
|
||||
|
||||
type mockEncoder struct {
|
||||
identifier runtime.Identifier
|
||||
expectedResult string
|
||||
expectedError error
|
||||
|
||||
callsNumber int32
|
||||
}
|
||||
|
||||
func newMockEncoder(id, result string, err error) *mockEncoder {
|
||||
return &mockEncoder{
|
||||
identifier: runtime.Identifier(id),
|
||||
expectedResult: result,
|
||||
expectedError: err,
|
||||
}
|
||||
}
|
||||
|
||||
func (e *mockEncoder) encode(_ runtime.Object, w io.Writer) error {
|
||||
atomic.AddInt32(&e.callsNumber, 1)
|
||||
if e.expectedError != nil {
|
||||
return e.expectedError
|
||||
}
|
||||
_, err := w.Write([]byte(e.expectedResult))
|
||||
return err
|
||||
}
|
||||
|
||||
func TestCachingObject(t *testing.T) {
|
||||
object, err := newCachingObject(&v1.Pod{})
|
||||
if err != nil {
|
||||
t.Fatalf("couldn't create cachingObject: %v", err)
|
||||
}
|
||||
|
||||
encoders := []*mockEncoder{
|
||||
newMockEncoder("1", "result1", nil),
|
||||
newMockEncoder("2", "", fmt.Errorf("mock error")),
|
||||
newMockEncoder("3", "result3", nil),
|
||||
}
|
||||
|
||||
for _, encoder := range encoders {
|
||||
buffer := bytes.NewBuffer(nil)
|
||||
err := object.CacheEncode(encoder.identifier, encoder.encode, buffer)
|
||||
if a, e := err, encoder.expectedError; e != a {
|
||||
t.Errorf("%s: unexpected error: %v, expected: %v", encoder.identifier, a, e)
|
||||
}
|
||||
if a, e := buffer.String(), encoder.expectedResult; e != a {
|
||||
t.Errorf("%s: unexpected result: %s, expected: %s", encoder.identifier, a, e)
|
||||
}
|
||||
}
|
||||
for _, encoder := range encoders {
|
||||
if encoder.callsNumber != 1 {
|
||||
t.Errorf("%s: unexpected number of encode() calls: %d", encoder.identifier, encoder.callsNumber)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSelfLink(t *testing.T) {
|
||||
object, err := newCachingObject(&v1.Pod{})
|
||||
if err != nil {
|
||||
t.Fatalf("couldn't create cachingObject: %v", err)
|
||||
}
|
||||
selfLink := "selfLink"
|
||||
object.SetSelfLink(selfLink)
|
||||
|
||||
encodeSelfLink := func(obj runtime.Object, w io.Writer) error {
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get accessor for %#v: %v", obj, err)
|
||||
}
|
||||
_, err = w.Write([]byte(accessor.GetSelfLink()))
|
||||
return err
|
||||
}
|
||||
buffer := bytes.NewBuffer(nil)
|
||||
if err := object.CacheEncode("", encodeSelfLink, buffer); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if a, e := buffer.String(), selfLink; a != e {
|
||||
t.Errorf("unexpected serialization: %s, expected: %s", a, e)
|
||||
}
|
||||
|
||||
// GetObject should also set selfLink.
|
||||
buffer.Reset()
|
||||
if err := encodeSelfLink(object.GetObject(), buffer); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if a, e := buffer.String(), selfLink; a != e {
|
||||
t.Errorf("unexpected serialization: %s, expected: %s", a, e)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCachingObjectRaces(t *testing.T) {
|
||||
object, err := newCachingObject(&v1.Pod{})
|
||||
if err != nil {
|
||||
t.Fatalf("couldn't create cachingObject: %v", err)
|
||||
}
|
||||
|
||||
encoders := []*mockEncoder{}
|
||||
for i := 0; i < 10; i++ {
|
||||
encoder := newMockEncoder(fmt.Sprintf("%d", i), "result", nil)
|
||||
encoders = append(encoders, encoder)
|
||||
}
|
||||
|
||||
numWorkers := 1000
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(numWorkers)
|
||||
|
||||
for i := 0; i < numWorkers; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
object.SetSelfLink("selfLink")
|
||||
buffer := bytes.NewBuffer(nil)
|
||||
for _, encoder := range encoders {
|
||||
buffer.Reset()
|
||||
if err := object.CacheEncode(encoder.identifier, encoder.encode, buffer); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if callsNumber := atomic.LoadInt32(&encoder.callsNumber); callsNumber != 1 {
|
||||
t.Errorf("unexpected number of serializations: %d", callsNumber)
|
||||
}
|
||||
}
|
||||
accessor, err := meta.Accessor(object.GetObject())
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get accessor: %v", err)
|
||||
}
|
||||
if selfLink := accessor.GetSelfLink(); selfLink != "selfLink" {
|
||||
t.Errorf("unexpected selfLink: %s", selfLink)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
@ -206,6 +206,37 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob
|
|||
return object, resourceVersion, nil
|
||||
}
|
||||
|
||||
func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
|
||||
switch event.Type {
|
||||
case watch.Added, watch.Modified:
|
||||
if object, err := newCachingObject(event.Object); err == nil {
|
||||
event.Object = object
|
||||
} else {
|
||||
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
|
||||
}
|
||||
// Don't wrap PrevObject for update event (for create events it is nil).
|
||||
// We only encode those to deliver DELETE watch events, so if
|
||||
// event.Object is not nil it can be used only for watchers for which
|
||||
// selector was satisfied for its previous version and is no longer
|
||||
// satisfied for the current version.
|
||||
// This is rare enough that it doesn't justify making deep-copy of the
|
||||
// object (done by newCachingObject) every time.
|
||||
case watch.Deleted:
|
||||
// Don't wrap Object for delete events - these are not to deliver any
|
||||
// events. Only wrap PrevObject.
|
||||
if object, err := newCachingObject(event.PrevObject); err == nil {
|
||||
// Update resource version of the underlying object.
|
||||
// event.PrevObject is used to deliver DELETE watch events and
|
||||
// for them, we set resourceVersion to <current> instead of
|
||||
// the resourceVersion of the last modification of the object.
|
||||
updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
|
||||
event.PrevObject = object
|
||||
} else {
|
||||
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processEvent is safe as long as there is at most one call to it in flight
|
||||
// at any point in time.
|
||||
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
|
||||
|
|
@ -219,7 +250,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
|||
return err
|
||||
}
|
||||
|
||||
watchCacheEvent := &watchCacheEvent{
|
||||
wcEvent := &watchCacheEvent{
|
||||
Type: event.Type,
|
||||
Object: elem.Object,
|
||||
ObjLabels: elem.Labels,
|
||||
|
|
@ -242,12 +273,12 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
|||
}
|
||||
if exists {
|
||||
previousElem := previous.(*storeElement)
|
||||
watchCacheEvent.PrevObject = previousElem.Object
|
||||
watchCacheEvent.PrevObjLabels = previousElem.Labels
|
||||
watchCacheEvent.PrevObjFields = previousElem.Fields
|
||||
wcEvent.PrevObject = previousElem.Object
|
||||
wcEvent.PrevObjLabels = previousElem.Labels
|
||||
wcEvent.PrevObjFields = previousElem.Fields
|
||||
}
|
||||
|
||||
w.updateCache(watchCacheEvent)
|
||||
w.updateCache(wcEvent)
|
||||
w.resourceVersion = resourceVersion
|
||||
defer w.cond.Broadcast()
|
||||
|
||||
|
|
@ -260,7 +291,18 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
|||
// This is safe as long as there is at most one call to processEvent in flight
|
||||
// at any point in time.
|
||||
if w.eventHandler != nil {
|
||||
w.eventHandler(watchCacheEvent)
|
||||
// Set up caching of object serializations only for dispatching this event.
|
||||
//
|
||||
// Storing serializations in memory would result in increased memory usage,
|
||||
// but it would help for caching encodings for watches started from old
|
||||
// versions. However, we still don't have a convincing data that the gain
|
||||
// from it justifies increased memory usage, so for now we drop the cached
|
||||
// serializations after dispatching this event.
|
||||
|
||||
// Make a shallow copy to allow overwriting Object and PrevObject.
|
||||
wce := *wcEvent
|
||||
setCachingObjects(&wce, w.versioner)
|
||||
w.eventHandler(&wce)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package cacher
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
|
@ -432,3 +433,53 @@ func TestReflectorForWatchCache(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCachingObjects(t *testing.T) {
|
||||
store := newTestWatchCache(5)
|
||||
|
||||
index := 0
|
||||
store.eventHandler = func(event *watchCacheEvent) {
|
||||
switch event.Type {
|
||||
case watch.Added, watch.Modified:
|
||||
if _, ok := event.Object.(runtime.CacheableObject); !ok {
|
||||
t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object)
|
||||
}
|
||||
if _, ok := event.PrevObject.(runtime.CacheableObject); ok {
|
||||
t.Fatalf("PrevObject in %s event should not support caching: %#v", event.Type, event.Object)
|
||||
}
|
||||
case watch.Deleted:
|
||||
if _, ok := event.Object.(runtime.CacheableObject); ok {
|
||||
t.Fatalf("Object in %s event should not support caching: %#v", event.Type, event.Object)
|
||||
}
|
||||
if _, ok := event.PrevObject.(runtime.CacheableObject); !ok {
|
||||
t.Fatalf("PrevObject in %s event should support caching: %#v", event.Type, event.Object)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify that delivered event is the same as cached one modulo Object/PrevObject.
|
||||
switch event.Type {
|
||||
case watch.Added, watch.Modified:
|
||||
event.Object = event.Object.(runtime.CacheableObject).GetObject()
|
||||
case watch.Deleted:
|
||||
event.PrevObject = event.PrevObject.(runtime.CacheableObject).GetObject()
|
||||
// In events store in watchcache, we also don't update ResourceVersion.
|
||||
// So we need to ensure that we don't fail on it.
|
||||
resourceVersion, err := store.versioner.ObjectResourceVersion(store.cache[index].PrevObject)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to parse resource version: %v", err)
|
||||
}
|
||||
updateResourceVersionIfNeeded(event.PrevObject, store.versioner, resourceVersion)
|
||||
}
|
||||
if a, e := event, store.cache[index]; !reflect.DeepEqual(a, e) {
|
||||
t.Errorf("watchCacheEvent messed up: %#v, expected: %#v", a, e)
|
||||
}
|
||||
index++
|
||||
}
|
||||
|
||||
pod1 := makeTestPod("pod", 1)
|
||||
pod2 := makeTestPod("pod", 2)
|
||||
pod3 := makeTestPod("pod", 3)
|
||||
store.Add(pod1)
|
||||
store.Update(pod2)
|
||||
store.Delete(pod3)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -357,7 +357,11 @@ func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType
|
|||
t.Logf("(called from line %d)", line)
|
||||
t.Errorf("Expected: %s, got: %s", eventType, event.Type)
|
||||
}
|
||||
if e, a := eventObject, event.Object; !apiequality.Semantic.DeepDerivative(e, a) {
|
||||
object := event.Object
|
||||
if co, ok := object.(runtime.CacheableObject); ok {
|
||||
object = co.GetObject()
|
||||
}
|
||||
if e, a := eventObject, object; !apiequality.Semantic.DeepDerivative(e, a) {
|
||||
t.Logf("(called from line %d)", line)
|
||||
t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a)
|
||||
}
|
||||
|
|
@ -606,7 +610,11 @@ func TestStartingResourceVersion(t *testing.T) {
|
|||
|
||||
select {
|
||||
case e := <-watcher.ResultChan():
|
||||
pod := e.Object.(*example.Pod)
|
||||
object := e.Object
|
||||
if co, ok := object.(runtime.CacheableObject); ok {
|
||||
object = co.GetObject()
|
||||
}
|
||||
pod := object.(*example.Pod)
|
||||
podRV, err := v.ParseResourceVersion(pod.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
|
|
@ -725,7 +733,11 @@ func TestRandomWatchDeliver(t *testing.T) {
|
|||
if !ok {
|
||||
break
|
||||
}
|
||||
if a, e := event.Object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a {
|
||||
object := event.Object
|
||||
if co, ok := object.(runtime.CacheableObject); ok {
|
||||
object = co.GetObject()
|
||||
}
|
||||
if a, e := object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a {
|
||||
t.Errorf("Unexpected object watched: %s, expected %s", a, e)
|
||||
}
|
||||
watched++
|
||||
|
|
@ -911,7 +923,7 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
|
|||
pod := fmt.Sprintf("foo-%d", i)
|
||||
err := createPod(etcdStorage, makeTestPod(pod))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create pod %v", pod)
|
||||
t.Fatalf("failed to create pod %v: %v", pod, err)
|
||||
}
|
||||
time.Sleep(time.Second / 100)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue