Serve watch without resourceVersion from cache and introduce a WatchFromStorageWithoutResourceVersion feature gate to allow serving watch from storage.

Kubernetes-commit: f8f08542c911b0fd620a26ca038b4f255b7a6217
This commit is contained in:
Marek Siarkowicz 2024-03-14 15:20:29 +01:00 committed by Kubernetes Publisher
parent 21e0f5b77d
commit 582da82650
4 changed files with 44 additions and 9 deletions

View File

@ -240,6 +240,12 @@ const (
// Enables support for watch bookmark events. // Enables support for watch bookmark events.
WatchBookmark featuregate.Feature = "WatchBookmark" WatchBookmark featuregate.Feature = "WatchBookmark"
// owner: @serathius
// beta: 1.30
// Enables watches without resourceVersion to be served from storage.
// Used to prevent https://github.com/kubernetes/kubernetes/issues/123072 until etcd fixes the issue.
WatchFromStorageWithoutResourceVersion featuregate.Feature = "WatchFromStorageWithoutResourceVersion"
// owner: @vinaykul // owner: @vinaykul
// kep: http://kep.k8s.io/1287 // kep: http://kep.k8s.io/1287
// alpha: v1.27 // alpha: v1.27
@ -328,6 +334,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},
InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha}, InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha},
WatchList: {Default: false, PreRelease: featuregate.Alpha}, WatchList: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -522,7 +522,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil { if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
opts.SendInitialEvents = nil opts.SendInitialEvents = nil
} }
if opts.SendInitialEvents == nil && opts.ResourceVersion == "" { if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return c.storage.Watch(ctx, key, opts) return c.storage.Watch(ctx, key, opts)
} }
requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion) requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)

View File

@ -341,9 +341,18 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
} }
func TestWatchSemantics(t *testing.T) { func TestWatchSemantics(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t) t.Run("WatchFromStorageWithoutResourceVersion=true", func(t *testing.T) {
t.Cleanup(terminate) defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true)()
storagetesting.RunWatchSemantics(context.TODO(), t, store) store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemantics(context.TODO(), t, store)
})
t.Run("WatchFromStorageWithoutResourceVersion=false", func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false)()
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemantics(context.TODO(), t, store)
})
} }
func TestWatchSemanticInitialEventsExtended(t *testing.T) { func TestWatchSemanticInitialEventsExtended(t *testing.T) {

View File

@ -324,8 +324,6 @@ func TestWatchCacheBypass(t *testing.T) {
t.Fatalf("unexpected error waiting for the cache to be ready") t.Fatalf("unexpected error waiting for the cache to be ready")
} }
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.injectError(errDummy)
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "0", ResourceVersion: "0",
Predicate: storage.Everything, Predicate: storage.Everything,
@ -334,12 +332,32 @@ func TestWatchCacheBypass(t *testing.T) {
t.Errorf("Watch with RV=0 should be served from cache: %v", err) t.Errorf("Watch with RV=0 should be served from cache: %v", err)
} }
// With unset RV, check if cacher is bypassed.
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "", ResourceVersion: "",
Predicate: storage.Everything,
}) })
if err != errDummy { if err != nil {
t.Errorf("Watch with unset RV should bypass cacher: %v", err) t.Errorf("Watch with RV=0 should be served from cache: %v", err)
}
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false)()
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: storage.Everything,
})
if err != nil {
t.Errorf("With WatchFromStorageWithoutResourceVersion disabled, watch with unset RV should be served from cache: %v", err)
}
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.injectError(errDummy)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true)()
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: storage.Everything,
})
if !errors.Is(err, errDummy) {
t.Errorf("With WatchFromStorageWithoutResourceVersion enabled, watch with unset RV should be served from storage: %v", err)
} }
} }