cacher: prevent a potential deadlock

waitUntilWatchCacheFreshAndForceAllEvents must be called without
a read lock held, otherwise the watchcache won't be able to make
progress (i.e. the watchCache.processEvent method, which requires acquiring an exclusive lock, would block)

The deadlock can only happen when the alpha WatchList feature flag is on
and the client specifically requests streaming.

Kubernetes-commit: 476e407ffd2ab393840d3f7a9fd01b71698738a3
This commit is contained in:
Lukasz Szaszkiewicz 2023-04-06 09:54:02 +02:00 committed by Kubernetes Publisher
parent 2bd0999ce9
commit cea854521c
2 changed files with 45 additions and 11 deletions

View File

@ -592,6 +592,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
identifier, identifier,
) )
// note that c.waitUntilWatchCacheFreshAndForceAllEvents must be called without
// the c.watchCache.RLock held otherwise we are at risk of a deadlock
// mainly because c.watchCache.processEvent method won't be able to make progress
//
// moreover even though the c.waitUntilWatchCacheFreshAndForceAllEvents acquires a lock
// it is safe to release the lock after the method finishes because we don't require
// any atomicity between the call to the method and further calls that actually get the events.
forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
if err != nil {
return newErrWatcher(err), nil
}
// We explicitly use thread unsafe version and do locking ourself to ensure that // We explicitly use thread unsafe version and do locking ourself to ensure that
// no new events will be processed in the meantime. The watchCache will be unlocked // no new events will be processed in the meantime. The watchCache will be unlocked
// on return from this function. // on return from this function.
@ -599,10 +611,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// underlying watchCache is calling processEvent under its lock. // underlying watchCache is calling processEvent under its lock.
c.watchCache.RLock() c.watchCache.RLock()
defer c.watchCache.RUnlock() defer c.watchCache.RUnlock()
forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
if err != nil {
return newErrWatcher(err), nil
}
startWatchRV := startWatchResourceVersionFn() startWatchRV := startWatchResourceVersionFn()
var cacheInterval *watchCacheInterval var cacheInterval *watchCacheInterval
if forceAllEvents { if forceAllEvents {

View File

@ -1823,6 +1823,7 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
} }
func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
backingStorage := &dummyStorage{} backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage) cacher, _, err := newTestCacher(backingStorage)
if err != nil { if err != nil {
@ -1830,17 +1831,41 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
} }
defer cacher.Stop() defer cacher.Stop()
forceAllEvents, err := cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)}) opts := storage.ListOptions{
require.NotNil(t, err, "the target method should return non nil error") Predicate: storage.Everything,
require.Equal(t, err.Error(), "Timeout: Too large resource version: 105, current: 100") SendInitialEvents: pointer.Bool(true),
require.False(t, forceAllEvents, "the target method after returning an error should NOT instruct the caller to ask for all events in the cache (full state)") ResourceVersion: "105",
}
opts.Predicate.AllowWatchBookmarks = true
w, err := cacher.Watch(context.Background(), "pods/ns", opts)
require.NoError(t, err, "failed to create watch: %v")
defer w.Stop()
verifyEvents(t, w, []watch.Event{
{
Type: watch.Error,
Object: &metav1.Status{
Status: metav1.StatusFailure,
Message: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).Error(),
Details: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).(*apierrors.StatusError).Status().Details,
Reason: metav1.StatusReasonTimeout,
Code: 504,
},
},
}, true)
go func() { go func() {
cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"})) cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}))
}() }()
forceAllEvents, err = cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)}) w, err = cacher.Watch(context.Background(), "pods/ns", opts)
require.NoError(t, err) require.NoError(t, err, "failed to create watch: %v")
require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)") defer w.Stop()
verifyEvents(t, w, []watch.Event{
{
Type: watch.Added,
Object: makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}),
},
}, true)
} }
type fakeStorage struct { type fakeStorage struct {