From e4baab2feb21604c048e11cc30821baa55e735b6 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Thu, 23 Feb 2023 23:13:01 +0000 Subject: [PATCH] cacher allow context cancellation if not ready Replace the sync.Cond variable with a channel so we can use the context cancellation signal. Co-authored-by: Wojciech Tyczyński Change-Id: I2f75313a6337feee440ece4c1e873c32a12560dd Kubernetes-commit: d4559ad44833022b9946b179b55a9dfbbd3db78c --- pkg/storage/cacher/cacher.go | 8 +- pkg/storage/cacher/cacher_whitebox_test.go | 61 ++++++++++---- pkg/storage/cacher/ready.go | 87 ++++++++++++++----- pkg/storage/cacher/ready_test.go | 97 +++++++++++++++++++++- 4 files changed, 207 insertions(+), 46 deletions(-) diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index a5fe72317..ac3f0196b 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -462,7 +462,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions return nil, err } - if err := c.ready.wait(); err != nil { + if err := c.ready.wait(ctx); err != nil { return nil, errors.NewServiceUnavailable(err.Error()) } @@ -561,7 +561,7 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o // Do not create a trace - it's not for free and there are tons // of Get requests. We can add it if it will be really needed. 
- if err := c.ready.wait(); err != nil { + if err := c.ready.wait(ctx); err != nil { return errors.NewServiceUnavailable(err.Error()) } @@ -649,7 +649,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio trace := utiltrace.New("cacher list", utiltrace.Field{Key: "type", Value: c.objectType.String()}) defer trace.LogIfLong(500 * time.Millisecond) - if err := c.ready.wait(); err != nil { + if err := c.ready.wait(ctx); err != nil { return errors.NewServiceUnavailable(err.Error()) } trace.Step("Ready") @@ -1048,7 +1048,7 @@ func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWit // LastSyncResourceVersion returns resource version to which the underlying cache is synced. func (c *Cacher) LastSyncResourceVersion() (uint64, error) { - if err := c.ready.wait(); err != nil { + if err := c.ready.wait(context.Background()); err != nil { return 0, errors.NewServiceUnavailable(err.Error()) } diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index 5f0ba257a..885703294 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -29,6 +29,7 @@ import ( v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -337,7 +338,7 @@ func TestGetListCacheBypass(t *testing.T) { result := &example.PodList{} // Wait until cacher is initialized. - if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } @@ -376,7 +377,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) { result := &example.PodList{} // Wait until cacher is initialized. 
- if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } @@ -410,7 +411,7 @@ func TestGetCacheBypass(t *testing.T) { result := &example.Pod{} // Wait until cacher is initialized. - if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } @@ -433,6 +434,34 @@ func TestGetCacheBypass(t *testing.T) { } } +func TestWatchNotHangingOnStartupFailure(t *testing.T) { + // Configure cacher so that it can't initialize, because of + // constantly failing lists to the underlying storage. + dummyErr := fmt.Errorf("dummy") + backingStorage := &dummyStorage{err: dummyErr} + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + ctx, cancel := context.WithCancel(context.Background()) + // Cancel the watch after some time to check if it will properly + // terminate instead of hanging forever. + go func() { + defer cancel() + cacher.clock.Sleep(5 * time.Second) + }() + + // Watch hangs waiting on watchcache being initialized. + // Ensure that it terminates when its context is cancelled + // (e.g. the request is terminated for whatever reason). + _, err = cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0"}) + if err == nil || err.Error() != apierrors.NewServiceUnavailable(context.Canceled.Error()).Error() { + t.Errorf("Unexpected error: %#v", err) + } +} + func TestWatcherNotGoingBackInTime(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) @@ -442,7 +471,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. 
- if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } @@ -569,7 +598,7 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } @@ -609,7 +638,7 @@ func TestCacheDontAcceptRequestsStopped(t *testing.T) { } // Wait until cacher is initialized. - if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } @@ -714,7 +743,7 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } pred := storage.Everything @@ -812,7 +841,7 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo defer cacher.Stop() // Wait until cacher is initialized. - if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } pred := storage.Everything @@ -912,7 +941,7 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) { cacher.bookmarkWatchers.bookmarkFrequency = time.Second // Wait until cacher is initialized. 
- if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } pred := storage.Everything @@ -982,7 +1011,7 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } @@ -1060,7 +1089,7 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) { cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second) // Wait until cacher is initialized. - if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } @@ -1138,7 +1167,7 @@ func TestStartingResourceVersion(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } @@ -1218,7 +1247,7 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } @@ -1329,7 +1358,7 @@ func TestCachingDeleteEvents(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } @@ -1411,7 +1440,7 @@ func testCachingObjects(t *testing.T, watchersCount int) { defer cacher.Stop() // Wait until cacher is initialized. 
- if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } @@ -1507,7 +1536,7 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - if err := cacher.ready.wait(); err != nil { + if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } // Ensure there is enough budget for slow processing since diff --git a/pkg/storage/cacher/ready.go b/pkg/storage/cacher/ready.go index 8278dd2b2..f3821a881 100644 --- a/pkg/storage/cacher/ready.go +++ b/pkg/storage/cacher/ready.go @@ -17,6 +17,7 @@ limitations under the License. package cacher import ( + "context" "fmt" "sync" ) @@ -30,27 +31,53 @@ const ( ) // ready is a three state condition variable that blocks until is Ready if is not Stopped. -// Its initial state is Pending. +// Its initial state is Pending and its state machine diagram is as follow. +// +// Pending <------> Ready -----> Stopped +// +// | ^ +// └---------------------------┘ type ready struct { - state status - c *sync.Cond + state status // represent the state of the variable + lock sync.RWMutex // protect the state variable + restartLock sync.Mutex // protect the transition from ready to pending where the channel is recreated + waitCh chan struct{} // blocks until is ready or stopped } func newReady() *ready { return &ready{ - c: sync.NewCond(&sync.RWMutex{}), - state: Pending, + waitCh: make(chan struct{}), + state: Pending, } } +// done close the channel once the state is Ready or Stopped +func (r *ready) done() chan struct{} { + r.restartLock.Lock() + defer r.restartLock.Unlock() + return r.waitCh +} + // wait blocks until it is Ready or Stopped, it returns an error if is Stopped. 
-func (r *ready) wait() error { - r.c.L.Lock() - defer r.c.L.Unlock() - for r.state == Pending { - r.c.Wait() +func (r *ready) wait(ctx context.Context) error { + // r.done() only blocks if state is Pending + select { + case <-ctx.Done(): + return ctx.Err() + case <-r.done(): } + + r.lock.RLock() + defer r.lock.RUnlock() switch r.state { + case Pending: + // since we allow to switch between the states Pending and Ready + // if there is a quick transition from Pending -> Ready -> Pending + // a process that was waiting can get unblocked and see a Pending state. + // If the state is Pending don't return an error because it can only happen + // here after the r.done() channel is closed because the state moved from + // Pending to Ready. + return nil case Ready: return nil case Stopped: @@ -62,35 +89,49 @@ func (r *ready) wait() error { // check returns true only if it is Ready. func (r *ready) check() bool { - // TODO: Make check() function more sophisticated, in particular - // allow it to behave as "waitWithTimeout". - rwMutex := r.c.L.(*sync.RWMutex) - rwMutex.RLock() - defer rwMutex.RUnlock() + r.lock.RLock() + defer r.lock.RUnlock() return r.state == Ready } // set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped. func (r *ready) set(ok bool) { - r.c.L.Lock() - defer r.c.L.Unlock() + r.lock.Lock() + defer r.lock.Unlock() if r.state == Stopped { return } - if ok { + if ok && r.state == Pending { r.state = Ready - } else { + select { + case <-r.waitCh: + default: + close(r.waitCh) + } + } else if !ok && r.state == Ready { + // creating the waitCh can be racy if + // something enter the wait() method + select { + case <-r.waitCh: + r.restartLock.Lock() + r.waitCh = make(chan struct{}) + r.restartLock.Unlock() + default: + } r.state = Pending } - r.c.Broadcast() } // stop the condition variable and set it as Stopped. This state is irreversible. 
func (r *ready) stop() { - r.c.L.Lock() - defer r.c.L.Unlock() + r.lock.Lock() + defer r.lock.Unlock() if r.state != Stopped { r.state = Stopped - r.c.Broadcast() + } + select { + case <-r.waitCh: + default: + close(r.waitCh) } } diff --git a/pkg/storage/cacher/ready_test.go b/pkg/storage/cacher/ready_test.go index c14a7068a..18556630d 100644 --- a/pkg/storage/cacher/ready_test.go +++ b/pkg/storage/cacher/ready_test.go @@ -17,7 +17,9 @@ limitations under the License. package cacher import ( + "context" "testing" + "time" ) func Test_newReady(t *testing.T) { @@ -27,9 +29,14 @@ func Test_newReady(t *testing.T) { // create 10 goroutines waiting for ready for i := 0; i < 10; i++ { go func() { - errCh <- ready.wait() + errCh <- ready.wait(context.Background()) }() } + select { + case <-time.After(1 * time.Second): + case <-errCh: + t.Errorf("ready should be blocking") + } ready.set(true) for i := 0; i < 10; i++ { if err := <-errCh; err != nil { @@ -38,6 +45,61 @@ func Test_newReady(t *testing.T) { } } +func Test_newReadySetIdempotent(t *testing.T) { + errCh := make(chan error, 10) + ready := newReady() + ready.set(false) + ready.set(false) + ready.set(false) + ready.set(true) + ready.set(true) + ready.set(true) + ready.set(false) + // create 10 goroutines waiting for ready and stop + for i := 0; i < 10; i++ { + go func() { + errCh <- ready.wait(context.Background()) + }() + } + select { + case <-time.After(1 * time.Second): + case <-errCh: + t.Errorf("ready should be blocking") + } + ready.set(true) + for i := 0; i < 10; i++ { + if err := <-errCh; err != nil { + t.Errorf("unexpected error on channel %d", i) + } + } +} + +// Test_newReadyRacy executes all the possible transitions randomly. +// It must run with the race detector enabled. 
+func Test_newReadyRacy(t *testing.T) { + concurrency := 1000 + errCh := make(chan error, concurrency) + ready := newReady() + ready.set(false) + for i := 0; i < concurrency; i++ { + go func() { + errCh <- ready.wait(context.Background()) + }() + go func() { + ready.set(false) + }() + go func() { + ready.set(true) + }() + } + ready.set(true) + for i := 0; i < concurrency; i++ { + if err := <-errCh; err != nil { + t.Errorf("unexpected error %v on channel %d", err, i) + } + } +} + func Test_newReadyStop(t *testing.T) { errCh := make(chan error, 10) ready := newReady() @@ -45,9 +107,14 @@ func Test_newReadyStop(t *testing.T) { // create 10 goroutines waiting for ready and stop for i := 0; i < 10; i++ { go func() { - errCh <- ready.wait() + errCh <- ready.wait(context.Background()) }() } + select { + case <-time.After(1 * time.Second): + case <-errCh: + t.Errorf("ready should be blocking") + } ready.stop() for i := 0; i < 10; i++ { if err := <-errCh; err == nil { @@ -76,8 +143,32 @@ func Test_newReadyCheck(t *testing.T) { if ready.check() { t.Errorf("unexpected ready state %v", ready.check()) } - err := ready.wait() + err := ready.wait(context.Background()) if err == nil { t.Errorf("expected error waiting on a stopped state") } } + +func Test_newReadyCancelPending(t *testing.T) { + errCh := make(chan error, 10) + ready := newReady() + ready.set(false) + ctx, cancel := context.WithCancel(context.Background()) + // create 10 goroutines stuck on pending + for i := 0; i < 10; i++ { + go func() { + errCh <- ready.wait(ctx) + }() + } + select { + case <-time.After(1 * time.Second): + case <-errCh: + t.Errorf("ready should be blocking") + } + cancel() + for i := 0; i < 10; i++ { + if err := <-errCh; err == nil { + t.Errorf("unexpected success on channel %d", i) + } + } +}