Merge pull request #130475 from serathius/watchcache-consistency

Implement consistency checking

Kubernetes-commit: 4c311c9fcf6a67c665127d67fb30cd602ba5b88d
Committed by Kubernetes Publisher on 2025-03-11 05:09:46 -07:00 (commit b2bc62b37f).
10 changed files with 438 additions and 13 deletions
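
At a high level, the change below teaches the cache delegator to run an optional background consistency checker: on a fixed period it lists the same resource from the watch cache and from etcd at a common resourceVersion, folds each list into an FNV-64 digest of every object's namespace/name/resourceVersion, and records whether the two digests agree in a new metric. A distilled, self-contained sketch of that comparison (the item type and the two slices are stand-ins, not the real storage interfaces, which appear in the diff below):

package main

import (
	"fmt"
	"hash/fnv"
)

// item carries the only identity the checker hashes per object.
type item struct{ namespace, name, resourceVersion string }

// digest folds a list into one FNV-64 fingerprint, hashing namespace, name
// and resourceVersion separated by "/", the same fields the checker uses.
func digest(items []item) string {
	h := fnv.New64()
	for _, it := range items {
		h.Write([]byte(it.namespace))
		h.Write([]byte("/"))
		h.Write([]byte(it.name))
		h.Write([]byte("/"))
		h.Write([]byte(it.resourceVersion))
	}
	return fmt.Sprintf("%x", h.Sum64())
}

func main() {
	fromCache := []item{{"default", "pod", "2"}}
	fromEtcd := []item{{"default", "pod", "2"}}
	if digest(fromCache) == digest(fromEtcd) {
		fmt.Println("consistent, digest:", digest(fromCache))
	} else {
		fmt.Println("inconsistent:", digest(fromCache), "!=", digest(fromEtcd))
	}
}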

go.mod (2 lines changed)

@ -49,7 +49,7 @@ require (
gopkg.in/go-jose/go-jose.v2 v2.6.3
gopkg.in/natefinch/lumberjack.v2 v2.2.1
k8s.io/api v0.0.0-20250309133059-a634b5da20f8
-k8s.io/apimachinery v0.0.0-20250311012723-56015c7e0a4c
+k8s.io/apimachinery v0.0.0-20250311092730-6e3d6ca42453
k8s.io/client-go v0.0.0-20250309133502-a50f4a61f1fe
k8s.io/component-base v0.0.0-20250309134709-8856f5121e0f
k8s.io/klog/v2 v2.130.1

go.sum (4 lines changed)

@ -369,8 +369,8 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.0.0-20250309133059-a634b5da20f8 h1:jiMSGVqkId+uu7ZytgmVIKmi0JpSxEnvLpEWREXANF0=
k8s.io/api v0.0.0-20250309133059-a634b5da20f8/go.mod h1:PInlXJ1egb20DkfXa0bXGEDngGOImO/YqhnFd5AI3GA=
-k8s.io/apimachinery v0.0.0-20250311012723-56015c7e0a4c h1:he1Z3jPl/VLtSc0TPJX0nVrZ15/fd8Ri87Nh4Q4jqjw=
-k8s.io/apimachinery v0.0.0-20250311012723-56015c7e0a4c/go.mod h1:S2OIkExGqJOXYSYcAJwQ9zWcc6BkBUdTJUu4M7z0cvo=
+k8s.io/apimachinery v0.0.0-20250311092730-6e3d6ca42453 h1:OHdFWi18BE714KkhhYcPoYVPHfF86ACb1d4zveVAHZs=
+k8s.io/apimachinery v0.0.0-20250311092730-6e3d6ca42453/go.mod h1:S2OIkExGqJOXYSYcAJwQ9zWcc6BkBUdTJUu4M7z0cvo=
k8s.io/client-go v0.0.0-20250309133502-a50f4a61f1fe h1:z8Vy9OfF439ZSdA2VAkvJuwwmQRMViVFBUCMmwOTjYY=
k8s.io/client-go v0.0.0-20250309133502-a50f4a61f1fe/go.mod h1:upRE5ReMBvvnk8vF0rotPqiFmZibsjm6DQleUoG0L8M=
k8s.io/component-base v0.0.0-20250309134709-8856f5121e0f h1:p5iwVOxDbA+veZGM1SompAZtmmEl4hfSGtoVA1ZvwC4=


@ -71,15 +71,17 @@ func StorageWithCacher() generic.StorageDecorator {
if err != nil {
return nil, func() {}, err
}
delegator := cacherstorage.NewCacheDelegator(cacher, s)
var once sync.Once
destroyFunc := func() {
once.Do(func() {
delegator.Stop()
cacher.Stop()
d()
})
}
-return cacherstorage.NewCacheDelegator(cacher, s), destroyFunc, nil
+return delegator, destroyFunc, nil
}
}
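
The decorator now has three things to tear down — the delegator, the cacher, and the underlying storage's destroy function — and the sync.Once guard keeps the sequence safe to call more than once. A minimal standalone sketch of that pattern, with hypothetical component names:

package main

import (
	"fmt"
	"sync"
)

type component struct{ name string }

func (c *component) Stop() { fmt.Println("stopped:", c.name) }

func main() {
	delegator := &component{name: "delegator"}
	cacher := &component{name: "cacher"}
	destroyUnderlying := func() { fmt.Println("destroyed: underlying storage") }

	var once sync.Once
	destroyFunc := func() {
		once.Do(func() {
			delegator.Stop() // outermost wrapper first
			cacher.Stop()    // then the cache it delegates to
			destroyUnderlying()
		})
	}

	destroyFunc()
	destroyFunc() // no-op: sync.Once has already fired
}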


@ -2459,8 +2459,10 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
}
}
d := destroyFunc
-s = cacherstorage.NewCacheDelegator(cacher, s)
+delegator := cacherstorage.NewCacheDelegator(cacher, s)
+s = delegator
destroyFunc = func() {
delegator.Stop()
cacher.Stop()
d()
}


@ -729,7 +729,7 @@ type listResp struct {
}
// GetList implements storage.Interface
-func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, listRV uint64) error {
+func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
// For recursive lists, we need to make sure the key ended with "/" so that we only
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
// with prefix "/a" will return all three, while with prefix "/a/" will return only
@ -738,6 +738,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
if opts.Recursive && !strings.HasSuffix(key, "/") {
preparedKey += "/"
}
listRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return err
}
ctx, span := tracing.Start(ctx, "cacher.GetList",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
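
With the signature change, GetList parses opts.ResourceVersion itself instead of receiving a pre-parsed listRV. For readers unfamiliar with the versioner, a rough sketch of the parsing semantics it is assumed to follow (an empty string means "any version" and maps to 0; anything else must be an unsigned decimal) — this mirrors, but is not, the real ParseResourceVersion:

package main

import (
	"fmt"
	"strconv"
)

// parseResourceVersion sketches the assumed semantics: "" maps to 0
// ("any version"); otherwise the string must be an unsigned decimal.
func parseResourceVersion(rv string) (uint64, error) {
	if rv == "" {
		return 0, nil
	}
	return strconv.ParseUint(rv, 10, 64)
}

func main() {
	for _, rv := range []string{"", "0", "102486", "not-a-number"} {
		v, err := parseResourceVersion(rv)
		fmt.Printf("%q -> %d (err: %v)\n", rv, v, err)
	}
}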


@ -484,10 +484,6 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
t.Fatalf("Failed to initialize cacher: %v", err)
}
ctx := context.Background()
-terminate := func() {
-cacher.Stop()
-server.Terminate(t)
-}
// Since some tests depend on the fact that GetList shouldn't fail,
// we wait until the error from the underlying storage is consumed.
@ -503,8 +499,14 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
t.Fatal(err)
}
}
+delegator := NewCacheDelegator(cacher, wrappedStorage)
+terminate := func() {
+delegator.Stop()
+cacher.Stop()
+server.Terminate(t)
+}
-return ctx, NewCacheDelegator(cacher, wrappedStorage), server, terminate
+return ctx, delegator, server, terminate
}
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {


@ -207,6 +207,15 @@ func (d *dummyStorage) GetCurrentResourceVersion(ctx context.Context) (uint64, e
return 100, nil
}
type dummyCacher struct {
dummyStorage
ready bool
}
func (d *dummyCacher) Ready() bool {
return d.ready
}
func TestGetListCacheBypass(t *testing.T) {
type opts struct {
ResourceVersion string
@ -326,6 +335,7 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
result := &example.PodList{}
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
@ -450,6 +460,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
@ -533,6 +544,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
pred := storage.SelectionPredicate{
Limit: 500,
@ -572,6 +584,7 @@ func TestGetCacheBypass(t *testing.T) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
result := &example.Pod{}
@ -608,6 +621,7 @@ func TestWatchCacheBypass(t *testing.T) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
@ -645,6 +659,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
opts := storage.ListOptions{
ResourceVersion: "0",
@ -890,6 +905,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
t.Fatalf("Couldn't create cacher: %v", err)
}
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
@ -2326,6 +2342,7 @@ func BenchmarkCacher_GetList(b *testing.B) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, store)
defer delegator.Stop()
// prepare result and pred
parsedField, err := fields.ParseSelector("spec.nodeName=node-0")
@ -3207,6 +3224,7 @@ func TestRetryAfterForUnreadyCache(t *testing.T) {
}
result := &example.PodList{}
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
err = delegator.GetList(context.TODO(), "/pods/ns", opts, result)
if !apierrors.IsTooManyRequests(err) {


@ -18,12 +18,20 @@ package cacher
import (
"context"
"fmt"
"hash"
"hash/fnv"
"os"
"strconv"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/features"
@ -35,16 +43,45 @@ import (
"k8s.io/klog/v2"
)
var (
// ConsistencyCheckPeriod is the period of checking consistency between etcd and cache.
// 5 minutes was proposed to match the default compaction period. It is an order of magnitude
// higher than the list latency SLO (30 seconds) and the list timeout (1 minute).
ConsistencyCheckPeriod = 5 * time.Minute
// ConsistencyCheckerEnabled enables the consistency checking mechanism for cache.
// Based on the KUBE_WATCHCACHE_CONSISTANCY_CHECKER environment variable.
ConsistencyCheckerEnabled = false
)
func init() {
ConsistencyCheckerEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHCACHE_CONSISTANCY_CHECKER"))
}
func NewCacheDelegator(cacher *Cacher, storage storage.Interface) *CacheDelegator {
-return &CacheDelegator{
+d := &CacheDelegator{
cacher: cacher,
storage: storage,
stopCh: make(chan struct{}),
}
if ConsistencyCheckerEnabled {
d.checker = newConsistencyChecker(cacher.resourcePrefix, cacher.newListFunc, cacher, storage)
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.checker.startChecking(d.stopCh)
}()
}
return d
}
type CacheDelegator struct {
cacher *Cacher
storage storage.Interface
checker *consistencyChecker
wg sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
}
var _ storage.Interface = (*CacheDelegator)(nil)
@ -168,14 +205,18 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
if err != nil {
return err
}
// Setting resource version for consistent read in cache based on current ResourceVersion in etcd.
opts.ResourceVersion = strconv.FormatInt(int64(listRV), 10)
}
-err = c.cacher.GetList(ctx, key, opts, listObj, listRV)
+err = c.cacher.GetList(ctx, key, opts, listObj)
success := "true"
fallback := "false"
if err != nil {
if consistentRead {
if storage.IsTooLargeResourceVersion(err) {
fallback = "true"
// Reset resourceVersion during fallback from consistent read.
opts.ResourceVersion = ""
err = c.storage.GetList(ctx, key, opts, listObj)
}
if err != nil {
@ -258,3 +299,156 @@ func (c *CacheDelegator) ReadinessCheck() error {
func (c *CacheDelegator) RequestWatchProgress(ctx context.Context) error {
return c.storage.RequestWatchProgress(ctx)
}
func (c *CacheDelegator) Stop() {
c.stopOnce.Do(func() {
close(c.stopCh)
})
c.wg.Wait()
}
func newConsistencyChecker(resourcePrefix string, newListFunc func() runtime.Object, cacher getListerReady, etcd getLister) *consistencyChecker {
return &consistencyChecker{
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
cacher: cacher,
etcd: etcd,
}
}
type consistencyChecker struct {
resourcePrefix string
newListFunc func() runtime.Object
cacher getListerReady
etcd getLister
}
type getListerReady interface {
getLister
Ready() bool
}
type getLister interface {
GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error
}
func (c consistencyChecker) startChecking(stopCh <-chan struct{}) {
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), ConsistencyCheckPeriod, false, func(ctx context.Context) (done bool, err error) {
c.check(ctx)
return false, nil
})
if err != nil {
klog.InfoS("Cache consistency check exiting", "resource", c.resourcePrefix, "err", err)
}
}
func (c *consistencyChecker) check(ctx context.Context) {
digests, err := c.calculateDigests(ctx)
if err != nil {
klog.ErrorS(err, "Cache consistency check error", "resource", c.resourcePrefix)
metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "error").Inc()
return
}
if digests.CacheDigest == digests.EtcdDigest {
klog.V(3).InfoS("Cache consistency check passed", "resource", c.resourcePrefix, "resourceVersion", digests.ResourceVersion, "digest", digests.CacheDigest)
metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "success").Inc()
} else {
klog.ErrorS(nil, "Cache consistency check failed", "resource", c.resourcePrefix, "resourceVersion", digests.ResourceVersion, "etcdDigest", digests.EtcdDigest, "cacheDigest", digests.CacheDigest)
metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "failure").Inc()
}
}
func (c *consistencyChecker) calculateDigests(ctx context.Context) (*storageDigest, error) {
if !c.cacher.Ready() {
return nil, fmt.Errorf("cache is not ready")
}
cacheDigest, resourceVersion, err := c.calculateStoreDigest(ctx, c.cacher, storage.ListOptions{
ResourceVersion: "0",
Predicate: storage.Everything,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
})
if err != nil {
return nil, fmt.Errorf("failed calculating cache digest: %w", err)
}
etcdDigest, _, err := c.calculateStoreDigest(ctx, c.etcd, storage.ListOptions{
ResourceVersion: resourceVersion,
Predicate: storage.Everything,
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
})
if err != nil {
return nil, fmt.Errorf("failed calculating etcd digest: %w", err)
}
return &storageDigest{
ResourceVersion: resourceVersion,
CacheDigest: cacheDigest,
EtcdDigest: etcdDigest,
}, nil
}
type storageDigest struct {
ResourceVersion string
CacheDigest string
EtcdDigest string
}
func (c *consistencyChecker) calculateStoreDigest(ctx context.Context, store getLister, opts storage.ListOptions) (digest, rv string, err error) {
// TODO: Implement pagination
resp := c.newListFunc()
err = store.GetList(ctx, c.resourcePrefix, opts, resp)
if err != nil {
return "", "", err
}
digest, err = listDigest(resp)
if err != nil {
return "", "", err
}
list, err := meta.ListAccessor(resp)
if err != nil {
return "", "", err
}
return digest, list.GetResourceVersion(), nil
}
func listDigest(list runtime.Object) (string, error) {
h := fnv.New64()
err := meta.EachListItem(list, func(obj runtime.Object) error {
objectMeta, err := meta.Accessor(obj)
if err != nil {
return err
}
err = addObjectToDigest(h, objectMeta)
if err != nil {
return err
}
return nil
})
if err != nil {
return "", err
}
return fmt.Sprintf("%x", h.Sum64()), nil
}
func addObjectToDigest(h hash.Hash64, objectMeta metav1.Object) error {
_, err := h.Write([]byte(objectMeta.GetNamespace()))
if err != nil {
return err
}
_, err = h.Write([]byte("/"))
if err != nil {
return err
}
_, err = h.Write([]byte(objectMeta.GetName()))
if err != nil {
return err
}
_, err = h.Write([]byte("/"))
if err != nil {
return err
}
_, err = h.Write([]byte(objectMeta.GetResourceVersion()))
if err != nil {
return err
}
return nil
}
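
The checker is off by default; init() above turns it on only when the KUBE_WATCHCACHE_CONSISTANCY_CHECKER environment variable parses as true. Because both knobs are exported package variables, a test or a one-off experiment could also set them in code before constructing the delegator; a sketch under that assumption:

package main

import (
	"time"

	cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
)

func main() {
	// Normally driven by the KUBE_WATCHCACHE_CONSISTANCY_CHECKER environment
	// variable, parsed in this package's init(); setting the exported
	// variables directly has the same effect for this process.
	cacherstorage.ConsistencyCheckerEnabled = true
	cacherstorage.ConsistencyCheckPeriod = time.Minute // shortened for the experiment

	// ... then build the Cacher and wrap it with NewCacheDelegator as usual;
	// the delegator starts the background checker and Stop() shuts it down.
}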


@ -0,0 +1,194 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"context"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/apis/example"
"k8s.io/apiserver/pkg/storage"
)
func TestCalculateDigest(t *testing.T) {
newListFunc := func() runtime.Object { return &example.PodList{} }
testCases := []struct {
desc string
resourceVersion string
cacherReady bool
cacherItems []example.Pod
etcdItems []example.Pod
resourcePrefix string
expectListKey string
expectDigest storageDigest
expectErr bool
}{
{
desc: "not ready",
cacherReady: false,
resourceVersion: "1",
expectErr: true,
},
{
desc: "empty",
resourceVersion: "1",
cacherReady: true,
expectDigest: storageDigest{
ResourceVersion: "1",
CacheDigest: "cbf29ce484222325",
EtcdDigest: "cbf29ce484222325",
},
},
{
desc: "with one element equal",
resourceVersion: "2",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "2"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "2"}},
},
expectDigest: storageDigest{
ResourceVersion: "2",
CacheDigest: "86bf3a5e80d1c5cb",
EtcdDigest: "86bf3a5e80d1c5cb",
},
},
{
desc: "namespace changes digest",
resourceVersion: "2",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "kube-system", Name: "pod", ResourceVersion: "2"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "kube-public", Name: "pod", ResourceVersion: "2"}},
},
expectDigest: storageDigest{
ResourceVersion: "2",
CacheDigest: "4ae4e750bd825b17",
EtcdDigest: "f940a60af965b03",
},
},
{
desc: "name changes digest",
resourceVersion: "2",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod2", ResourceVersion: "2"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod3", ResourceVersion: "2"}},
},
expectDigest: storageDigest{
ResourceVersion: "2",
CacheDigest: "c9120494e4c1897d",
EtcdDigest: "c9156494e4c46274",
},
},
{
desc: "resourceVersion changes digest",
resourceVersion: "2",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "3"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "4"}},
},
expectDigest: storageDigest{
ResourceVersion: "2",
CacheDigest: "86bf3a5e80d1c5ca",
EtcdDigest: "86bf3a5e80d1c5cd",
},
},
{
desc: "watch missed write event",
resourceVersion: "3",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "2"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "2"}},
{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "3"}},
},
expectDigest: storageDigest{
ResourceVersion: "3",
CacheDigest: "1859bac707c2cb2b",
EtcdDigest: "11d147fc800df0e0",
},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
etcd := &dummyStorage{
getListFn: func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
if key != tc.expectListKey {
t.Fatalf("Expect GetList key %q, got %q", tc.expectListKey, key)
}
if opts.ResourceVersion != tc.resourceVersion {
t.Fatalf("Expect GetList resourceVersion %q, got %q", tc.resourceVersion, opts.ResourceVersion)
}
if opts.ResourceVersionMatch != metav1.ResourceVersionMatchExact {
t.Fatalf("Expect GetList match exact, got %q", opts.ResourceVersionMatch)
}
podList := listObj.(*example.PodList)
podList.Items = tc.etcdItems
podList.ResourceVersion = tc.resourceVersion
return nil
},
}
cacher := &dummyCacher{
ready: tc.cacherReady,
dummyStorage: dummyStorage{
getListFn: func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
if key != tc.expectListKey {
t.Fatalf("Expect GetList key %q, got %q", tc.expectListKey, key)
}
if opts.ResourceVersion != "0" {
t.Fatalf("Expect GetList resourceVersion 0, got %q", opts.ResourceVersion)
}
if opts.ResourceVersionMatch != metav1.ResourceVersionMatchNotOlderThan {
t.Fatalf("Expect GetList match not older than, got %q", opts.ResourceVersionMatch)
}
podList := listObj.(*example.PodList)
podList.Items = tc.cacherItems
podList.ResourceVersion = tc.resourceVersion
return nil
},
},
}
checker := newConsistencyChecker(tc.resourcePrefix, newListFunc, cacher, etcd)
digest, err := checker.calculateDigests(context.Background())
if (err != nil) != tc.expectErr {
t.Fatalf("Expect error: %v, got: %v", tc.expectErr, err)
}
if err != nil {
return
}
if *digest != tc.expectDigest {
t.Errorf("Expect: %+v Got: %+v", &tc.expectDigest, *digest)
}
})
}
}
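
The "cbf29ce484222325" expected by the "empty" case above is simply the 64-bit FNV offset basis: with no items, nothing is written to the hash, so Sum64 returns its initial state. A one-liner to confirm:

package main

import (
	"fmt"
	"hash/fnv"
)

func main() {
	// Prints cbf29ce484222325: the 64-bit FNV offset basis, and therefore
	// the digest of an empty list.
	fmt.Printf("%x\n", fnv.New64().Sum64())
}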


@ -176,6 +176,14 @@ var (
Help: "Counter for consistent reads from cache.",
StabilityLevel: compbasemetrics.ALPHA,
}, []string{"resource", "success", "fallback"})
StorageConsistencyCheckTotal = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Name: "storage_consistency_checks_total",
Help: "Counter for status of consistency checks between etcd and watch cache",
StabilityLevel: compbasemetrics.INTERNAL,
}, []string{"resource", "status"})
)
var registerMetrics sync.Once
@ -198,6 +206,7 @@ func Register() {
legacyregistry.MustRegister(WatchCacheInitializations)
legacyregistry.MustRegister(WatchCacheReadWait)
legacyregistry.MustRegister(ConsistentReadTotal)
legacyregistry.MustRegister(StorageConsistencyCheckTotal)
})
}
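
Assuming the namespace constant resolves to "apiserver" as it does for the neighboring watch-cache metrics, the new counter should surface on /metrics as series of the form below, with status taking the values recorded in check() — success, failure, error. The sample values are illustrative only:

apiserver_storage_consistency_checks_total{resource="pods", status="success"} 12
apiserver_storage_consistency_checks_total{resource="pods", status="failure"} 0
apiserver_storage_consistency_checks_total{resource="pods", status="error"} 1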