diff --git a/go.mod b/go.mod index c7e5d8593..a881563e0 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( google.golang.org/protobuf v1.28.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/square/go-jose.v2 v2.6.0 - k8s.io/api v0.0.0-20230412035552-2bdacdf27ceb + k8s.io/api v0.0.0-20230412115724-b0b8e3f95d08 k8s.io/apimachinery v0.0.0-20230412035310-31e4b9d241f7 k8s.io/client-go v0.0.0-20230412040258-8005e0d28ba2 k8s.io/component-base v0.0.0-20230327183930-4fb97becef37 @@ -124,7 +124,7 @@ require ( ) replace ( - k8s.io/api => k8s.io/api v0.0.0-20230412035552-2bdacdf27ceb + k8s.io/api => k8s.io/api v0.0.0-20230412115724-b0b8e3f95d08 k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230412035310-31e4b9d241f7 k8s.io/client-go => k8s.io/client-go v0.0.0-20230412040258-8005e0d28ba2 k8s.io/component-base => k8s.io/component-base v0.0.0-20230327183930-4fb97becef37 diff --git a/go.sum b/go.sum index 19123ac96..e450141cb 100644 --- a/go.sum +++ b/go.sum @@ -878,8 +878,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -k8s.io/api v0.0.0-20230412035552-2bdacdf27ceb h1:cfveC5neDeBviOYWd4lir9rh9A0es/5mJvZ5FWjyhOk= -k8s.io/api v0.0.0-20230412035552-2bdacdf27ceb/go.mod h1:vtqOH+6A0ypk7hxqunFCvaEIub+VUspnbLv5dNIfIuE= +k8s.io/api v0.0.0-20230412115724-b0b8e3f95d08 h1:x1R20/eaphFlQSm4SJC2evVwufVUt5yeOmQJgNjyn+Y= +k8s.io/api v0.0.0-20230412115724-b0b8e3f95d08/go.mod h1:cQ6M2OwzuocrtP+XOKc0I/oqo1E+CNAVuEVkvwXqG3I= k8s.io/apimachinery v0.0.0-20230412035310-31e4b9d241f7 h1:a4P0At3Mtt9Rqu98XWQ1nG/hlYDhYQiCxYWDEXTz/8c= k8s.io/apimachinery v0.0.0-20230412035310-31e4b9d241f7/go.mod h1:5ikh59fK3AJ287GUvpUsryoMFtH9zj/ARfWCo3AyXTM= k8s.io/client-go v0.0.0-20230412040258-8005e0d28ba2 h1:Mxfg52BtCwS9aQ0uqJSBtpQrDvFdbTZve06ttHHOqRk= diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index 33c404633..c5f27b19c 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -592,6 +592,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions identifier, ) + // note that c.waitUntilWatchCacheFreshAndForceAllEvents must be called without + // the c.watchCache.RLock held otherwise we are at risk of a deadlock + // mainly because c.watchCache.processEvent method won't be able to make progress + // + // moreover even though the c.waitUntilWatchCacheFreshAndForceAllEvents acquires a lock + // it is safe to release the lock after the method finishes because we don't require + // any atomicity between the call to the method and further calls that actually get the events. + forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts) + if err != nil { + return newErrWatcher(err), nil + } + // We explicitly use thread unsafe version and do locking ourself to ensure that // no new events will be processed in the meantime. The watchCache will be unlocked // on return from this function. @@ -599,10 +611,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // underlying watchCache is calling processEvent under its lock. c.watchCache.RLock() defer c.watchCache.RUnlock() - forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts) - if err != nil { - return newErrWatcher(err), nil - } + startWatchRV := startWatchResourceVersionFn() var cacheInterval *watchCacheInterval if forceAllEvents { diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index 7847c1eaa..0624d5e78 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -1823,6 +1823,7 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) { } func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) if err != nil { @@ -1830,17 +1831,41 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { } defer cacher.Stop() - forceAllEvents, err := cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)}) - require.NotNil(t, err, "the target method should return non nil error") - require.Equal(t, err.Error(), "Timeout: Too large resource version: 105, current: 100") - require.False(t, forceAllEvents, "the target method after returning an error should NOT instruct the caller to ask for all events in the cache (full state)") + opts := storage.ListOptions{ + Predicate: storage.Everything, + SendInitialEvents: pointer.Bool(true), + ResourceVersion: "105", + } + opts.Predicate.AllowWatchBookmarks = true + + w, err := cacher.Watch(context.Background(), "pods/ns", opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + verifyEvents(t, w, []watch.Event{ + { + Type: watch.Error, + Object: &metav1.Status{ + Status: metav1.StatusFailure, + Message: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).Error(), + Details: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).(*apierrors.StatusError).Status().Details, + Reason: metav1.StatusReasonTimeout, + Code: 504, + }, + }, + }, true) go func() { cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"})) }() - forceAllEvents, err = cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)}) - require.NoError(t, err) - require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)") + w, err = cacher.Watch(context.Background(), "pods/ns", opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + verifyEvents(t, w, []watch.Event{ + { + Type: watch.Added, + Object: makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}), + }, + }, true) } type fakeStorage struct {