Merge pull request #130899 from serathius/watchcache-error

Make the watch cache return the etcd error that caused cache reinitialization

Kubernetes-commit: e5558a81c93fef5463b02ae7c2a8c0c4b15ecc3a
This commit is contained in:
Kubernetes Publisher 2025-03-19 10:19:13 -07:00
commit a581683e3c
6 changed files with 117 additions and 89 deletions

4
go.mod
View File

@ -48,8 +48,8 @@ require (
gopkg.in/evanphx/json-patch.v4 v4.12.0
gopkg.in/go-jose/go-jose.v2 v2.6.3
gopkg.in/natefinch/lumberjack.v2 v2.2.1
k8s.io/api v0.0.0-20250319053034-feb95d943ada
k8s.io/apimachinery v0.0.0-20250319052758-7e8c77e774c9
k8s.io/api v0.0.0-20250319173043-fc83166ea9db
k8s.io/apimachinery v0.0.0-20250319092800-e8a77bd768fd
k8s.io/client-go v0.0.0-20250319053412-169f1af1bf07
k8s.io/component-base v0.0.0-20250319054524-7c899b094d78
k8s.io/klog/v2 v2.130.1

8
go.sum
View File

@ -367,10 +367,10 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.0.0-20250319053034-feb95d943ada h1:jkgp/vD+5CoL2n17AMKQ3g3ELsKmn+zBDXqwvpPvmXw=
k8s.io/api v0.0.0-20250319053034-feb95d943ada/go.mod h1:MsIjX9SIqRiiwfw1r0s0lMHaMw6jhSX8h4VjblK393I=
k8s.io/apimachinery v0.0.0-20250319052758-7e8c77e774c9 h1:vw/UFDFjwXc5W6nMCOUmIaFX19fkQ720CygFuZOS9jM=
k8s.io/apimachinery v0.0.0-20250319052758-7e8c77e774c9/go.mod h1:D2UW665TVSpInyOuG6C+PMtC1MZheP0KQz65UPQEiI4=
k8s.io/api v0.0.0-20250319173043-fc83166ea9db h1:x+vYK/B3vjbtoBztfSz0rXHiGAMmVgEeqO1HtBGkxuo=
k8s.io/api v0.0.0-20250319173043-fc83166ea9db/go.mod h1:JO0tyTI0qSXXaGVhLdqwfi3RMbS2g9hcYvzBmZP5wVk=
k8s.io/apimachinery v0.0.0-20250319092800-e8a77bd768fd h1:KoXgjwEokLM8o95kMxowg5vp5iQ4v46Kk+zobsqeTgU=
k8s.io/apimachinery v0.0.0-20250319092800-e8a77bd768fd/go.mod h1:D2UW665TVSpInyOuG6C+PMtC1MZheP0KQz65UPQEiI4=
k8s.io/client-go v0.0.0-20250319053412-169f1af1bf07 h1:UmlJkL72Xyrfs30rqXWtVUcjV15AeOggxctLIiKuNsE=
k8s.io/client-go v0.0.0-20250319053412-169f1af1bf07/go.mod h1:a4HxhGqHxxHlQQTrtis+Srk1+UsPuKeUlZtQAYq34bU=
k8s.io/component-base v0.0.0-20250319054524-7c899b094d78 h1:XT4M6c2LNlUur1y+q3jptBfkxxcQHz8gS9pRQiYZAWQ=

View File

@ -463,31 +463,19 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
}
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
// It is safe to use the cache after a successful list until a disconnection.
// We start with usable (write) locked. The below OnReplace function will
// unlock it after a successful list. The below defer will then re-lock
// it when this function exits (always due to disconnection), only if
// we actually got a successful list. This cycle will repeat as needed.
successfulList := false
c.watchCache.SetOnReplace(func() {
successfulList = true
c.ready.set(true)
c.ready.setReady()
klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String())
metrics.WatchCacheInitializations.WithLabelValues(c.groupResource.String()).Inc()
})
var err error
defer func() {
if successfulList {
c.ready.set(false)
}
c.ready.setError(err)
}()
c.terminateAllWatchers()
// Note that since onReplace may be not called due to errors, we explicitly
// need to retry it on errors under lock.
// Also note that startCaching is called in a loop, so there's no need
// to have another loop here.
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
err = c.reflector.ListAndWatch(stopChannel)
if err != nil {
klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err)
}
}
@ -506,11 +494,11 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
var readyGeneration int
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
var ok bool
var err error
var downtime time.Duration
readyGeneration, downtime, ok = c.ready.checkAndReadGeneration()
if !ok {
return nil, errors.NewTooManyRequests("storage is (re)initializing", calculateRetryAfterForUnreadyCache(downtime))
readyGeneration, downtime, err = c.ready.checkAndReadGeneration()
if err != nil {
return nil, errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime))
}
} else {
readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
@ -631,7 +619,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
c.Lock()
defer c.Unlock()
if generation, _, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
if generation, _, err := c.ready.checkAndReadGeneration(); generation != readyGeneration || err != nil {
// We went unready or are already on a different generation.
// Avoid registering and starting the watch as it will have to be
// terminated immediately anyway.
@ -749,10 +737,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
defer span.End(500 * time.Millisecond)
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if downtime, ok := c.ready.check(); !ok {
if downtime, err := c.ready.check(); err != nil {
// If Cacher is not initialized, reject List requests
// as described in https://kep.k8s.io/4568
return errors.NewTooManyRequests("storage is (re)initializing", calculateRetryAfterForUnreadyCache(downtime))
return errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime))
}
} else {
if err := c.ready.wait(ctx); err != nil {
@ -1304,8 +1292,8 @@ func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCach
}
func (c *Cacher) Ready() bool {
_, ok := c.ready.check()
return ok
_, err := c.ready.check()
return err == nil
}
// errWatcher implements watch.Interface to return a single error

View File

@ -3213,7 +3213,7 @@ func TestRetryAfterForUnreadyCache(t *testing.T) {
t.Fatalf("Unexpected error waiting for the cache to be ready")
}
cacher.ready.set(false)
cacher.ready.setError(nil)
clock.Step(14 * time.Second)
opts := storage.ListOptions{

View File

@ -41,7 +41,8 @@ const (
// | ^
// └---------------------------┘
type ready struct {
state status // represent the state of the variable
state status // represent the state of the variable
lastErr error
generation int // represent the number of times we have transitioned to ready
lock sync.RWMutex // protect the state and generation variables
restartLock sync.Mutex // protect the transition from ready to pending where the channel is recreated
@ -87,8 +88,7 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
}
r.lock.RLock()
switch r.state {
case Pending:
if r.state == Pending {
// since we allow to switch between the states Pending and Ready
// if there is a quick transition from Pending -> Ready -> Pending
// a process that was waiting can get unblocked and see a Pending
@ -96,40 +96,61 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
// avoid an inconsistent state on the system, with some processes not
// waiting despite the state moved back to Pending.
r.lock.RUnlock()
case Ready:
generation := r.generation
r.lock.RUnlock()
return generation, nil
case Stopped:
r.lock.RUnlock()
return 0, fmt.Errorf("apiserver cacher is stopped")
default:
r.lock.RUnlock()
return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state)
continue
}
generation, err := r.readGenerationLocked()
r.lock.RUnlock()
return generation, err
}
}
// check returns the time elapsed since the state was last changed and an error if the cache is not ready.
func (r *ready) check() (time.Duration, bool) {
_, elapsed, ok := r.checkAndReadGeneration()
return elapsed, ok
func (r *ready) check() (time.Duration, error) {
_, elapsed, err := r.checkAndReadGeneration()
return elapsed, err
}
// checkAndReadGeneration returns the current generation, the time elapsed since the state was last changed and an error if the cache is not ready.
func (r *ready) checkAndReadGeneration() (int, time.Duration, bool) {
func (r *ready) checkAndReadGeneration() (int, time.Duration, error) {
r.lock.RLock()
defer r.lock.RUnlock()
return r.generation, r.clock.Since(r.lastStateChangeTime), r.state == Ready
generation, err := r.readGenerationLocked()
return generation, r.clock.Since(r.lastStateChangeTime), err
}
// readGenerationLocked returns the current generation when the state is
// Ready, or an error describing why the cache is unusable otherwise.
// For a Pending cache the error wraps r.lastErr (the error that caused
// the reinitialization), when one was recorded via setError.
// Callers must hold r.lock (read or write).
func (r *ready) readGenerationLocked() (int, error) {
	switch r.state {
	case Pending:
		// Surface the cause of the (re)initialization, if known, so
		// clients can see why storage is unavailable.
		if r.lastErr != nil {
			return 0, fmt.Errorf("storage is (re)initializing: %w", r.lastErr)
		}
		return 0, fmt.Errorf("storage is (re)initializing")
	case Ready:
		return r.generation, nil
	case Stopped:
		return 0, fmt.Errorf("apiserver cacher is stopped")
	default:
		return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state)
	}
}
// setReady marks the cache as ready and clears any previously recorded error.
func (r *ready) setReady() {
	r.set(true, nil)
}
// setError moves the cache back to pending, recording err (which may be
// nil) as the cause to be surfaced to callers while unready.
func (r *ready) setError(err error) {
	r.set(false, err)
}
// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
func (r *ready) set(ok bool) {
func (r *ready) set(ok bool, err error) {
r.lock.Lock()
defer r.lock.Unlock()
if r.state == Stopped {
return
}
r.lastErr = err
if ok && r.state == Pending {
r.state = Ready
r.generation++

View File

@ -18,6 +18,7 @@ package cacher
import (
"context"
"errors"
"sync"
"testing"
"time"
@ -28,7 +29,7 @@ import (
func Test_newReady(t *testing.T) {
errCh := make(chan error, 10)
ready := newReady(testingclock.NewFakeClock(time.Now()))
ready.set(false)
ready.setError(nil)
// create 10 goroutines waiting for ready
for i := 0; i < 10; i++ {
go func() {
@ -40,7 +41,7 @@ func Test_newReady(t *testing.T) {
case <-errCh:
t.Errorf("ready should be blocking")
}
ready.set(true)
ready.setReady()
for i := 0; i < 10; i++ {
if err := <-errCh; err != nil {
t.Errorf("unexpected error on channel %d", i)
@ -51,22 +52,22 @@ func Test_newReady(t *testing.T) {
func Test_newReadySetIdempotent(t *testing.T) {
errCh := make(chan error, 10)
ready := newReady(testingclock.NewFakeClock(time.Now()))
ready.set(false)
ready.set(false)
ready.set(false)
if generation, _, ok := ready.checkAndReadGeneration(); generation != 0 || ok {
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
ready.setError(nil)
ready.setError(nil)
ready.setError(nil)
if generation, _, err := ready.checkAndReadGeneration(); generation != 0 || err == nil {
t.Errorf("unexpected state: generation=%v ready=%v", generation, err)
}
ready.set(true)
if generation, _, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
ready.setReady()
if generation, _, err := ready.checkAndReadGeneration(); generation != 1 || err != nil {
t.Errorf("unexpected state: generation=%v ready=%v", generation, err)
}
ready.set(true)
ready.set(true)
if generation, _, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
ready.setReady()
ready.setReady()
if generation, _, err := ready.checkAndReadGeneration(); generation != 1 || err != nil {
t.Errorf("unexpected state: generation=%v ready=%v", generation, err)
}
ready.set(false)
ready.setError(nil)
// create 10 goroutines waiting for ready and stop
for i := 0; i < 10; i++ {
go func() {
@ -78,9 +79,9 @@ func Test_newReadySetIdempotent(t *testing.T) {
case <-errCh:
t.Errorf("ready should be blocking")
}
ready.set(true)
if generation, _, ok := ready.checkAndReadGeneration(); generation != 2 || !ok {
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
ready.setReady()
if generation, _, err := ready.checkAndReadGeneration(); generation != 2 || err != nil {
t.Errorf("unexpected state: generation=%v ready=%v", generation, err)
}
for i := 0; i < 10; i++ {
if err := <-errCh; err != nil {
@ -95,7 +96,7 @@ func Test_newReadyRacy(t *testing.T) {
concurrency := 1000
errCh := make(chan error, concurrency)
ready := newReady(testingclock.NewFakeClock(time.Now()))
ready.set(false)
ready.setError(nil)
wg := sync.WaitGroup{}
wg.Add(2 * concurrency)
@ -105,16 +106,16 @@ func Test_newReadyRacy(t *testing.T) {
}()
go func() {
defer wg.Done()
ready.set(false)
ready.setError(nil)
}()
go func() {
defer wg.Done()
ready.set(true)
ready.setReady()
}()
}
// Last one has to be set to true.
wg.Wait()
ready.set(true)
ready.setReady()
for i := 0; i < concurrency; i++ {
if err := <-errCh; err != nil {
@ -126,7 +127,7 @@ func Test_newReadyRacy(t *testing.T) {
func Test_newReadyStop(t *testing.T) {
errCh := make(chan error, 10)
ready := newReady(testingclock.NewFakeClock(time.Now()))
ready.set(false)
ready.setError(nil)
// create 10 goroutines waiting for ready and stop
for i := 0; i < 10; i++ {
go func() {
@ -149,22 +150,22 @@ func Test_newReadyStop(t *testing.T) {
func Test_newReadyCheck(t *testing.T) {
ready := newReady(testingclock.NewFakeClock(time.Now()))
// it starts as false
if _, ok := ready.check(); ok {
t.Errorf("unexpected ready state %v", ok)
if _, err := ready.check(); err == nil {
t.Errorf("unexpected ready state %v", err)
}
ready.set(true)
if _, ok := ready.check(); !ok {
t.Errorf("unexpected ready state %v", ok)
ready.setReady()
if _, err := ready.check(); err != nil {
t.Errorf("unexpected ready state %v", err)
}
// stop sets ready to false
ready.stop()
if _, ok := ready.check(); ok {
t.Errorf("unexpected ready state %v", ok)
if _, err := ready.check(); err == nil {
t.Errorf("unexpected ready state %v", err)
}
// can not set to true if is stopped
ready.set(true)
if _, ok := ready.check(); ok {
t.Errorf("unexpected ready state %v", ok)
ready.setReady()
if _, err := ready.check(); err == nil {
t.Errorf("unexpected ready state %v", err)
}
err := ready.wait(context.Background())
if err == nil {
@ -175,7 +176,7 @@ func Test_newReadyCheck(t *testing.T) {
func Test_newReadyCancelPending(t *testing.T) {
errCh := make(chan error, 10)
ready := newReady(testingclock.NewFakeClock(time.Now()))
ready.set(false)
ready.setError(nil)
ctx, cancel := context.WithCancel(context.Background())
// create 10 goroutines stuck on pending
for i := 0; i < 10; i++ {
@ -204,19 +205,19 @@ func Test_newReadyStateChangeTimestamp(t *testing.T) {
fakeClock.Step(time.Minute)
checkReadyTransitionTime(t, ready, time.Minute)
ready.set(true)
ready.setReady()
fakeClock.Step(time.Minute)
checkReadyTransitionTime(t, ready, time.Minute)
fakeClock.Step(time.Minute)
checkReadyTransitionTime(t, ready, 2*time.Minute)
ready.set(false)
ready.setError(nil)
fakeClock.Step(time.Minute)
checkReadyTransitionTime(t, ready, time.Minute)
fakeClock.Step(time.Minute)
checkReadyTransitionTime(t, ready, 2*time.Minute)
ready.set(true)
ready.setReady()
fakeClock.Step(time.Minute)
checkReadyTransitionTime(t, ready, time.Minute)
@ -232,3 +233,21 @@ func checkReadyTransitionTime(t *testing.T, r *ready, expectedLastStateChangeDur
t.Errorf("unexpected last state change duration: %v, expected: %v", lastStateChangeDuration, expectedLastStateChangeDuration)
}
}
// TestReadyError verifies the error message reported while the cache is
// unready: the plain message when no cause is recorded, and the wrapped
// cause after setError records one.
func TestReadyError(t *testing.T) {
	ready := newReady(testingclock.NewFakeClock(time.Now()))

	expectUnreadyError := func(want string) {
		t.Helper()
		_, _, err := ready.checkAndReadGeneration()
		if err == nil || err.Error() != want {
			t.Errorf("Unexpected error when unready, got %q", err)
		}
	}

	// Freshly created: Pending with no recorded cause.
	expectUnreadyError("storage is (re)initializing")

	// A recorded cause is wrapped into the message.
	ready.setError(errors.New("etcd is down"))
	expectUnreadyError("storage is (re)initializing: etcd is down")

	// Clearing the cause restores the plain message.
	ready.setError(nil)
	expectUnreadyError("storage is (re)initializing")
}