Merge pull request #95145 from wojtek-t/send_bookmarks_on_changes

Watch bookmarks may contain version of objects of other types

Kubernetes-commit: b9d2df810c89bde69cd790905ec937caf1c36d19
This commit is contained in:
Kubernetes Publisher 2020-09-29 10:11:25 -07:00
commit fd42eacc6e
3 changed files with 97 additions and 21 deletions

View File

@ -793,7 +793,19 @@ func (c *Cacher) dispatchEvents() {
if !ok {
return
}
c.dispatchEvent(&event)
// Don't dispatch bookmarks coming from the storage layer.
// They can be very frequent (even to the level of subseconds)
// to allow efficient watch resumption on kube-apiserver restarts,
// and propagating them down may overload the whole system.
//
// TODO: If at some point we decide the performance and scalability
// footprint is acceptable, this is the place to hook them in.
// However, we then need to check if this was called as a result
// of a bookmark event or regular Add/Update/Delete operation by
// checking if resourceVersion here has changed.
if event.Type != watch.Bookmark {
c.dispatchEvent(&event)
}
lastProcessedResourceVersion = event.ResourceVersion
case <-bookmarkTimer.C():
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))

View File

@ -41,10 +41,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
var (
@ -637,7 +634,6 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
}
func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
@ -865,7 +861,6 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
}
func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
@ -938,6 +933,71 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
}
}
func TestBookmarksOnResourceVersionUpdates(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// Ensure that bookmarks are sent more frequently than every 1m.
cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second)
// Wait until cacher is initialized.
cacher.ready.wait()
makePod := func(i int) *examplev1.Pod {
return &examplev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", i),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%d", i),
},
}
}
if err := cacher.watchCache.Add(makePod(1000)); err != nil {
t.Errorf("error: %v", err)
}
pred := storage.Everything
pred.AllowWatchBookmarks = true
w, err := cacher.Watch(context.TODO(), "/pods/ns", storage.ListOptions{
ResourceVersion: "1000",
Predicate: pred,
})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
expectedRV := 2000
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
event, ok := <-w.ResultChan()
if !ok {
t.Fatalf("Unexpected closed channel")
}
rv, err := cacher.versioner.ObjectResourceVersion(event.Object)
if err != nil {
t.Errorf("failed to parse resource version from %#v: %v", event.Object, err)
}
if event.Type == watch.Bookmark && rv == uint64(expectedRV) {
return
}
}
}()
// Simulate progress notify event.
cacher.watchCache.UpdateResourceVersion(strconv.Itoa(expectedRV))
wg.Wait()
}
func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)

View File

@ -320,8 +320,9 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
}
// Avoid calling event handler under lock.
// This is safe as long as there is at most one call to processEvent in flight
// at any point in time.
// This is safe as long as there is at most one call to Add/Update/Delete and
// UpdateResourceVersion in flight at any point in time, which is true now,
// because reflector calls them synchronously from its main thread.
if w.eventHandler != nil {
w.eventHandler(wcEvent)
}
@ -388,20 +389,23 @@ func (w *watchCache) UpdateResourceVersion(resourceVersion string) {
return
}
w.Lock()
defer w.Unlock()
w.resourceVersion = rv
func() {
w.Lock()
defer w.Unlock()
w.resourceVersion = rv
}()
// Don't dispatch bookmarks coming from the storage layer.
// They can be very frequent (even to the level of subseconds)
// to allow efficient watch resumption on kube-apiserver restarts,
// and propagating them down may overload the whole system.
//
// TODO: If at some point we decide the performance and scalability
// footprint is acceptable, this is the place to hook them in.
// However, we then need to check if this was called as a result
// of a bookmark event or regular Add/Update/Delete operation by
// checking if resourceVersion here has changed.
// Avoid calling event handler under lock.
// This is safe as long as there is at most one call to Add/Update/Delete and
// UpdateResourceVersion in flight at any point in time, which is true now,
// because reflector calls them synchronously from its main thread.
if w.eventHandler != nil {
wcEvent := &watchCacheEvent{
Type: watch.Bookmark,
ResourceVersion: rv,
}
w.eventHandler(wcEvent)
}
}
// List returns list of pointers to <storeElement> objects.