From 73057f16b6953c9ece3744dda3a3e6af8ded8b67 Mon Sep 17 00:00:00 2001 From: "fansong.cfs" Date: Wed, 3 Apr 2019 09:37:48 +0800 Subject: [PATCH] add timeout support for watch Kubernetes-commit: b304a1f96d85cd351a0e22a091e17064d5edb35e --- pkg/endpoints/handlers/get.go | 3 +- .../generic/registry/decorated_watcher.go | 8 +++- pkg/storage/cacher/cacher.go | 20 ++++++--- pkg/storage/cacher/cacher_whitebox_test.go | 43 ++++++++++++++++++- 4 files changed, 64 insertions(+), 10 deletions(-) diff --git a/pkg/endpoints/handlers/get.go b/pkg/endpoints/handlers/get.go index d6757730b..af7ae1d54 100644 --- a/pkg/endpoints/handlers/get.go +++ b/pkg/endpoints/handlers/get.go @@ -248,7 +248,8 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0)) } klog.V(3).Infof("Starting watch for %s, rv=%s labels=%s fields=%s timeout=%s", req.URL.Path, opts.ResourceVersion, opts.LabelSelector, opts.FieldSelector, timeout) - + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() watcher, err := rw.Watch(ctx, &opts) if err != nil { scope.err(err, w, req) diff --git a/pkg/registry/generic/registry/decorated_watcher.go b/pkg/registry/generic/registry/decorated_watcher.go index f589dd1ec..d6fba9700 100644 --- a/pkg/registry/generic/registry/decorated_watcher.go +++ b/pkg/registry/generic/registry/decorated_watcher.go @@ -45,9 +45,15 @@ func newDecoratedWatcher(w watch.Interface, decorator ObjectFunc) *decoratedWatc func (d *decoratedWatcher) run(ctx context.Context) { var recv, send watch.Event + var ok bool for { select { - case recv = <-d.w.ResultChan(): + case recv, ok = <-d.w.ResultChan(): + // The underlying channel may be closed after timeout. 
+ if !ok { + d.cancel() + return + } switch recv.Type { case watch.Added, watch.Modified, watch.Deleted: err := d.decorator(recv.Object) diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index 77795f3a9..71507d988 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -413,7 +413,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, c.watcherIdx++ }() - go watcher.process(initEvents, watchRV) + go watcher.process(ctx, initEvents, watchRV) return watcher, nil } @@ -1063,7 +1063,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { } } -func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) { +func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) { defer utilruntime.HandleCrash() // Check how long we are processing initEvents. @@ -1099,10 +1099,18 @@ func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion ui defer close(c.result) defer c.Stop() - for event := range c.input { - // only send events newer than resourceVersion - if event.ResourceVersion > resourceVersion { - c.sendWatchCacheEvent(event) + for { + select { + case event, ok := <-c.input: + if !ok { + return + } + // only send events newer than resourceVersion + if event.ResourceVersion > resourceVersion { + c.sendWatchCacheEvent(event) + } + case <-ctx.Done(): + return } } } diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index b5ca9ae90..07dfcd5c5 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -64,7 +64,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. 
w = newCacheWatcher(0, filter, forget, testVersioner{}) - go w.process(initEvents, 0) + go w.process(context.Background(), initEvents, 0) w.Stop() if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { lock.RLock() @@ -182,8 +182,9 @@ TestCase: for j := range testCase.events { testCase.events[j].ResourceVersion = uint64(j) + 1 } + w := newCacheWatcher(0, filter, forget, testVersioner{}) - go w.process(testCase.events, 0) + go w.process(context.Background(), testCase.events, 0) ch := w.ResultChan() for j, event := range testCase.expected { e := <-ch @@ -445,3 +446,41 @@ func TestWatcherNotGoingBackInTime(t *testing.T) { } } } + +func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { + var w *cacheWatcher + done := make(chan struct{}) + filter := func(string, labels.Set, fields.Set) bool { return true } + forget := func() { + w.stop() + done <- struct{}{} + } + + maxRetriesToProduceTheRaceCondition := 1000 + // Simulate the timer being fired and stopped concurrently by setting the + // timeout to zero and running the Stop goroutine concurrently. + // Make sure that the watch will not be blocked on Stop. + for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { + w = newCacheWatcher(0, filter, forget, testVersioner{}) + go w.Stop() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("stop is blocked when the timer is fired concurrently") + } + } + + // After that, verify that the cacheWatcher.process goroutine works correctly. + for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { + w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}) + w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)} + ctx, _ := context.WithTimeout(context.Background(), time.Hour) + go w.process(ctx, nil, 0) + select { + case <-w.ResultChan(): + case <-time.After(time.Second): + t.Fatal("expected received a event on ResultChan") + } + w.Stop() + } +}