apiserver/pkg/storage/cacher/caching_object.go

413 lines
13 KiB
Go

/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"bytes"
"fmt"
"io"
"reflect"
"runtime/debug"
"sync"
"sync/atomic"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)
var _ runtime.CacheableObject = &cachingObject{}
// metaRuntimeInterface implements runtime.Object and
// metav1.Object interfaces.
type metaRuntimeInterface interface {
runtime.Object
metav1.Object
}
// serializationResult captures a result of serialization.
type serializationResult struct {
// once should be used to ensure serialization is computed once.
once sync.Once
// raw is serialized object.
raw []byte
// err is error from serialization.
err error
}
// serializationsCache is a type for caching serialization results.
type serializationsCache map[runtime.Identifier]*serializationResult
// cachingObject is an object that is able to cache its serializations
// so that each of those is computed exactly once.
//
// cachingObject implements the metav1.Object interface (accessors for
// all metadata fields).
type cachingObject struct {
lock sync.RWMutex
// deepCopied defines whether the object below has already been
// deep copied. The operation is performed lazily on the first
// setXxx operation.
//
// The lazy deep-copy make is useful, as effectively the only
// case when we are setting some fields are ResourceVersion for
// DELETE events, so in all other cases we can effectively avoid
// performing any deep copies.
deepCopied bool
// Object for which serializations are cached.
object metaRuntimeInterface
// serializations is a cache containing object`s serializations.
// The value stored in atomic.Value is of type serializationsCache.
// The atomic.Value type is used to allow fast-path.
serializations atomic.Value
}
// newCachingObject performs a deep copy of the given object and wraps it
// into a cachingObject.
// An error is returned if it's not possible to cast the object to
// metav1.Object type.
func newCachingObject(object runtime.Object) (*cachingObject, error) {
if obj, ok := object.(metaRuntimeInterface); ok {
result := &cachingObject{
object: obj,
deepCopied: false,
}
result.serializations.Store(make(serializationsCache))
return result, nil
}
return nil, fmt.Errorf("can't cast object to metav1.Object: %#v", object)
}
func (o *cachingObject) getSerializationResult(id runtime.Identifier) *serializationResult {
// Fast-path for getting from cache.
serializations := o.serializations.Load().(serializationsCache)
if result, exists := serializations[id]; exists {
return result
}
// Slow-path (that may require insert).
o.lock.Lock()
defer o.lock.Unlock()
serializations = o.serializations.Load().(serializationsCache)
// Check if in the meantime it wasn't inserted.
if result, exists := serializations[id]; exists {
return result
}
// Insert an entry for <id>. This requires copy of existing map.
newSerializations := make(serializationsCache)
for k, v := range serializations {
newSerializations[k] = v
}
result := &serializationResult{}
newSerializations[id] = result
o.serializations.Store(newSerializations)
return result
}
// CacheEncode implements runtime.CacheableObject interface.
// It serializes the object and writes the result to given io.Writer trying
// to first use the already cached result and falls back to a given encode
// function in case of cache miss.
// It assumes that for a given identifier, the encode function always encodes
// each input object into the same output format.
func (o *cachingObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error {
result := o.getSerializationResult(id)
result.once.Do(func() {
buffer := bytes.NewBuffer(nil)
// TODO(wojtek-t): This is currently making a copy to avoid races
// in cases where encoding is making subtle object modifications,
// e.g. #82497
// Figure out if we can somehow avoid this under some conditions.
result.err = encode(o.GetObject(), buffer)
result.raw = buffer.Bytes()
})
// Once invoked, fields of serialization will not change.
if result.err != nil {
return result.err
}
if b, support := w.(runtime.Splice); support {
b.Splice(result.raw)
return nil
}
_, err := w.Write(result.raw)
return err
}
// GetObject implements runtime.CacheableObject interface.
// It returns deep-copy of the wrapped object to return ownership of it
// to the called according to the contract of the interface.
func (o *cachingObject) GetObject() runtime.Object {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.DeepCopyObject().(metaRuntimeInterface)
}
// GetObjectKind implements runtime.Object interface.
func (o *cachingObject) GetObjectKind() schema.ObjectKind {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetObjectKind()
}
// DeepCopyObject implements runtime.Object interface.
func (o *cachingObject) DeepCopyObject() runtime.Object {
// DeepCopyObject on cachingObject is not expected to be called anywhere.
// However, to be on the safe-side, we implement it, though given the
// cache is only an optimization we ignore copying it.
result := &cachingObject{
deepCopied: true,
}
result.serializations.Store(make(serializationsCache))
o.lock.RLock()
defer o.lock.RUnlock()
result.object = o.object.DeepCopyObject().(metaRuntimeInterface)
return result
}
var (
invalidationCacheTimestampLock sync.Mutex
invalidationCacheTimestamp time.Time
)
// shouldLogCacheInvalidation allows for logging cache-invalidation
// at most once per second (to avoid spamming logs in case of issues).
func shouldLogCacheInvalidation(now time.Time) bool {
invalidationCacheTimestampLock.Lock()
defer invalidationCacheTimestampLock.Unlock()
if invalidationCacheTimestamp.Add(time.Second).Before(now) {
invalidationCacheTimestamp = now
return true
}
return false
}
func (o *cachingObject) invalidateCacheLocked() {
if cache, ok := o.serializations.Load().(serializationsCache); ok && len(cache) == 0 {
return
}
// We don't expect cache invalidation to happen - so we want
// to log the stacktrace to allow debugging if that will happen.
// OTOH, we don't want to spam logs with it.
// So we try to log it at most once per second.
if shouldLogCacheInvalidation(time.Now()) {
klog.Warningf("Unexpected cache invalidation for %#v\n%s", o.object, string(debug.Stack()))
}
o.serializations.Store(make(serializationsCache))
}
// The following functions implement metav1.Object interface:
// - getters simply delegate for the underlying object
// - setters check if operations isn't noop and if so,
// invalidate the cache and delegate for the underlying object
func (o *cachingObject) conditionalSet(isNoop func() bool, set func()) {
if fastPath := func() bool {
o.lock.RLock()
defer o.lock.RUnlock()
return isNoop()
}(); fastPath {
return
}
o.lock.Lock()
defer o.lock.Unlock()
if isNoop() {
return
}
if !o.deepCopied {
o.object = o.object.DeepCopyObject().(metaRuntimeInterface)
o.deepCopied = true
}
o.invalidateCacheLocked()
set()
}
func (o *cachingObject) GetNamespace() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetNamespace()
}
func (o *cachingObject) SetNamespace(namespace string) {
o.conditionalSet(
func() bool { return o.object.GetNamespace() == namespace },
func() { o.object.SetNamespace(namespace) },
)
}
func (o *cachingObject) GetName() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetName()
}
func (o *cachingObject) SetName(name string) {
o.conditionalSet(
func() bool { return o.object.GetName() == name },
func() { o.object.SetName(name) },
)
}
func (o *cachingObject) GetGenerateName() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetGenerateName()
}
func (o *cachingObject) SetGenerateName(name string) {
o.conditionalSet(
func() bool { return o.object.GetGenerateName() == name },
func() { o.object.SetGenerateName(name) },
)
}
func (o *cachingObject) GetUID() types.UID {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetUID()
}
func (o *cachingObject) SetUID(uid types.UID) {
o.conditionalSet(
func() bool { return o.object.GetUID() == uid },
func() { o.object.SetUID(uid) },
)
}
func (o *cachingObject) GetResourceVersion() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetResourceVersion()
}
func (o *cachingObject) SetResourceVersion(version string) {
o.conditionalSet(
func() bool { return o.object.GetResourceVersion() == version },
func() { o.object.SetResourceVersion(version) },
)
}
func (o *cachingObject) GetGeneration() int64 {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetGeneration()
}
func (o *cachingObject) SetGeneration(generation int64) {
o.conditionalSet(
func() bool { return o.object.GetGeneration() == generation },
func() { o.object.SetGeneration(generation) },
)
}
func (o *cachingObject) GetSelfLink() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetSelfLink()
}
func (o *cachingObject) SetSelfLink(selfLink string) {
o.conditionalSet(
func() bool { return o.object.GetSelfLink() == selfLink },
func() { o.object.SetSelfLink(selfLink) },
)
}
func (o *cachingObject) GetCreationTimestamp() metav1.Time {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetCreationTimestamp()
}
func (o *cachingObject) SetCreationTimestamp(timestamp metav1.Time) {
o.conditionalSet(
func() bool { return o.object.GetCreationTimestamp() == timestamp },
func() { o.object.SetCreationTimestamp(timestamp) },
)
}
func (o *cachingObject) GetDeletionTimestamp() *metav1.Time {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetDeletionTimestamp()
}
func (o *cachingObject) SetDeletionTimestamp(timestamp *metav1.Time) {
o.conditionalSet(
func() bool { return o.object.GetDeletionTimestamp() == timestamp },
func() { o.object.SetDeletionTimestamp(timestamp) },
)
}
func (o *cachingObject) GetDeletionGracePeriodSeconds() *int64 {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetDeletionGracePeriodSeconds()
}
func (o *cachingObject) SetDeletionGracePeriodSeconds(gracePeriodSeconds *int64) {
o.conditionalSet(
func() bool { return o.object.GetDeletionGracePeriodSeconds() == gracePeriodSeconds },
func() { o.object.SetDeletionGracePeriodSeconds(gracePeriodSeconds) },
)
}
func (o *cachingObject) GetLabels() map[string]string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetLabels()
}
func (o *cachingObject) SetLabels(labels map[string]string) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetLabels(), labels) },
func() { o.object.SetLabels(labels) },
)
}
func (o *cachingObject) GetAnnotations() map[string]string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetAnnotations()
}
func (o *cachingObject) SetAnnotations(annotations map[string]string) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetAnnotations(), annotations) },
func() { o.object.SetAnnotations(annotations) },
)
}
func (o *cachingObject) GetFinalizers() []string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetFinalizers()
}
func (o *cachingObject) SetFinalizers(finalizers []string) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetFinalizers(), finalizers) },
func() { o.object.SetFinalizers(finalizers) },
)
}
func (o *cachingObject) GetOwnerReferences() []metav1.OwnerReference {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetOwnerReferences()
}
func (o *cachingObject) SetOwnerReferences(references []metav1.OwnerReference) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetOwnerReferences(), references) },
func() { o.object.SetOwnerReferences(references) },
)
}
func (o *cachingObject) GetManagedFields() []metav1.ManagedFieldsEntry {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetManagedFields()
}
func (o *cachingObject) SetManagedFields(managedFields []metav1.ManagedFieldsEntry) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetManagedFields(), managedFields) },
func() { o.object.SetManagedFields(managedFields) },
)
}