diff --git a/go.mod b/go.mod index 0bc0dd591..cc1e5a721 100644 --- a/go.mod +++ b/go.mod @@ -48,8 +48,8 @@ require ( gopkg.in/evanphx/json-patch.v4 v4.12.0 gopkg.in/go-jose/go-jose.v2 v2.6.3 gopkg.in/natefinch/lumberjack.v2 v2.2.1 - k8s.io/api v0.0.0-20250319053034-feb95d943ada - k8s.io/apimachinery v0.0.0-20250319052758-7e8c77e774c9 + k8s.io/api v0.0.0-20250319173043-fc83166ea9db + k8s.io/apimachinery v0.0.0-20250319092800-e8a77bd768fd k8s.io/client-go v0.0.0-20250319053412-169f1af1bf07 k8s.io/component-base v0.0.0-20250319054524-7c899b094d78 k8s.io/klog/v2 v2.130.1 diff --git a/go.sum b/go.sum index 62f81bd82..2caf65691 100644 --- a/go.sum +++ b/go.sum @@ -367,10 +367,10 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -k8s.io/api v0.0.0-20250319053034-feb95d943ada h1:jkgp/vD+5CoL2n17AMKQ3g3ELsKmn+zBDXqwvpPvmXw= -k8s.io/api v0.0.0-20250319053034-feb95d943ada/go.mod h1:MsIjX9SIqRiiwfw1r0s0lMHaMw6jhSX8h4VjblK393I= -k8s.io/apimachinery v0.0.0-20250319052758-7e8c77e774c9 h1:vw/UFDFjwXc5W6nMCOUmIaFX19fkQ720CygFuZOS9jM= -k8s.io/apimachinery v0.0.0-20250319052758-7e8c77e774c9/go.mod h1:D2UW665TVSpInyOuG6C+PMtC1MZheP0KQz65UPQEiI4= +k8s.io/api v0.0.0-20250319173043-fc83166ea9db h1:x+vYK/B3vjbtoBztfSz0rXHiGAMmVgEeqO1HtBGkxuo= +k8s.io/api v0.0.0-20250319173043-fc83166ea9db/go.mod h1:JO0tyTI0qSXXaGVhLdqwfi3RMbS2g9hcYvzBmZP5wVk= +k8s.io/apimachinery v0.0.0-20250319092800-e8a77bd768fd h1:KoXgjwEokLM8o95kMxowg5vp5iQ4v46Kk+zobsqeTgU= +k8s.io/apimachinery v0.0.0-20250319092800-e8a77bd768fd/go.mod h1:D2UW665TVSpInyOuG6C+PMtC1MZheP0KQz65UPQEiI4= k8s.io/client-go v0.0.0-20250319053412-169f1af1bf07 h1:UmlJkL72Xyrfs30rqXWtVUcjV15AeOggxctLIiKuNsE= k8s.io/client-go v0.0.0-20250319053412-169f1af1bf07/go.mod h1:a4HxhGqHxxHlQQTrtis+Srk1+UsPuKeUlZtQAYq34bU= k8s.io/component-base v0.0.0-20250319054524-7c899b094d78 h1:XT4M6c2LNlUur1y+q3jptBfkxxcQHz8gS9pRQiYZAWQ= diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index ec05c9962..430f4c01b 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -463,31 +463,19 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { } func (c *Cacher) startCaching(stopChannel <-chan struct{}) { - // The 'usable' lock is always 'RLock'able when it is safe to use the cache. - // It is safe to use the cache after a successful list until a disconnection. - // We start with usable (write) locked. The below OnReplace function will - // unlock it after a successful list. The below defer will then re-lock - // it when this function exits (always due to disconnection), only if - // we actually got a successful list. This cycle will repeat as needed. - successfulList := false c.watchCache.SetOnReplace(func() { - successfulList = true - c.ready.set(true) + c.ready.setReady() klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String()) metrics.WatchCacheInitializations.WithLabelValues(c.groupResource.String()).Inc() }) + var err error defer func() { - if successfulList { - c.ready.set(false) - } + c.ready.setError(err) }() c.terminateAllWatchers() - // Note that since onReplace may be not called due to errors, we explicitly - // need to retry it on errors under lock. - // Also note that startCaching is called in a loop, so there's no need - // to have another loop here. - if err := c.reflector.ListAndWatch(stopChannel); err != nil { + err = c.reflector.ListAndWatch(stopChannel) + if err != nil { klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err) } } @@ -506,11 +494,11 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions var readyGeneration int if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { - var ok bool + var err error var downtime time.Duration - readyGeneration, downtime, ok = c.ready.checkAndReadGeneration() - if !ok { - return nil, errors.NewTooManyRequests("storage is (re)initializing", calculateRetryAfterForUnreadyCache(downtime)) + readyGeneration, downtime, err = c.ready.checkAndReadGeneration() + if err != nil { + return nil, errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime)) } } else { readyGeneration, err = c.ready.waitAndReadGeneration(ctx) @@ -631,7 +619,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions c.Lock() defer c.Unlock() - if generation, _, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok { + if generation, _, err := c.ready.checkAndReadGeneration(); generation != readyGeneration || err != nil { // We went unready or are already on a different generation. // Avoid registering and starting the watch as it will have to be // terminated immediately anyway. @@ -749,10 +737,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio defer span.End(500 * time.Millisecond) if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { - if downtime, ok := c.ready.check(); !ok { + if downtime, err := c.ready.check(); err != nil { // If Cacher is not initialized, reject List requests // as described in https://kep.k8s.io/4568 - return errors.NewTooManyRequests("storage is (re)initializing", calculateRetryAfterForUnreadyCache(downtime)) + return errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime)) } } else { if err := c.ready.wait(ctx); err != nil { @@ -1304,8 +1292,8 @@ func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCach } func (c *Cacher) Ready() bool { - _, ok := c.ready.check() - return ok + _, err := c.ready.check() + return err == nil } // errWatcher implements watch.Interface to return a single error diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index 66f399cec..1d58dce8b 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -3213,7 +3213,7 @@ func TestRetryAfterForUnreadyCache(t *testing.T) { t.Fatalf("Unexpected error waiting for the cache to be ready") } - cacher.ready.set(false) + cacher.ready.setError(nil) clock.Step(14 * time.Second) opts := storage.ListOptions{ diff --git a/pkg/storage/cacher/ready.go b/pkg/storage/cacher/ready.go index 0ba05738d..68ff509f0 100644 --- a/pkg/storage/cacher/ready.go +++ b/pkg/storage/cacher/ready.go @@ -41,7 +41,8 @@ const ( // | ^ // └---------------------------┘ type ready struct { - state status // represent the state of the variable + state status // represent the state of the variable + lastErr error generation int // represent the number of times we have transtioned to ready lock sync.RWMutex // protect the state and generation variables restartLock sync.Mutex // protect the transition from ready to pending where the channel is recreated @@ -87,8 +88,7 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) { } r.lock.RLock() - switch r.state { - case Pending: + if r.state == Pending { // since we allow to switch between the states Pending and Ready // if there is a quick transition from Pending -> Ready -> Pending // a process that was waiting can get unblocked and see a Pending @@ -96,40 +96,61 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) { // avoid an inconsistent state on the system, with some processes not // waiting despite the state moved back to Pending. r.lock.RUnlock() - case Ready: - generation := r.generation - r.lock.RUnlock() - return generation, nil - case Stopped: - r.lock.RUnlock() - return 0, fmt.Errorf("apiserver cacher is stopped") - default: - r.lock.RUnlock() - return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state) + continue } + generation, err := r.readGenerationLocked() + r.lock.RUnlock() + return generation, err } } // check returns the time elapsed since the state was last changed and the current value. -func (r *ready) check() (time.Duration, bool) { - _, elapsed, ok := r.checkAndReadGeneration() - return elapsed, ok +func (r *ready) check() (time.Duration, error) { + _, elapsed, err := r.checkAndReadGeneration() + return elapsed, err } // checkAndReadGeneration returns the current generation, the time elapsed since the state was last changed and the current value. -func (r *ready) checkAndReadGeneration() (int, time.Duration, bool) { +func (r *ready) checkAndReadGeneration() (int, time.Duration, error) { r.lock.RLock() defer r.lock.RUnlock() - return r.generation, r.clock.Since(r.lastStateChangeTime), r.state == Ready + generation, err := r.readGenerationLocked() + return generation, r.clock.Since(r.lastStateChangeTime), err +} + +func (r *ready) readGenerationLocked() (int, error) { + switch r.state { + case Pending: + if r.lastErr == nil { + return 0, fmt.Errorf("storage is (re)initializing") + } else { + return 0, fmt.Errorf("storage is (re)initializing: %w", r.lastErr) + } + case Ready: + return r.generation, nil + case Stopped: + return 0, fmt.Errorf("apiserver cacher is stopped") + default: + return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state) + } +} + +func (r *ready) setReady() { + r.set(true, nil) +} + +func (r *ready) setError(err error) { + r.set(false, err) } // set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped. -func (r *ready) set(ok bool) { +func (r *ready) set(ok bool, err error) { r.lock.Lock() defer r.lock.Unlock() if r.state == Stopped { return } + r.lastErr = err if ok && r.state == Pending { r.state = Ready r.generation++ diff --git a/pkg/storage/cacher/ready_test.go b/pkg/storage/cacher/ready_test.go index 3246327cd..53537fc82 100644 --- a/pkg/storage/cacher/ready_test.go +++ b/pkg/storage/cacher/ready_test.go @@ -18,6 +18,7 @@ package cacher import ( "context" + "errors" "sync" "testing" "time" @@ -28,7 +29,7 @@ import ( func Test_newReady(t *testing.T) { errCh := make(chan error, 10) ready := newReady(testingclock.NewFakeClock(time.Now())) - ready.set(false) + ready.setError(nil) // create 10 goroutines waiting for ready for i := 0; i < 10; i++ { go func() { @@ -40,7 +41,7 @@ func Test_newReady(t *testing.T) { case <-errCh: t.Errorf("ready should be blocking") } - ready.set(true) + ready.setReady() for i := 0; i < 10; i++ { if err := <-errCh; err != nil { t.Errorf("unexpected error on channel %d", i) @@ -51,22 +52,22 @@ func Test_newReady(t *testing.T) { func Test_newReadySetIdempotent(t *testing.T) { errCh := make(chan error, 10) ready := newReady(testingclock.NewFakeClock(time.Now())) - ready.set(false) - ready.set(false) - ready.set(false) - if generation, _, ok := ready.checkAndReadGeneration(); generation != 0 || ok { - t.Errorf("unexpected state: generation=%v ready=%v", generation, ok) + ready.setError(nil) + ready.setError(nil) + ready.setError(nil) + if generation, _, err := ready.checkAndReadGeneration(); generation != 0 || err == nil { + t.Errorf("unexpected state: generation=%v ready=%v", generation, err) } - ready.set(true) - if generation, _, ok := ready.checkAndReadGeneration(); generation != 1 || !ok { - t.Errorf("unexpected state: generation=%v ready=%v", generation, ok) + ready.setReady() + if generation, _, err := ready.checkAndReadGeneration(); generation != 1 || err != nil { + t.Errorf("unexpected state: generation=%v ready=%v", generation, err) } - ready.set(true) - ready.set(true) - if generation, _, ok := ready.checkAndReadGeneration(); generation != 1 || !ok { - t.Errorf("unexpected state: generation=%v ready=%v", generation, ok) + ready.setReady() + ready.setReady() + if generation, _, err := ready.checkAndReadGeneration(); generation != 1 || err != nil { + t.Errorf("unexpected state: generation=%v ready=%v", generation, err) } - ready.set(false) + ready.setError(nil) // create 10 goroutines waiting for ready and stop for i := 0; i < 10; i++ { go func() { @@ -78,9 +79,9 @@ func Test_newReadySetIdempotent(t *testing.T) { case <-errCh: t.Errorf("ready should be blocking") } - ready.set(true) - if generation, _, ok := ready.checkAndReadGeneration(); generation != 2 || !ok { - t.Errorf("unexpected state: generation=%v ready=%v", generation, ok) + ready.setReady() + if generation, _, err := ready.checkAndReadGeneration(); generation != 2 || err != nil { + t.Errorf("unexpected state: generation=%v ready=%v", generation, err) } for i := 0; i < 10; i++ { if err := <-errCh; err != nil { @@ -95,7 +96,7 @@ func Test_newReadyRacy(t *testing.T) { concurrency := 1000 errCh := make(chan error, concurrency) ready := newReady(testingclock.NewFakeClock(time.Now())) - ready.set(false) + ready.setError(nil) wg := sync.WaitGroup{} wg.Add(2 * concurrency) @@ -105,16 +106,16 @@ func Test_newReadyRacy(t *testing.T) { }() go func() { defer wg.Done() - ready.set(false) + ready.setError(nil) }() go func() { defer wg.Done() - ready.set(true) + ready.setReady() }() } // Last one has to be set to true. wg.Wait() - ready.set(true) + ready.setReady() for i := 0; i < concurrency; i++ { if err := <-errCh; err != nil { @@ -126,7 +127,7 @@ func Test_newReadyRacy(t *testing.T) { func Test_newReadyStop(t *testing.T) { errCh := make(chan error, 10) ready := newReady(testingclock.NewFakeClock(time.Now())) - ready.set(false) + ready.setError(nil) // create 10 goroutines waiting for ready and stop for i := 0; i < 10; i++ { go func() { @@ -149,22 +150,22 @@ func Test_newReadyStop(t *testing.T) { func Test_newReadyCheck(t *testing.T) { ready := newReady(testingclock.NewFakeClock(time.Now())) // it starts as false - if _, ok := ready.check(); ok { - t.Errorf("unexpected ready state %v", ok) + if _, err := ready.check(); err == nil { + t.Errorf("unexpected ready state %v", err) } - ready.set(true) - if _, ok := ready.check(); !ok { - t.Errorf("unexpected ready state %v", ok) + ready.setReady() + if _, err := ready.check(); err != nil { + t.Errorf("unexpected ready state %v", err) } // stop sets ready to false ready.stop() - if _, ok := ready.check(); ok { - t.Errorf("unexpected ready state %v", ok) + if _, err := ready.check(); err == nil { + t.Errorf("unexpected ready state %v", err) } // can not set to true if is stopped - ready.set(true) - if _, ok := ready.check(); ok { - t.Errorf("unexpected ready state %v", ok) + ready.setReady() + if _, err := ready.check(); err == nil { + t.Errorf("unexpected ready state %v", err) } err := ready.wait(context.Background()) if err == nil { @@ -175,7 +176,7 @@ func Test_newReadyCheck(t *testing.T) { func Test_newReadyCancelPending(t *testing.T) { errCh := make(chan error, 10) ready := newReady(testingclock.NewFakeClock(time.Now())) - ready.set(false) + ready.setError(nil) ctx, cancel := context.WithCancel(context.Background()) // create 10 goroutines stuck on pending for i := 0; i < 10; i++ { @@ -204,19 +205,19 @@ func Test_newReadyStateChangeTimestamp(t *testing.T) { fakeClock.Step(time.Minute) checkReadyTransitionTime(t, ready, time.Minute) - ready.set(true) + ready.setReady() fakeClock.Step(time.Minute) checkReadyTransitionTime(t, ready, time.Minute) fakeClock.Step(time.Minute) checkReadyTransitionTime(t, ready, 2*time.Minute) - ready.set(false) + ready.setError(nil) fakeClock.Step(time.Minute) checkReadyTransitionTime(t, ready, time.Minute) fakeClock.Step(time.Minute) checkReadyTransitionTime(t, ready, 2*time.Minute) - ready.set(true) + ready.setReady() fakeClock.Step(time.Minute) checkReadyTransitionTime(t, ready, time.Minute) @@ -232,3 +233,21 @@ func checkReadyTransitionTime(t *testing.T, r *ready, expectedLastStateChangeDur t.Errorf("unexpected last state change duration: %v, expected: %v", lastStateChangeDuration, expectedLastStateChangeDuration) } } + +func TestReadyError(t *testing.T) { + ready := newReady(testingclock.NewFakeClock(time.Now())) + _, _, err := ready.checkAndReadGeneration() + if err == nil || err.Error() != "storage is (re)initializing" { + t.Errorf("Unexpected error when unready, got %q", err) + } + ready.setError(errors.New("etcd is down")) + _, _, err = ready.checkAndReadGeneration() + if err == nil || err.Error() != "storage is (re)initializing: etcd is down" { + t.Errorf("Unexpected error when unready, got %q", err) + } + ready.setError(nil) + _, _, err = ready.checkAndReadGeneration() + if err == nil || err.Error() != "storage is (re)initializing" { + t.Errorf("Unexpected error when unready, got %q", err) + } +}