From cea854521cfe6bcd923302d0f11272274b89e333 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Thu, 6 Apr 2023 09:54:02 +0200 Subject: [PATCH] cacher: prevent a potential deadlock waitUntilWatchCacheFreshAndForceAllEvents must be called without a read lock held otherwise the watchcache won't be able to make progress (i.e. the watchCache.processEvent method that requries acquiring an exclusive lock) the deadlock can happen only when the alpha watchlist feature flag is on and the client specifically requests streaming. Kubernetes-commit: 476e407ffd2ab393840d3f7a9fd01b71698738a3 --- pkg/storage/cacher/cacher.go | 17 +++++++--- pkg/storage/cacher/cacher_whitebox_test.go | 39 ++++++++++++++++++---- 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index 33c404633..c5f27b19c 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -592,6 +592,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions identifier, ) + // note that c.waitUntilWatchCacheFreshAndForceAllEvents must be called without + // the c.watchCache.RLock held otherwise we are at risk of a deadlock + // mainly because c.watchCache.processEvent method won't be able to make progress + // + // moreover even though the c.waitUntilWatchCacheFreshAndForceAllEvents acquires a lock + // it is safe to release the lock after the method finishes because we don't require + // any atomicity between the call to the method and further calls that actually get the events. + forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts) + if err != nil { + return newErrWatcher(err), nil + } + // We explicitly use thread unsafe version and do locking ourself to ensure that // no new events will be processed in the meantime. The watchCache will be unlocked // on return from this function. @@ -599,10 +611,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // underlying watchCache is calling processEvent under its lock. c.watchCache.RLock() defer c.watchCache.RUnlock() - forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts) - if err != nil { - return newErrWatcher(err), nil - } + startWatchRV := startWatchResourceVersionFn() var cacheInterval *watchCacheInterval if forceAllEvents { diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index 7847c1eaa..0624d5e78 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -1823,6 +1823,7 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) { } func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) if err != nil { @@ -1830,17 +1831,41 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { } defer cacher.Stop() - forceAllEvents, err := cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)}) - require.NotNil(t, err, "the target method should return non nil error") - require.Equal(t, err.Error(), "Timeout: Too large resource version: 105, current: 100") - require.False(t, forceAllEvents, "the target method after returning an error should NOT instruct the caller to ask for all events in the cache (full state)") + opts := storage.ListOptions{ + Predicate: storage.Everything, + SendInitialEvents: pointer.Bool(true), + ResourceVersion: "105", + } + opts.Predicate.AllowWatchBookmarks = true + + w, err := cacher.Watch(context.Background(), "pods/ns", opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + verifyEvents(t, w, []watch.Event{ + { + Type: watch.Error, + Object: &metav1.Status{ + Status: metav1.StatusFailure, + Message: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).Error(), + Details: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).(*apierrors.StatusError).Status().Details, + Reason: metav1.StatusReasonTimeout, + Code: 504, + }, + }, + }, true) go func() { cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"})) }() - forceAllEvents, err = cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)}) - require.NoError(t, err) - require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)") + w, err = cacher.Watch(context.Background(), "pods/ns", opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + verifyEvents(t, w, []watch.Event{ + { + Type: watch.Added, + Object: makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}), + }, + }, true) } type fakeStorage struct {