Implement consistency checking

Kubernetes-commit: e4d73c56cd055a6e3a23068bd70c424579df40fe
Marek Siarkowicz 2025-02-27 17:53:06 +01:00 committed by Kubernetes Publisher
parent 67d2550df7
commit 23e9b2c9d4
8 changed files with 435 additions and 10 deletions

View File

@@ -71,15 +71,17 @@ func StorageWithCacher() generic.StorageDecorator {
if err != nil {
return nil, func() {}, err
}
delegator := cacherstorage.NewCacheDelegator(cacher, s)
var once sync.Once
destroyFunc := func() {
once.Do(func() {
delegator.Stop()
cacher.Stop()
d()
})
}
return cacherstorage.NewCacheDelegator(cacher, s), destroyFunc, nil
return delegator, destroyFunc, nil
}
}
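The Once-guarded destroyFunc above makes teardown idempotent and fixes the stop order: the delegator (which may own a background consistency-checker goroutine) is stopped before the cacher, which is stopped before the underlying storage cleanup. A minimal standalone sketch of the pattern, with print statements standing in for the real Stop calls:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var once sync.Once
	destroyFunc := func() {
		once.Do(func() {
			fmt.Println("delegator.Stop()")   // outermost wrapper first
			fmt.Println("cacher.Stop()")      // then the cacher
			fmt.Println("underlying cleanup") // finally the storage's own destroy
		})
	}
	destroyFunc()
	destroyFunc() // no-op: sync.Once already fired
}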

View File

@@ -2459,8 +2459,10 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
}
}
d := destroyFunc
s = cacherstorage.NewCacheDelegator(cacher, s)
delegator := cacherstorage.NewCacheDelegator(cacher, s)
s = delegator
destroyFunc = func() {
delegator.Stop()
cacher.Stop()
d()
}

View File

@@ -729,7 +729,7 @@ type listResp struct {
}
// GetList implements storage.Interface
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, listRV uint64) error {
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
// For recursive lists, we need to make sure the key ends with "/" so that we only
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
// with prefix "/a" will return all three, while with prefix "/a/" will return only
@@ -738,6 +738,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
if opts.Recursive && !strings.HasSuffix(key, "/") {
preparedKey += "/"
}
listRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return err
}
ctx, span := tracing.Start(ctx, "cacher.GetList",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),

View File

@@ -484,10 +484,6 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
t.Fatalf("Failed to initialize cacher: %v", err)
}
ctx := context.Background()
terminate := func() {
cacher.Stop()
server.Terminate(t)
}
// Since some tests depend on the fact that GetList shouldn't fail,
// we wait until the error from the underlying storage is consumed.
@@ -503,8 +499,14 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
t.Fatal(err)
}
}
delegator := NewCacheDelegator(cacher, wrappedStorage)
terminate := func() {
delegator.Stop()
cacher.Stop()
server.Terminate(t)
}
return ctx, NewCacheDelegator(cacher, wrappedStorage), server, terminate
return ctx, delegator, server, terminate
}
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {

View File

@@ -207,6 +207,15 @@ func (d *dummyStorage) GetCurrentResourceVersion(ctx context.Context) (uint64, e
return 100, nil
}
type dummyCacher struct {
dummyStorage
ready bool
}
func (d *dummyCacher) Ready() bool {
return d.ready
}
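dummyCacher embeds dummyStorage and adds a controllable Ready flag; this is what lets the consistency-checker tests in the new delegator test file satisfy the getListerReady interface without constructing a real Cacher.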
func TestGetListCacheBypass(t *testing.T) {
type opts struct {
ResourceVersion string
@@ -326,6 +335,7 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
result := &example.PodList{}
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
@@ -450,6 +460,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
@@ -533,6 +544,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
pred := storage.SelectionPredicate{
Limit: 500,
@@ -572,6 +584,7 @@ func TestGetCacheBypass(t *testing.T) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
result := &example.Pod{}
@@ -608,6 +621,7 @@ func TestWatchCacheBypass(t *testing.T) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
@@ -645,6 +659,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
opts := storage.ListOptions{
ResourceVersion: "0",
@@ -890,6 +905,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
t.Fatalf("Couldn't create cacher: %v", err)
}
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
@@ -2326,6 +2342,7 @@ func BenchmarkCacher_GetList(b *testing.B) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, store)
defer delegator.Stop()
// prepare result and pred
parsedField, err := fields.ParseSelector("spec.nodeName=node-0")
@@ -3207,6 +3224,7 @@ func TestRetryAfterForUnreadyCache(t *testing.T) {
}
result := &example.PodList{}
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
err = delegator.GetList(context.TODO(), "/pods/ns", opts, result)
if !apierrors.IsTooManyRequests(err) {

View File

@@ -18,12 +18,20 @@ package cacher
import (
"context"
"fmt"
"hash"
"hash/fnv"
"os"
"strconv"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/features"
@@ -35,16 +43,45 @@ import (
"k8s.io/klog/v2"
)
var (
// ConsistencyCheckPeriod is the interval between consistency checks of the cache against etcd.
// Five minutes was proposed to match the default compaction period; it's an order of magnitude
// higher than the List latency SLO (30 seconds) and timeout (1 minute).
ConsistencyCheckPeriod = 5 * time.Minute
// ConsistencyCheckerEnabled enables the consistency checking mechanism for the cache.
// Based on the KUBE_WATCHCACHE_CONSISTENCY_CHECKER environment variable.
ConsistencyCheckerEnabled = false
)
func init() {
ConsistencyCheckerEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHCACHE_CONSISTENCY_CHECKER"))
}
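Since the toggle is parsed once at package init, the environment variable must be set before the apiserver process starts. A small sketch of the parse semantics (setting the variable in-process here purely for illustration):

package main

import (
	"fmt"
	"os"
	"strconv"
)

func main() {
	// strconv.ParseBool accepts "1", "t", "T", "true", "TRUE", "True" (and the
	// corresponding false spellings); any other value, or an unset variable,
	// yields an error and leaves the checker disabled.
	os.Setenv("KUBE_WATCHCACHE_CONSISTENCY_CHECKER", "true")
	enabled, _ := strconv.ParseBool(os.Getenv("KUBE_WATCHCACHE_CONSISTENCY_CHECKER"))
	fmt.Println(enabled) // true
}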
func NewCacheDelegator(cacher *Cacher, storage storage.Interface) *CacheDelegator {
return &CacheDelegator{
d := &CacheDelegator{
cacher: cacher,
storage: storage,
stopCh: make(chan struct{}),
}
if ConsistencyCheckerEnabled {
d.checker = newConsistencyChecker(cacher.resourcePrefix, cacher.newListFunc, cacher, storage)
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.checker.startChecking(d.stopCh)
}()
}
return d
}
type CacheDelegator struct {
cacher *Cacher
storage storage.Interface
checker *consistencyChecker
wg sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
}
var _ storage.Interface = (*CacheDelegator)(nil)
@@ -168,14 +205,18 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
if err != nil {
return err
}
// Set the resource version for a consistent read from cache, based on the current ResourceVersion in etcd.
opts.ResourceVersion = strconv.FormatInt(int64(listRV), 10)
}
err = c.cacher.GetList(ctx, key, opts, listObj, listRV)
err = c.cacher.GetList(ctx, key, opts, listObj)
success := "true"
fallback := "false"
if err != nil {
if consistentRead {
if storage.IsTooLargeResourceVersion(err) {
fallback = "true"
// Reset resourceVersion during fallback from consistent read.
opts.ResourceVersion = ""
err = c.storage.GetList(ctx, key, opts, listObj)
}
if err != nil {
@@ -258,3 +299,156 @@ func (c *CacheDelegator) ReadinessCheck() error {
func (c *CacheDelegator) RequestWatchProgress(ctx context.Context) error {
return c.storage.RequestWatchProgress(ctx)
}
func (c *CacheDelegator) Stop() {
c.stopOnce.Do(func() {
close(c.stopCh)
})
c.wg.Wait()
}
func newConsistencyChecker(resourcePrefix string, newListFunc func() runtime.Object, cacher getListerReady, etcd getLister) *consistencyChecker {
return &consistencyChecker{
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
cacher: cacher,
etcd: etcd,
}
}
type consistencyChecker struct {
resourcePrefix string
newListFunc func() runtime.Object
cacher getListerReady
etcd getLister
}
type getListerReady interface {
getLister
Ready() bool
}
type getLister interface {
GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error
}
func (c consistencyChecker) startChecking(stopCh <-chan struct{}) {
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), ConsistencyCheckPeriod, false, func(ctx context.Context) (done bool, err error) {
c.check(ctx)
return false, nil
})
if err != nil {
klog.InfoS("Cache consistency check exiting", "resource", c.resourcePrefix, "err", err)
}
}
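Because the condition always returns false, the poll never reports done: check runs every ConsistencyCheckPeriod until stopCh closes and the context derived from it is cancelled. A standalone sketch of the same polling pattern, shrunk to a demonstration interval:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	time.AfterFunc(350*time.Millisecond, func() { close(stopCh) })
	err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 100*time.Millisecond, false,
		func(ctx context.Context) (bool, error) {
			fmt.Println("check") // stand-in for c.check(ctx)
			return false, nil    // never done; keep polling until cancelled
		})
	fmt.Println(err) // the cancellation error that startChecking logs on exit
}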
func (c *consistencyChecker) check(ctx context.Context) {
digests, err := c.calculateDigests(ctx)
if err != nil {
klog.ErrorS(err, "Cache consistency check error", "resource", c.resourcePrefix)
metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "error").Inc()
return
}
if digests.CacheDigest == digests.EtcdDigest {
klog.V(3).InfoS("Cache consistency check passed", "resource", c.resourcePrefix, "resourceVersion", digests.ResourceVersion, "digest", digests.CacheDigest)
metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "success").Inc()
} else {
klog.ErrorS(nil, "Cache consistency check failed", "resource", c.resourcePrefix, "resourceVersion", digests.ResourceVersion, "etcdDigest", digests.EtcdDigest, "cacheDigest", digests.CacheDigest)
metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "failure").Inc()
}
}
func (c *consistencyChecker) calculateDigests(ctx context.Context) (*storageDigest, error) {
if !c.cacher.Ready() {
return nil, fmt.Errorf("cache is not ready")
}
cacheDigest, resourceVersion, err := c.calculateStoreDigest(ctx, c.cacher, storage.ListOptions{
ResourceVersion: "0",
Predicate: storage.Everything,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
})
if err != nil {
return nil, fmt.Errorf("failed calculating cache digest: %w", err)
}
etcdDigest, _, err := c.calculateStoreDigest(ctx, c.etcd, storage.ListOptions{
ResourceVersion: resourceVersion,
Predicate: storage.Everything,
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
})
if err != nil {
return nil, fmt.Errorf("failed calculating etcd digest: %w", err)
}
return &storageDigest{
ResourceVersion: resourceVersion,
CacheDigest: cacheDigest,
EtcdDigest: etcdDigest,
}, nil
}
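Note the resource-version pinning: the cache is listed first with ResourceVersion "0" (NotOlderThan), and the resource version that list reports is then replayed against etcd with ResourceVersionMatchExact, so both digests describe the same logical snapshot.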
type storageDigest struct {
ResourceVersion string
CacheDigest string
EtcdDigest string
}
func (c *consistencyChecker) calculateStoreDigest(ctx context.Context, store getLister, opts storage.ListOptions) (digest, rv string, err error) {
// TODO: Implement pagination
resp := c.newListFunc()
err = store.GetList(ctx, c.resourcePrefix, opts, resp)
if err != nil {
return "", "", err
}
digest, err = listDigest(resp)
if err != nil {
return "", "", err
}
list, err := meta.ListAccessor(resp)
if err != nil {
return "", "", err
}
return digest, list.GetResourceVersion(), nil
}
func listDigest(list runtime.Object) (string, error) {
h := fnv.New64()
err := meta.EachListItem(list, func(obj runtime.Object) error {
objectMeta, err := meta.Accessor(obj)
if err != nil {
return err
}
err = addObjectToDigest(h, objectMeta)
if err != nil {
return err
}
return nil
})
if err != nil {
return "", err
}
return fmt.Sprintf("%x", h.Sum64()), nil
}
func addObjectToDigest(h hash.Hash64, objectMeta metav1.Object) error {
_, err := h.Write([]byte(objectMeta.GetNamespace()))
if err != nil {
return err
}
_, err = h.Write([]byte("/"))
if err != nil {
return err
}
_, err = h.Write([]byte(objectMeta.GetName()))
if err != nil {
return err
}
_, err = h.Write([]byte("/"))
if err != nil {
return err
}
_, err = h.Write([]byte(objectMeta.GetResourceVersion()))
if err != nil {
return err
}
return nil
}
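The digest is therefore an FNV-1 64-bit hash over "<namespace>/<name>/<resourceVersion>" for each item, in list order. A standalone sketch that reproduces two of the expected values from the test table below (the separate Writes in addObjectToDigest are equivalent to hashing the concatenated string):

package main

import (
	"fmt"
	"hash/fnv"
)

func main() {
	h := fnv.New64()
	// The FNV-1 64-bit offset basis is the digest of an empty list.
	fmt.Printf("%x\n", h.Sum64()) // cbf29ce484222325, the "empty" test case

	// One pod in namespace "default" with name "pod" at resourceVersion "2".
	h.Write([]byte("default/pod/2"))
	fmt.Printf("%x\n", h.Sum64()) // 86bf3a5e80d1c5cb, the "with one element equal" test case
}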

View File

@@ -0,0 +1,194 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"context"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/apis/example"
"k8s.io/apiserver/pkg/storage"
)
func TestCalculateDigest(t *testing.T) {
newListFunc := func() runtime.Object { return &example.PodList{} }
testCases := []struct {
desc string
resourceVersion string
cacherReady bool
cacherItems []example.Pod
etcdItems []example.Pod
resourcePrefix string
expectListKey string
expectDigest storageDigest
expectErr bool
}{
{
desc: "not ready",
cacherReady: false,
resourceVersion: "1",
expectErr: true,
},
{
desc: "empty",
resourceVersion: "1",
cacherReady: true,
expectDigest: storageDigest{
ResourceVersion: "1",
CacheDigest: "cbf29ce484222325",
EtcdDigest: "cbf29ce484222325",
},
},
{
desc: "with one element equal",
resourceVersion: "2",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "2"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "2"}},
},
expectDigest: storageDigest{
ResourceVersion: "2",
CacheDigest: "86bf3a5e80d1c5cb",
EtcdDigest: "86bf3a5e80d1c5cb",
},
},
{
desc: "namespace changes digest",
resourceVersion: "2",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "kube-system", Name: "pod", ResourceVersion: "2"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "kube-public", Name: "pod", ResourceVersion: "2"}},
},
expectDigest: storageDigest{
ResourceVersion: "2",
CacheDigest: "4ae4e750bd825b17",
EtcdDigest: "f940a60af965b03",
},
},
{
desc: "name changes digest",
resourceVersion: "2",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod2", ResourceVersion: "2"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod3", ResourceVersion: "2"}},
},
expectDigest: storageDigest{
ResourceVersion: "2",
CacheDigest: "c9120494e4c1897d",
EtcdDigest: "c9156494e4c46274",
},
},
{
desc: "resourceVersion changes digest",
resourceVersion: "2",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "3"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "4"}},
},
expectDigest: storageDigest{
ResourceVersion: "2",
CacheDigest: "86bf3a5e80d1c5ca",
EtcdDigest: "86bf3a5e80d1c5cd",
},
},
{
desc: "watch missed write event",
resourceVersion: "3",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "2"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "2"}},
{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "3"}},
},
expectDigest: storageDigest{
ResourceVersion: "3",
CacheDigest: "1859bac707c2cb2b",
EtcdDigest: "11d147fc800df0e0",
},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
etcd := &dummyStorage{
getListFn: func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
if key != tc.expectListKey {
t.Fatalf("Expect GetList key %q, got %q", tc.expectListKey, key)
}
if opts.ResourceVersion != tc.resourceVersion {
t.Fatalf("Expect GetList resourceVersion %q, got %q", tc.resourceVersion, opts.ResourceVersion)
}
if opts.ResourceVersionMatch != metav1.ResourceVersionMatchExact {
t.Fatalf("Expect GetList match exact, got %q", opts.ResourceVersionMatch)
}
podList := listObj.(*example.PodList)
podList.Items = tc.etcdItems
podList.ResourceVersion = tc.resourceVersion
return nil
},
}
cacher := &dummyCacher{
ready: tc.cacherReady,
dummyStorage: dummyStorage{
getListFn: func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
if key != tc.expectListKey {
t.Fatalf("Expect GetList key %q, got %q", tc.expectListKey, key)
}
if opts.ResourceVersion != "0" {
t.Fatalf("Expect GetList resourceVersion 0, got %q", opts.ResourceVersion)
}
if opts.ResourceVersionMatch != metav1.ResourceVersionMatchNotOlderThan {
t.Fatalf("Expect GetList match not older than, got %q", opts.ResourceVersionMatch)
}
podList := listObj.(*example.PodList)
podList.Items = tc.cacherItems
podList.ResourceVersion = tc.resourceVersion
return nil
},
},
}
checker := newConsistencyChecker(tc.resourcePrefix, newListFunc, cacher, etcd)
digest, err := checker.calculateDigests(context.Background())
if (err != nil) != tc.expectErr {
t.Fatalf("Expect error: %v, got: %v", tc.expectErr, err)
}
if err != nil {
return
}
if *digest != tc.expectDigest {
t.Errorf("Expect: %+v Got: %+v", tc.expectDigest, *digest)
}
})
}
}

View File

@@ -176,6 +176,14 @@ var (
Help: "Counter for consistent reads from cache.",
StabilityLevel: compbasemetrics.ALPHA,
}, []string{"resource", "success", "fallback"})
StorageConsistencyCheckTotal = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Name: "storage_consistency_checks_total",
Help: "Counter for status of consistency checks between etcd and watch cache",
StabilityLevel: compbasemetrics.INTERNAL,
}, []string{"resource", "status"})
)
var registerMetrics sync.Once
@@ -198,6 +206,7 @@ func Register() {
legacyregistry.MustRegister(WatchCacheInitializations)
legacyregistry.MustRegister(WatchCacheReadWait)
legacyregistry.MustRegister(ConsistentReadTotal)
legacyregistry.MustRegister(StorageConsistencyCheckTotal)
})
}
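Assuming the package's metric namespace is "apiserver" (consistent with the apiserver_watch_cache_consistent_read_total series quoted in the tests above), the new counter should surface as apiserver_storage_consistency_checks_total, with the status label taking the values "success", "failure", or "error" that check records.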