Enable progress notify events in watchcache

Kubernetes-commit: a94fb5369d5e77b3fcafd1296bac072a1d6e13fe
This commit is contained in:
wojtekt 2020-08-31 13:15:36 +02:00 committed by Kubernetes Publisher
parent e40efde837
commit c4fb001eda
3 changed files with 52 additions and 15 deletions

View File

@ -138,6 +138,12 @@ const (
//
// Allows sending warning headers in API responses.
WarningHeaders featuregate.Feature = "WarningHeaders"
// owner: @wojtek-t
// alpha: v1.20
//
// Allows for updating watchcache resource version with progress notify events.
EfficientWatchResumption featuregate.Feature = "EfficientWatchResumption"
)
func init() {
@ -148,18 +154,19 @@ func init() {
// To add a new feature, define a key for it above and add it here. The features will be
// available throughout Kubernetes binaries.
var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
DryRun: {Default: true, PreRelease: featuregate.GA},
RemainingItemCount: {Default: true, PreRelease: featuregate.Beta},
ServerSideApply: {Default: true, PreRelease: featuregate.Beta},
StorageVersionHash: {Default: true, PreRelease: featuregate.Beta},
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha},
RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta},
SelectorIndex: {Default: true, PreRelease: featuregate.Beta},
WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
DryRun: {Default: true, PreRelease: featuregate.GA},
RemainingItemCount: {Default: true, PreRelease: featuregate.Beta},
ServerSideApply: {Default: true, PreRelease: featuregate.Beta},
StorageVersionHash: {Default: true, PreRelease: featuregate.Beta},
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha},
RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta},
SelectorIndex: {Default: true, PreRelease: featuregate.Beta},
WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
EfficientWatchResumption: {Default: false, PreRelease: featuregate.Alpha},
}

View File

@ -1098,7 +1098,14 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: storage.Everything})
opts := storage.ListOptions{
ResourceVersion: options.ResourceVersion,
Predicate: storage.Everything,
}
if utilfeature.DefaultFeatureGate.Enabled(features.EfficientWatchResumption) {
opts.ProgressNotify = true
}
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, opts)
}
// errWatcher implements watch.Interface to return a single error

View File

@ -381,6 +381,29 @@ func (w *watchCache) doCacheResizeLocked(capacity int) {
w.capacity = capacity
}
func (w *watchCache) UpdateResourceVersion(resourceVersion string) {
rv, err := w.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
klog.Errorf("Couldn't parse resourceVersion: %v", err)
return
}
w.Lock()
defer w.Unlock()
w.resourceVersion = rv
// Don't dispatch bookmarks coming from the storage layer.
// They can be very frequent (even to the level of subseconds)
// to allow efficient watch resumption on kube-apiserver restarts,
// and propagating them down may overload the whole system.
//
// TODO: If at some point we decide the performance and scalability
// footprint is acceptable, this is the place to hook them in.
// However, we then need to check if this was called as a result
// of a bookmark event or regular Add/Update/Delete operation by
// checking if resourceVersion here has changed.
}
// List returns list of pointers to <storeElement> objects.
func (w *watchCache) List() []interface{} {
return w.store.List()