diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index bb944b64f..e524e0c64 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -240,6 +240,12 @@ const ( // Enables support for watch bookmark events. WatchBookmark featuregate.Feature = "WatchBookmark" + // owner: @serathius + // beta: 1.30 + // Enables watches without resourceVersion to be served from storage. + // Used to prevent https://github.com/kubernetes/kubernetes/issues/123072 until etcd fixes the issue. + WatchFromStorageWithoutResourceVersion featuregate.Feature = "WatchFromStorageWithoutResourceVersion" + // owner: @vinaykul // kep: http://kep.k8s.io/1287 // alpha: v1.27 @@ -328,6 +334,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, + WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta}, + InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha}, WatchList: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index 581cb3c02..900f300cd 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -522,7 +522,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil { opts.SendInitialEvents = nil } - if opts.SendInitialEvents == nil && opts.ResourceVersion == "" { + if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" { return c.storage.Watch(ctx, key, opts) } requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion) diff --git a/pkg/storage/cacher/cacher_test.go b/pkg/storage/cacher/cacher_test.go index 3cb4eff84..566835935 100644 --- a/pkg/storage/cacher/cacher_test.go +++ b/pkg/storage/cacher/cacher_test.go @@ -341,9 +341,18 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) { } func TestWatchSemantics(t *testing.T) { - store, terminate := testSetupWithEtcdAndCreateWrapper(t) - t.Cleanup(terminate) - storagetesting.RunWatchSemantics(context.TODO(), t, store) + t.Run("WatchFromStorageWithoutResourceVersion=true", func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true)() + store, terminate := testSetupWithEtcdAndCreateWrapper(t) + t.Cleanup(terminate) + storagetesting.RunWatchSemantics(context.TODO(), t, store) + }) + t.Run("WatchFromStorageWithoutResourceVersion=false", func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false)() + store, terminate := testSetupWithEtcdAndCreateWrapper(t) + t.Cleanup(terminate) + storagetesting.RunWatchSemantics(context.TODO(), t, store) + }) } func TestWatchSemanticInitialEventsExtended(t *testing.T) { diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index 4a0115b78..61c5709d1 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -324,8 +324,6 @@ func TestWatchCacheBypass(t *testing.T) { t.Fatalf("unexpected error waiting for the cache to be ready") } - // Inject error to underlying layer and check if cacher is not bypassed. - backingStorage.injectError(errDummy) _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ ResourceVersion: "0", Predicate: storage.Everything, @@ -334,12 +332,32 @@ func TestWatchCacheBypass(t *testing.T) { t.Errorf("Watch with RV=0 should be served from cache: %v", err) } - // With unset RV, check if cacher is bypassed. _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ ResourceVersion: "", + Predicate: storage.Everything, }) - if err != errDummy { - t.Errorf("Watch with unset RV should bypass cacher: %v", err) + if err != nil { + t.Errorf("Watch with RV=0 should be served from cache: %v", err) + } + + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false)() + _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ + ResourceVersion: "", + Predicate: storage.Everything, + }) + if err != nil { + t.Errorf("With WatchFromStorageWithoutResourceVersion disabled, watch with unset RV should be served from cache: %v", err) + } + + // Inject error to underlying layer and check if cacher is not bypassed. + backingStorage.injectError(errDummy) + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true)() + _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ + ResourceVersion: "", + Predicate: storage.Everything, + }) + if !errors.Is(err, errDummy) { + t.Errorf("With WatchFromStorageWithoutResourceVersion enabled, watch with unset RV should be served from storage: %v", err) } }