Merge pull request #116172 from wojtek-t/fix_watch_cache

Fix missed watch events when watch is initialized simultanously with reinitializing watchcache

Kubernetes-commit: 856d6d9caaae7793795b87c7ffdef1a6f7f7c113
This commit is contained in:
Kubernetes Publisher 2023-03-16 07:31:21 -07:00
commit 0fc1d27cde
4 changed files with 193 additions and 10 deletions

View File

@ -524,7 +524,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return nil, err
}
if err := c.ready.wait(ctx); err != nil {
readyGeneration, err := c.ready.waitAndReadGeneration(ctx)
if err != nil {
return nil, errors.NewServiceUnavailable(err.Error())
}
@ -616,14 +617,24 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return newErrWatcher(err), nil
}
addedWatcher := false
func() {
c.Lock()
defer c.Unlock()
if generation, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
// We went unready or are already on a different generation.
// Avoid registering and starting the watch as it will have to be
// terminated immediately anyway.
return
}
// Update watcher.forget function once we can compute it.
watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
// Update the bookMarkAfterResourceVersion
watcher.setBookmarkAfterResourceVersion(bookmarkAfterResourceVersionFn())
c.watchers.addWatcher(watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
addedWatcher = true
// Add it to the queue only when the client support watch bookmarks.
if watcher.allowWatchBookmarks {
@ -632,6 +643,14 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
c.watcherIdx++
}()
if !addedWatcher {
// Watcher isn't really started at this point, so it's safe to just drop it.
//
// We're simulating the immediate watch termination, which boils down to simply
// closing the watcher.
return newImmediateCloseWatcher(), nil
}
go watcher.processInterval(ctx, cacheInterval, startWatchRV)
return watcher, nil
}
@ -1377,3 +1396,24 @@ func (c *errWatcher) ResultChan() <-chan watch.Event {
func (c *errWatcher) Stop() {
// no-op
}
// immediateCloseWatcher implements watch.Interface that is immediately closed
type immediateCloseWatcher struct {
result chan watch.Event
}
func newImmediateCloseWatcher() *immediateCloseWatcher {
watcher := &immediateCloseWatcher{result: make(chan watch.Event)}
close(watcher.result)
return watcher
}
// Implements watch.Interface.
func (c *immediateCloseWatcher) ResultChan() <-chan watch.Event {
return c.result
}
// Implements watch.Interface.
func (c *immediateCloseWatcher) Stop() {
// no-op
}

View File

@ -128,6 +128,7 @@ type dummyStorage struct {
sync.RWMutex
err error
getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error
watchFn func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error)
}
type dummyWatch struct {
@ -155,7 +156,10 @@ func (d *dummyStorage) Create(_ context.Context, _ string, _, _ runtime.Object,
func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *storage.Preconditions, _ storage.ValidateObjectFunc, _ runtime.Object) error {
return fmt.Errorf("unimplemented")
}
func (d *dummyStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
func (d *dummyStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
if d.watchFn != nil {
return d.watchFn(ctx, key, opts)
}
d.RLock()
defer d.RUnlock()
@ -447,7 +451,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
}
}
func TestCacheDontAcceptRequestsStopped(t *testing.T) {
func TestCacherDontAcceptRequestsStopped(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
@ -509,6 +513,117 @@ func TestCacheDontAcceptRequestsStopped(t *testing.T) {
}
}
func TestCacherDontMissEventsOnReinitialization(t *testing.T) {
makePod := func(i int) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", i),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%d", i),
},
}
}
listCalls, watchCalls := 0, 0
backingStorage := &dummyStorage{
getListFn: func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
podList := listObj.(*example.PodList)
var err error
switch listCalls {
case 0:
podList.ListMeta = metav1.ListMeta{ResourceVersion: "1"}
case 1:
podList.ListMeta = metav1.ListMeta{ResourceVersion: "10"}
default:
err = fmt.Errorf("unexpected list call")
}
listCalls++
return err
},
watchFn: func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
var w *watch.FakeWatcher
var err error
switch watchCalls {
case 0:
w = watch.NewFakeWithChanSize(10, false)
for i := 2; i < 8; i++ {
w.Add(makePod(i))
}
// Emit an error to force relisting.
w.Error(nil)
w.Stop()
case 1:
w = watch.NewFakeWithChanSize(10, false)
for i := 12; i < 18; i++ {
w.Add(makePod(i))
}
w.Stop()
default:
err = fmt.Errorf("unexpected watch call")
}
watchCalls++
return w, err
},
}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
concurrency := 1000
wg := sync.WaitGroup{}
wg.Add(concurrency)
// Ensure that test doesn't deadlock if cacher already processed everything
// and get back into Pending state before some watches get called.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
errCh := make(chan error, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
w, err := cacher.Watch(ctx, "pods", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything})
if err != nil {
// Watch failed to initialize (this most probably means that cacher
// already moved back to Pending state before watch initialized.
// Ignore this case.
return
}
defer w.Stop()
prevRV := -1
for event := range w.ResultChan() {
if event.Type == watch.Error {
break
}
object := event.Object
if co, ok := object.(runtime.CacheableObject); ok {
object = co.GetObject()
}
rv, err := strconv.Atoi(object.(*example.Pod).ResourceVersion)
if err != nil {
errCh <- fmt.Errorf("incorrect resource version: %v", err)
return
}
if prevRV != -1 && prevRV+1 != rv {
errCh <- fmt.Errorf("unexpected event received, prevRV=%d, rv=%d", prevRV, rv)
return
}
prevRV = rv
}
}()
}
wg.Wait()
close(errCh)
for err := range errCh {
t.Error(err)
}
}
func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)

