Extract and unify cache bypass logic by creating a CacheProxy struct

Kubernetes-commit: 4a4fc9da801f299176c7200e66224aa79b1c0817
This commit is contained in:
Marek Siarkowicz 2024-12-31 14:04:00 +01:00 committed by Kubernetes Publisher
parent 6c470468a0
commit 74be087390
7 changed files with 281 additions and 219 deletions

View File

@ -79,7 +79,7 @@ func StorageWithCacher() generic.StorageDecorator {
})
}
return cacher, destroyFunc, nil
return cacherstorage.NewCacheProxy(cacher, s), destroyFunc, nil
}
}

View File

@ -2459,7 +2459,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
}
}
d := destroyFunc
s = cacher
s = cacherstorage.NewCacheProxy(cacher, s)
destroyFunc = func() {
cacher.Stop()
d()

View File

@ -339,10 +339,6 @@ type Cacher struct {
expiredBookmarkWatchers []*cacheWatcher
}
func (c *Cacher) RequestWatchProgress(ctx context.Context) error {
return c.storage.RequestWatchProgress(ctx)
}
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
@ -495,56 +491,13 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
}
}
// Versioner implements storage.Interface.
func (c *Cacher) Versioner() storage.Versioner {
return c.storage.Versioner()
}
// Create implements storage.Interface.
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
return c.storage.Create(ctx, key, obj, out, ttl)
}
// Delete implements storage.Interface.
func (c *Cacher) Delete(
ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
validateDeletion storage.ValidateObjectFunc, _ runtime.Object, opts storage.DeleteOptions) error {
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
klog.Errorf("GetByKey returned error: %v", err)
} else if exists {
// DeepCopy the object since we modify resource version when serializing the
// current object.
currObj := elem.(*storeElement).Object.DeepCopyObject()
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, currObj, opts)
}
// If we couldn't get the object, fallback to no-suggestion.
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil, opts)
}
type namespacedName struct {
namespace string
name string
}
// Watch implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
pred := opts.Predicate
// if the watch-list feature wasn't set and the resourceVersion is unset
// ensure that the rv from which the watch is being served, is the latest
// one. "latest" is ensured by serving the watch from
// the underlying storage.
//
// it should never happen due to our validation but let's just be super-safe here
// and disable sendingInitialEvents when the feature wasn't enabled
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
opts.SendInitialEvents = nil
}
// TODO: we should eventually get rid of this legacy case
if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return c.storage.Watch(ctx, key, opts)
}
requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return nil, err
@ -709,58 +662,17 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return watcher, nil
}
// Get implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
ctx, span := tracing.Start(ctx, "cacher.Get",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.String("key", key),
attribute.String("resource-version", opts.ResourceVersion))
defer span.End(500 * time.Millisecond)
if opts.ResourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
span.AddEvent("About to Get from underlying storage")
return c.storage.Get(ctx, key, opts, objPtr)
}
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.ready.check() {
// If Cache is not initialized, delegate Get requests to storage
// as described in https://kep.k8s.io/4568
span.AddEvent("About to Get from underlying storage - cache not initialized")
return c.storage.Get(ctx, key, opts, objPtr)
}
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
getRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return err
}
// Do not create a trace - it's not for free and there are tons
// of Get requests. We can add it if it will be really needed.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if getRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
span.AddEvent("About to Get from underlying storage - cache not initialized and no resourceVersion set")
return c.storage.Get(ctx, key, opts, objPtr)
}
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
}
objVal, err := conversion.EnforcePtr(objPtr)
if err != nil {
return err
}
span.AddEvent("About to fetch object from cache")
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, getRV, key)
if err != nil {
return err
@ -853,32 +765,9 @@ type listResp struct {
}
// GetList implements storage.Interface
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, listRV uint64) error {
recursive := opts.Recursive
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
if shouldDelegateList(opts) {
return c.storage.GetList(ctx, key, opts, listObj)
}
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return err
}
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.ready.check() && shouldDelegateListOnNotReadyCache(opts) {
// If Cacher is not initialized, delegate List requests to storage
// as described in https://kep.k8s.io/4568
return c.storage.GetList(ctx, key, opts, listObj)
}
} else {
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.GetList(ctx, key, opts, listObj)
}
}
// For recursive lists, we need to make sure the key ended with "/" so that we only
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
// with prefix "/a" will return all three, while with prefix "/a/" will return only
@ -887,14 +776,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
if opts.Recursive && !strings.HasSuffix(key, "/") {
preparedKey += "/"
}
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
consistentRead := resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
if consistentRead {
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
if err != nil {
return err
}
}
ctx, span := tracing.Start(ctx, "cacher.GetList",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
@ -928,24 +809,9 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
}
resp, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
success := "true"
fallback := "false"
if err != nil {
if consistentRead {
if storage.IsTooLargeResourceVersion(err) {
fallback = "true"
err = c.storage.GetList(ctx, key, opts, listObj)
}
if err != nil {
success = "false"
}
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
}
return err
}
if consistentRead {
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
}
span.AddEvent("Listed items from cache", attribute.Int("count", len(resp.Items)))
// store pointer of eligible objects,
// Why not directly put object in the items of listObj?
@ -995,37 +861,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
return nil
}
// GuaranteedUpdate implements storage.Interface.
func (c *Cacher) GuaranteedUpdate(
ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ runtime.Object) error {
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
klog.Errorf("GetByKey returned error: %v", err)
} else if exists {
// DeepCopy the object since we modify resource version when serializing the
// current object.
currObj := elem.(*storeElement).Object.DeepCopyObject()
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, currObj)
}
// If we couldn't get the object, fallback to no-suggestion.
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil)
}
// Count implements storage.Interface.
func (c *Cacher) Count(pathPrefix string) (int64, error) {
return c.storage.Count(pathPrefix)
}
// ReadinessCheck implements storage.Interface.
func (c *Cacher) ReadinessCheck() error {
if !c.ready.check() {
return storage.ErrStorageNotReady
}
return nil
}
// baseObjectThreadUnsafe omits locking for cachingObject.
func baseObjectThreadUnsafe(object runtime.Object) runtime.Object {
if co, ok := object.(*cachingObject); ok {
@ -1502,6 +1337,10 @@ func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCach
}
}
func (c *Cacher) Ready() bool {
return c.ready.check()
}
// errWatcher implements watch.Interface to return a single error
type errWatcher struct {
result chan watch.Event

View File

@ -455,12 +455,12 @@ func withNodeNameAndNamespaceIndex(options *setupOptions) {
}
}
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tearDownFunc) {
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *CacheProxy, tearDownFunc) {
ctx, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
return ctx, cacher, tearDown
}
func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context, *Cacher, *etcd3testing.EtcdTestServer, tearDownFunc) {
func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context, *CacheProxy, *etcd3testing.EtcdTestServer, tearDownFunc) {
setupOpts := setupOptions{}
opts = append([]setupOption{withDefaults}, opts...)
for _, opt := range opts {
@ -514,31 +514,31 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
}
}
return ctx, cacher, server, terminate
return ctx, NewCacheProxy(cacher, wrappedStorage), server, terminate
}
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
_, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.TODO()); err != nil {
if err := cacher.cacher.ready.wait(context.TODO()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
return &createWrapper{Cacher: cacher}, tearDown
return &createWrapper{CacheProxy: cacher}, tearDown
}
type createWrapper struct {
*Cacher
*CacheProxy
}
func (c *createWrapper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
if err := c.Cacher.Create(ctx, key, obj, out, ttl); err != nil {
if err := c.CacheProxy.Create(ctx, key, obj, out, ttl); err != nil {
return err
}
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
currentObj := c.Cacher.newFunc()
err := c.Cacher.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj)
currentObj := c.CacheProxy.cacher.newFunc()
err := c.CacheProxy.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj)
if err != nil {
if storage.IsNotFound(err) {
return false, nil

View File

@ -78,7 +78,7 @@ func computePodKey(obj *example.Pod) string {
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
}
func compactStorage(c *Cacher, client *clientv3.Client) storagetesting.Compaction {
func compactStorage(c *CacheProxy, client *clientv3.Client) storagetesting.Compaction {
return func(ctx context.Context, t *testing.T, resourceVersion string) {
versioner := storage.APIObjectVersioner{}
rv, err := versioner.ParseResourceVersion(resourceVersion)
@ -86,37 +86,37 @@ func compactStorage(c *Cacher, client *clientv3.Client) storagetesting.Compactio
t.Fatal(err)
}
err = c.watchCache.waitUntilFreshAndBlock(context.TODO(), rv)
err = c.cacher.watchCache.waitUntilFreshAndBlock(context.TODO(), rv)
if err != nil {
t.Fatalf("WatchCache didn't caught up to RV: %v", rv)
}
c.watchCache.RUnlock()
c.cacher.watchCache.RUnlock()
c.watchCache.Lock()
defer c.watchCache.Unlock()
c.Lock()
defer c.Unlock()
c.cacher.watchCache.Lock()
defer c.cacher.watchCache.Unlock()
c.cacher.Lock()
defer c.cacher.Unlock()
if c.watchCache.resourceVersion < rv {
if c.cacher.watchCache.resourceVersion < rv {
t.Fatalf("Can't compact into a future version: %v", resourceVersion)
}
if len(c.watchers.allWatchers) > 0 || len(c.watchers.valueWatchers) > 0 {
if len(c.cacher.watchers.allWatchers) > 0 || len(c.cacher.watchers.valueWatchers) > 0 {
// We could consider terminating those watchers, but given
// watchcache doesn't really support compaction and we don't
// exercise it in tests, we just throw an error here.
t.Error("Open watchers are not supported during compaction")
}
for c.watchCache.startIndex < c.watchCache.endIndex {
index := c.watchCache.startIndex % c.watchCache.capacity
if c.watchCache.cache[index].ResourceVersion > rv {
for c.cacher.watchCache.startIndex < c.cacher.watchCache.endIndex {
index := c.cacher.watchCache.startIndex % c.cacher.watchCache.capacity
if c.cacher.watchCache.cache[index].ResourceVersion > rv {
break
}
c.watchCache.startIndex++
c.cacher.watchCache.startIndex++
}
c.watchCache.listResourceVersion = rv
c.cacher.watchCache.listResourceVersion = rv
if _, err = client.KV.Put(ctx, "compact_rev_key", resourceVersion); err != nil {
t.Fatalf("Could not update compact_rev_key: %v", err)

View File

@ -310,15 +310,13 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
proxy := NewCacheProxy(cacher, backingStorage)
result := &example.PodList{}
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
currentResourceVersion := "42"
@ -337,11 +335,11 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
return nil
}
}
err = cacher.GetList(context.TODO(), "pods/ns", options, result)
if err != nil && err != errDummy {
err = proxy.GetList(context.TODO(), "pods/ns", options, result)
gotBypass := errors.Is(err, errDummy)
if err != nil && !gotBypass {
t.Fatalf("Unexpected error for List request with options: %v, err: %v", options, err)
}
gotBypass := err == errDummy
if gotBypass != expectBypass {
t.Errorf("Unexpected bypass result for List request with options %+v, bypass expected: %v, got: %v", options, expectBypass, gotBypass)
}
@ -436,6 +434,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
proxy := NewCacheProxy(cacher, backingStorage)
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
@ -461,7 +460,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
}
result := &example.PodList{}
start := cacher.clock.Now()
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result)
err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result)
duration := cacher.clock.Since(start)
if (err != nil) != tc.expectError {
t.Fatalf("Unexpected error err: %v", err)
@ -492,6 +491,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
proxy := NewCacheProxy(cacher, backingStorage)
pred := storage.SelectionPredicate{
Limit: 500,
@ -506,7 +506,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.injectError(errDummy)
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{
err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "0",
Predicate: pred,
}, result)
@ -514,11 +514,11 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
t.Errorf("GetList with Limit and RV=0 should be served from cache: %v", err)
}
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{
err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: pred,
}, result)
if err != errDummy {
if !errors.Is(err, errDummy) {
t.Errorf("GetList with Limit without RV=0 should bypass cacher: %v", err)
}
}
@ -530,6 +530,7 @@ func TestGetCacheBypass(t *testing.T) {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
proxy := NewCacheProxy(cacher, backingStorage)
result := &example.Pod{}
@ -541,7 +542,7 @@ func TestGetCacheBypass(t *testing.T) {
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.injectError(errDummy)
err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
IgnoreNotFound: true,
ResourceVersion: "0",
}, result)
@ -549,11 +550,11 @@ func TestGetCacheBypass(t *testing.T) {
t.Errorf("Get with RV=0 should be served from cache: %v", err)
}
err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
IgnoreNotFound: true,
ResourceVersion: "",
}, result)
if err != errDummy {
if !errors.Is(err, errDummy) {
t.Errorf("Get without RV=0 should bypass cacher: %v", err)
}
}
@ -565,6 +566,7 @@ func TestWatchCacheBypass(t *testing.T) {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
proxy := NewCacheProxy(cacher, backingStorage)
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
@ -572,7 +574,7 @@ func TestWatchCacheBypass(t *testing.T) {
}
}
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
_, err = proxy.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "0",
Predicate: storage.Everything,
})
@ -580,7 +582,7 @@ func TestWatchCacheBypass(t *testing.T) {
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
}
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
_, err = proxy.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: storage.Everything,
})
@ -589,7 +591,7 @@ func TestWatchCacheBypass(t *testing.T) {
}
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false)
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
_, err = proxy.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: storage.Everything,
})
@ -600,7 +602,7 @@ func TestWatchCacheBypass(t *testing.T) {
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.injectError(errDummy)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true)
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
_, err = proxy.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: storage.Everything,
})
@ -621,6 +623,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
proxy := NewCacheProxy(cacher, backingStorage)
opts := storage.ListOptions{
ResourceVersion: "0",
@ -632,7 +635,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) {
defer listCancel()
result := &example.PodList{}
err = cacher.GetList(listCtx, "/pods/ns", opts, result)
err = proxy.GetList(listCtx, "/pods/ns", opts, result)
if err != nil && apierrors.IsTooManyRequests(err) {
t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for List")
}
@ -640,7 +643,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) {
watchCtx, watchCancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer watchCancel()
_, err = cacher.Watch(watchCtx, "/pods/ns", opts)
_, err = proxy.Watch(watchCtx, "/pods/ns", opts)
if err != nil && apierrors.IsTooManyRequests(err) {
t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for Watch")
}
@ -865,6 +868,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
proxy := NewCacheProxy(cacher, backingStorage)
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
@ -872,7 +876,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
}
}
w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
w, err := proxy.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
@ -892,13 +896,13 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
cacher.Stop()
_, err = cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
_, err = proxy.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err == nil {
t.Fatalf("Success to create Watch: %v", err)
}
result := &example.Pod{}
err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
IgnoreNotFound: true,
ResourceVersion: "1",
}, result)
@ -913,7 +917,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
}
listResult := &example.PodList{}
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{
err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "1",
Recursive: true,
Predicate: storage.SelectionPredicate{
@ -2274,11 +2278,13 @@ func BenchmarkCacher_GetList(b *testing.B) {
}
// build test cacher
cacher, _, err := newTestCacher(newObjectStorage(fakePods))
store := newObjectStorage(fakePods)
cacher, _, err := newTestCacher(store)
if err != nil {
b.Fatalf("new cacher: %v", err)
}
defer cacher.Stop()
proxy := NewCacheProxy(cacher, store)
// prepare result and pred
parsedField, err := fields.ParseSelector("spec.nodeName=node-0")
@ -2294,7 +2300,7 @@ func BenchmarkCacher_GetList(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
result := &example.PodList{}
err = cacher.GetList(context.TODO(), "pods", storage.ListOptions{
err = proxy.GetList(context.TODO(), "pods", storage.ListOptions{
Predicate: pred,
Recursive: true,
ResourceVersion: "12345",
@ -2847,11 +2853,11 @@ func TestWatchStreamSeparation(t *testing.T) {
waitForEtcdBookmark := watchAndWaitForBookmark(t, waitContext, cacher.storage)
var out example.Pod
err = cacher.Create(context.Background(), "foo", &example.Pod{}, &out, 0)
err = cacher.storage.Create(context.Background(), "foo", &example.Pod{}, &out, 0)
if err != nil {
t.Fatal(err)
}
err = cacher.Delete(context.Background(), "foo", &out, nil, storage.ValidateAllObjectFunc, &example.Pod{}, storage.DeleteOptions{})
err = cacher.storage.Delete(context.Background(), "foo", &out, nil, storage.ValidateAllObjectFunc, &example.Pod{}, storage.DeleteOptions{})
if err != nil {
t.Fatal(err)
}
@ -3132,7 +3138,7 @@ func TestListIndexer(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pred := storagetesting.CreatePodPredicate(tt.fieldSelector, true, tt.indexFields)
_, usedIndex, err := cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, pred, tt.recursive)
_, usedIndex, err := cacher.cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, pred, tt.recursive)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

217
pkg/storage/cacher/proxy.go Normal file
View File

@ -0,0 +1,217 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
)
// NewCacheProxy wraps the given Cacher and its underlying storage in a
// CacheProxy, which centralizes the per-request decision of serving from
// the watch cache versus delegating to storage (etcd).
func NewCacheProxy(cacher *Cacher, storage storage.Interface) *CacheProxy {
	return &CacheProxy{
		cacher:  cacher,
		storage: storage,
	}
}
// CacheProxy implements storage.Interface. For each request it either
// serves from the watch cache (cacher) or bypasses it and delegates to the
// underlying storage, so that all cache-bypass conditions live in one
// place instead of being scattered across Cacher methods.
type CacheProxy struct {
	// cacher serves reads/watches from its in-memory watch cache.
	cacher *Cacher
	// storage is the underlying (etcd-backed) storage all writes and
	// bypassed reads go to.
	storage storage.Interface
}

// Compile-time assertion that CacheProxy satisfies storage.Interface.
var _ storage.Interface = (*CacheProxy)(nil)
// Versioner implements storage.Interface by delegating to the underlying
// storage's versioner.
func (c *CacheProxy) Versioner() storage.Versioner {
	return c.storage.Versioner()
}
// Create implements storage.Interface. Writes always go directly to the
// underlying storage; the cache is updated asynchronously via its watch.
func (c *CacheProxy) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	return c.storage.Create(ctx, key, obj, out, ttl)
}
// Delete implements storage.Interface. The delete itself always runs
// against the underlying storage; the watch cache is only consulted to
// supply the current object state as a hint (replacing the caller's
// cachedExistingObject suggestion).
func (c *CacheProxy) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error {
	// Ignore the caller's suggestion and try to pass down the current
	// version of the object read from cache instead.
	var suggestion runtime.Object
	elem, exists, err := c.cacher.watchCache.GetByKey(key)
	switch {
	case err != nil:
		// Cache lookup failure is non-fatal: log and fall back to the
		// no-suggestion path below.
		klog.Errorf("GetByKey returned error: %v", err)
	case exists:
		// DeepCopy the object since we modify resource version when
		// serializing the current object.
		suggestion = elem.(*storeElement).Object.DeepCopyObject()
	}
	// suggestion stays nil when the object was not found in cache.
	return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, suggestion, opts)
}
// Watch implements storage.Interface. Watches are normally served from the
// cache; the one bypass is the legacy case where neither resourceVersion
// nor SendInitialEvents is set and WatchFromStorageWithoutResourceVersion
// is enabled, in which case the watch is delegated to the underlying
// storage so it starts from the latest state.
func (c *CacheProxy) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
	// if the watch-list feature wasn't set and the resourceVersion is unset
	// ensure that the rv from which the watch is being served, is the latest
	// one. "latest" is ensured by serving the watch from
	// the underlying storage.
	//
	// it should never happen due to our validation but let's just be super-safe here
	// and disable sendingInitialEvents when the feature wasn't enabled
	if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
		opts.SendInitialEvents = nil
	}
	// TODO: we should eventually get rid of this legacy case
	if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
		return c.storage.Watch(ctx, key, opts)
	}
	return c.cacher.Watch(ctx, key, opts)
}
// Get implements storage.Interface. A Get with an explicit resourceVersion
// is served from the cache (waiting for freshness if needed); an empty
// resourceVersion — or a cache that is not yet initialized — delegates to
// the underlying storage. The exact bypass rules depend on the
// ResilientWatchCacheInitialization feature gate.
func (c *CacheProxy) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
	ctx, span := tracing.Start(ctx, "cacher.Get",
		attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
		attribute.String("key", key),
		attribute.String("resource-version", opts.ResourceVersion))
	defer span.End(500 * time.Millisecond)
	if opts.ResourceVersion == "" {
		// If resourceVersion is not specified, serve it from underlying
		// storage (for backward compatibility).
		span.AddEvent("About to Get from underlying storage")
		return c.storage.Get(ctx, key, opts, objPtr)
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
		if !c.cacher.Ready() {
			// If Cache is not initialized, delegate Get requests to storage
			// as described in https://kep.k8s.io/4568
			span.AddEvent("About to Get from underlying storage - cache not initialized")
			return c.storage.Get(ctx, key, opts, objPtr)
		}
	}

	// If resourceVersion is specified, serve it from cache.
	// It's guaranteed that the returned value is at least that
	// fresh as the given resourceVersion.
	getRV, err := c.cacher.versioner.ParseResourceVersion(opts.ResourceVersion)
	if err != nil {
		return err
	}

	// Do not create a trace - it's not for free and there are tons
	// of Get requests. We can add it if it will be really needed.
	if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
		if getRV == 0 && !c.cacher.Ready() {
			// If Cacher is not yet initialized and we don't require any specific
			// minimal resource version, simply forward the request to storage.
			span.AddEvent("About to Get from underlying storage - cache not initialized and no resourceVersion set")
			return c.storage.Get(ctx, key, opts, objPtr)
		}
		// Otherwise block until the cache is ready (or ctx expires) and
		// surface a 503 if it never becomes ready.
		if err := c.cacher.ready.wait(ctx); err != nil {
			return errors.NewServiceUnavailable(err.Error())
		}
	}

	span.AddEvent("About to fetch object from cache")
	return c.cacher.Get(ctx, key, opts, objPtr)
}
// GetList implements storage.Interface. It decides between serving the
// list from the cache and delegating to the underlying storage, resolves
// the resource version the cache must be fresh to, and handles the
// consistent-read-from-cache path — including fallback to storage when the
// cache cannot catch up, and the associated ConsistentReadTotal metric.
func (c *CacheProxy) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
	// Requests matching the bypass conditions (e.g. continuation, exact RV
	// semantics the cache can't serve) go straight to storage.
	if shouldDelegateList(opts) {
		return c.storage.GetList(ctx, key, opts, listObj)
	}

	listRV, err := c.cacher.versioner.ParseResourceVersion(opts.ResourceVersion)
	if err != nil {
		return err
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
		if !c.cacher.Ready() && shouldDelegateListOnNotReadyCache(opts) {
			// If Cacher is not initialized, delegate List requests to storage
			// as described in https://kep.k8s.io/4568
			return c.storage.GetList(ctx, key, opts, listObj)
		}
	} else {
		if listRV == 0 && !c.cacher.Ready() {
			// If Cacher is not yet initialized and we don't require any specific
			// minimal resource version, simply forward the request to storage.
			return c.storage.GetList(ctx, key, opts, listObj)
		}
	}

	// Consistent reads serve RV="" from the cache by first fetching the
	// current RV from storage and then requiring the cache to catch up to
	// it; this needs etcd support for watch progress requests.
	requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
	consistentRead := opts.ResourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
	if consistentRead {
		listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.cacher.newListFunc, c.cacher.resourcePrefix, c.cacher.objectType.String())
		if err != nil {
			return err
		}
	}
	err = c.cacher.GetList(ctx, key, opts, listObj, listRV)
	success := "true"
	fallback := "false"
	if err != nil {
		if consistentRead {
			if storage.IsTooLargeResourceVersion(err) {
				// The cache could not catch up to the required RV in time;
				// fall back to a direct storage list.
				fallback = "true"
				err = c.storage.GetList(ctx, key, opts, listObj)
			}
			if err != nil {
				success = "false"
			}
			metrics.ConsistentReadTotal.WithLabelValues(c.cacher.resourcePrefix, success, fallback).Add(1)
		}
		return err
	}
	// Record a successful consistent read (no fallback).
	if consistentRead {
		metrics.ConsistentReadTotal.WithLabelValues(c.cacher.resourcePrefix, success, fallback).Add(1)
	}
	return nil
}
// GuaranteedUpdate implements storage.Interface. The update itself always
// runs against the underlying storage; the watch cache is only consulted
// to supply the current object state as a hint (replacing the caller's
// cachedExistingObject suggestion).
func (c *CacheProxy) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
	// Ignore the caller's suggestion and try to pass down the current
	// version of the object read from cache instead.
	var suggestion runtime.Object
	elem, exists, err := c.cacher.watchCache.GetByKey(key)
	switch {
	case err != nil:
		// Cache lookup failure is non-fatal: log and fall back to the
		// no-suggestion path below.
		klog.Errorf("GetByKey returned error: %v", err)
	case exists:
		// DeepCopy the object since we modify resource version when
		// serializing the current object.
		suggestion = elem.(*storeElement).Object.DeepCopyObject()
	}
	// suggestion stays nil when the object was not found in cache.
	return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, suggestion)
}
// Count implements storage.Interface by delegating directly to the
// underlying storage; the cache keeps no authoritative object count.
func (c *CacheProxy) Count(pathPrefix string) (int64, error) {
	return c.storage.Count(pathPrefix)
}
// ReadinessCheck implements storage.Interface. It reports whether the
// watch cache has completed its initial sync, returning
// storage.ErrStorageNotReady until then.
func (c *CacheProxy) ReadinessCheck() error {
	if c.cacher.Ready() {
		return nil
	}
	return storage.ErrStorageNotReady
}
// RequestWatchProgress implements storage.Interface by delegating to the
// underlying storage, which asks etcd to emit a progress notification on
// open watch channels.
func (c *CacheProxy) RequestWatchProgress(ctx context.Context) error {
	return c.storage.RequestWatchProgress(ctx)
}