Pop expired watchers in case there is no update

Kubernetes-commit: 0204bc7e2594c297f146701132ee969721ada2ab
This commit is contained in:
Ted Yu 2019-07-30 06:49:01 -07:00 committed by Kubernetes Publisher
parent 781c3cd1b3
commit 06470a960d
2 changed files with 71 additions and 0 deletions

View File

@ -165,6 +165,7 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cache
// if you set fire time at X, you can get the bookmark within (X-1,X+1) period.
// This is NOT thread-safe.
type watcherBookmarkTimeBuckets struct {
lock sync.Mutex
watchersBuckets map[int64][]*cacheWatcher
startBucketID int64
clock clock.Clock
@ -186,6 +187,8 @@ func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
return false
}
bucketID := nextTime.Unix()
t.lock.Lock()
defer t.lock.Unlock()
if bucketID < t.startBucketID {
bucketID = t.startBucketID
}
@ -198,6 +201,8 @@ func (t *watcherBookmarkTimeBuckets) popExpiredWatchers() [][]*cacheWatcher {
currentBucketID := t.clock.Now().Unix()
// There should be one or two elements in almost all cases
expiredWatchers := make([][]*cacheWatcher, 0, 2)
t.lock.Lock()
defer t.lock.Unlock()
for ; t.startBucketID <= currentBucketID; t.startBucketID++ {
if watchers, ok := t.watchersBuckets[t.startBucketID]; ok {
delete(t.watchersBuckets, t.startBucketID)
@ -784,6 +789,8 @@ func (c *Cacher) dispatchEvents() {
// Never send a bookmark event if we did not see an event here, this is fine
// because we don't provide any guarantees on sending bookmarks.
if lastProcessedResourceVersion == 0 {
// pop expired watchers in case there has been no update
c.bookmarkWatchers.popExpiredWatchers()
continue
}
bookmarkEvent := &watchCacheEvent{

View File

@ -587,6 +587,70 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
}
}
func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage, 1000)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
pred := storage.Everything
pred.AllowWatchBookmarks = true
// run the collision test for 3 seconds to let ~2 buckets expire
stopCh := make(chan struct{})
time.AfterFunc(3*time.Second, func() { close(stopCh) })
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stopCh:
return
default:
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
w, err := cacher.Watch(ctx, "pods/ns", "0", pred)
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
w.Stop()
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stopCh:
return
default:
cacher.bookmarkWatchers.popExpiredWatchers()
}
}
}()
// wait for adding/removing watchers to end
wg.Wait()
// wait out the expiration period and pop expired watchers
time.Sleep(2 * time.Second)
cacher.bookmarkWatchers.popExpiredWatchers()
cacher.bookmarkWatchers.lock.Lock()
defer cacher.bookmarkWatchers.lock.Unlock()
if len(cacher.bookmarkWatchers.watchersBuckets) != 0 {
t.Errorf("unexpected bookmark watchers %v", len(cacher.bookmarkWatchers.watchersBuckets))
}
}
func testCacherSendBookmarkEvents(t *testing.T, watchCacheEnabled, allowWatchBookmarks, expectedBookmarks bool) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, watchCacheEnabled)()
backingStorage := &dummyStorage{}