View File

@ -39,7 +39,8 @@ const (
// └---------------------------┘
type ready struct {
state status // represent the state of the variable
lock sync.RWMutex // protect the state variable
generation int // represent the number of times we have transtioned to ready
lock sync.RWMutex // protect the state and generation variables
restartLock sync.Mutex // protect the transition from ready to pending where the channel is recreated
waitCh chan struct{} // blocks until is ready or stopped
}
@ -60,11 +61,18 @@ func (r *ready) done() chan struct{} {
// wait blocks until it is Ready or Stopped, it returns an error if is Stopped.
func (r *ready) wait(ctx context.Context) error {
_, err := r.waitAndReadGeneration(ctx)
return err
}
// waitAndReadGenration blocks until it is Ready or Stopped and returns number
// of times we entered ready state if Ready and error otherwise.
func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
for {
// r.done() only blocks if state is Pending
select {
case <-ctx.Done():
return ctx.Err()
return 0, ctx.Err()
case <-r.done():
}
@ -79,23 +87,30 @@ func (r *ready) wait(ctx context.Context) error {
// waiting despite the state moved back to Pending.
r.lock.RUnlock()
case Ready:
generation := r.generation
r.lock.RUnlock()
return nil
return generation, nil
case Stopped:
r.lock.RUnlock()
return fmt.Errorf("apiserver cacher is stopped")
return 0, fmt.Errorf("apiserver cacher is stopped")
default:
r.lock.RUnlock()
return fmt.Errorf("unexpected apiserver cache state: %v", r.state)
return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state)
}
}
}
// check returns true only if it is Ready.
func (r *ready) check() bool {
_, ok := r.checkAndReadGeneration()
return ok
}
// checkAndReadGeneration returns the current generation and whether it is Ready.
func (r *ready) checkAndReadGeneration() (int, bool) {
r.lock.RLock()
defer r.lock.RUnlock()
return r.state == Ready
return r.generation, r.state == Ready
}
// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
@ -107,6 +122,7 @@ func (r *ready) set(ok bool) {
}
if ok && r.state == Pending {
r.state = Ready
r.generation++
select {
case <-r.waitCh:
default:

View File

@ -52,9 +52,18 @@ func Test_newReadySetIdempotent(t *testing.T) {
ready.set(false)
ready.set(false)
ready.set(false)
if generation, ok := ready.checkAndReadGeneration(); generation != 0 || ok {
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
}
ready.set(true)
if generation, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
}
ready.set(true)
ready.set(true)
ready.set(true)
if generation, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
}
ready.set(false)
// create 10 goroutines waiting for ready and stop
for i := 0; i < 10; i++ {
@ -68,6 +77,9 @@ func Test_newReadySetIdempotent(t *testing.T) {
t.Errorf("ready should be blocking")
}
ready.set(true)
if generation, ok := ready.checkAndReadGeneration(); generation != 2 || !ok {
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
}
for i := 0; i < 10; i++ {
if err := <-errCh; err != nil {
t.Errorf("unexpected error on channel %d", i)