Merge pull request #121647 from aojea/fixrace

Revert "cacher: when forgeting a watcher, call stopWatcherLocked mult…

Kubernetes-commit: 3570075e4f6d3cdb2aa8273004ea6ec336b33abd
This commit is contained in:
Kubernetes Publisher 2023-10-31 19:16:47 +01:00
commit 347a598452
2 changed files with 9 additions and 50 deletions

View File

@ -112,8 +112,11 @@ func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
wm[number] = w
}
func (wm watchersMap) deleteWatcher(number int) {
delete(wm, number)
func (wm watchersMap) deleteWatcher(number int, done func(*cacheWatcher)) {
if watcher, ok := wm[number]; ok {
delete(wm, number)
done(watcher)
}
}
func (wm watchersMap) terminateAll(done func(*cacheWatcher)) {
@ -144,14 +147,14 @@ func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, scope namespac
}
}
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool) {
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool, done func(*cacheWatcher)) {
if supported {
i.valueWatchers[value].deleteWatcher(number)
i.valueWatchers[value].deleteWatcher(number, done)
if len(i.valueWatchers[value]) == 0 {
delete(i.valueWatchers, value)
}
} else {
i.allWatchers[scope].deleteWatcher(number)
i.allWatchers[scope].deleteWatcher(number, done)
if len(i.allWatchers[scope]) == 0 {
delete(i.allWatchers, scope)
}
@ -1220,8 +1223,7 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
// It's possible that the watcher is already not in the structure (e.g. in case of
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
// on a watcher multiple times.
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported)
c.stopWatcherLocked(w)
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported, c.stopWatcherLocked)
}
}

View File

@ -47,7 +47,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"
)
@ -1808,45 +1807,3 @@ func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
}},
}, true)
}
func TestForgetWatcher(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
require.NoError(t, err)
defer cacher.Stop()
require.Equal(t, 0, len(cacher.watchers.allWatchers))
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
var forgetWatcherFn func(bool)
var forgetCounter int
forgetWatcherWrapped := func(drainWatcher bool) {
forgetCounter++
forgetWatcherFn(drainWatcher)
}
w := newCacheWatcher(
0,
func(_ string, _ labels.Set, _ fields.Set) bool { return true },
nil,
storage.APIObjectVersioner{},
testingclock.NewFakeClock(time.Now()).Now().Add(2*time.Minute),
true,
schema.GroupResource{Resource: "pods"},
"1",
)
forgetWatcherFn = forgetWatcher(cacher, w, 0, namespacedName{}, "", false)
cacher.watchers.addWatcher(w, 0, namespacedName{}, "", false)
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
require.Equal(t, 1, len(cacher.watchers.allWatchers))
forgetWatcherWrapped(false)
require.Equal(t, 0, len(cacher.watchers.allWatchers))
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
require.Equal(t, 1, forgetCounter)
forgetWatcherWrapped(false)
require.Equal(t, 0, len(cacher.watchers.allWatchers))
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
require.Equal(t, 2, forgetCounter)
}