diff --git a/pkg/registry/generic/registry/decorated_watcher.go b/pkg/registry/generic/registry/decorated_watcher.go index e341b371d..e066d2a7f 100644 --- a/pkg/registry/generic/registry/decorated_watcher.go +++ b/pkg/registry/generic/registry/decorated_watcher.go @@ -29,8 +29,8 @@ type decoratedWatcher struct { resultCh chan watch.Event } -func newDecoratedWatcher(w watch.Interface, decorator func(runtime.Object)) *decoratedWatcher { - ctx, cancel := context.WithCancel(context.Background()) +func newDecoratedWatcher(ctx context.Context, w watch.Interface, decorator func(runtime.Object)) *decoratedWatcher { + ctx, cancel := context.WithCancel(ctx) d := &decoratedWatcher{ w: w, decorator: decorator, @@ -41,14 +41,18 @@ func newDecoratedWatcher(w watch.Interface, decorator func(runtime.Object)) *dec return d } +// run decorates watch events from the underlying watcher until its result channel +// is closed or the passed in context is done. +// When run() returns, decoratedWatcher#resultCh is closed. func (d *decoratedWatcher) run(ctx context.Context) { var recv, send watch.Event var ok bool + defer close(d.resultCh) for { select { case recv, ok = <-d.w.ResultChan(): - // The underlying channel may be closed after timeout. if !ok { + // The underlying channel was closed, cancel our context d.cancel() return } @@ -61,20 +65,24 @@ func (d *decoratedWatcher) run(ctx context.Context) { } select { case d.resultCh <- send: - if send.Type == watch.Error { - d.cancel() - } + // propagated event successfully case <-ctx.Done(): + // context timed out or was cancelled, stop the underlying watcher + d.w.Stop() + return } case <-ctx.Done(): + // context timed out or was cancelled, stop the underlying watcher d.w.Stop() - close(d.resultCh) return } } } func (d *decoratedWatcher) Stop() { + // stop the underlying watcher + d.w.Stop() + // cancel our context d.cancel() } diff --git a/pkg/registry/generic/registry/decorated_watcher_test.go b/pkg/registry/generic/registry/decorated_watcher_test.go index 33e47c8af..d6a3872d5 100644 --- a/pkg/registry/generic/registry/decorated_watcher_test.go +++ b/pkg/registry/generic/registry/decorated_watcher_test.go @@ -17,6 +17,7 @@ limitations under the License. package registry import ( + "context" "testing" "time" @@ -30,24 +31,78 @@ import ( func TestDecoratedWatcher(t *testing.T) { w := watch.NewFake() decorator := func(obj runtime.Object) { - pod := obj.(*example.Pod) - pod.Annotations = map[string]string{"decorated": "true"} + if pod, ok := obj.(*example.Pod); ok { + pod.Annotations = map[string]string{"decorated": "true"} + } } - dw := newDecoratedWatcher(w, decorator) + ctx, cancel := context.WithCancel(context.Background()) + dw := newDecoratedWatcher(ctx, w, decorator) defer dw.Stop() - go w.Add(&example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + go func() { + w.Error(&metav1.Status{Status: "Failure"}) + w.Add(&example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + w.Error(&metav1.Status{Status: "Failure"}) + w.Modify(&example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + w.Error(&metav1.Status{Status: "Failure"}) + w.Delete(&example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + }() + + expectErrorEvent(t, dw) // expect error is plumbed and doesn't force close the watcher + expectPodEvent(t, dw, watch.Added) + expectErrorEvent(t, dw) // expect error is plumbed and doesn't force close the watcher + expectPodEvent(t, dw, watch.Modified) + expectErrorEvent(t, dw) // expect error is plumbed and doesn't force close the watcher + expectPodEvent(t, dw, watch.Deleted) + + // cancel the passed-in context to simulate request timeout + cancel() + + // expect the decorated channel to be closed select { - case e := <-dw.ResultChan(): - pod, ok := e.Object.(*example.Pod) - if !ok { - t.Errorf("Should received object of type *api.Pod, get type (%T)", e.Object) - return - } - if pod.Annotations["decorated"] != "true" { - t.Errorf("pod.Annotations[\"decorated\"], want=%s, get=%s", "true", pod.Labels["decorated"]) + case e, ok := <-dw.ResultChan(): + if ok { + t.Errorf("expected result chan closed, got %#v", e) } case <-time.After(wait.ForeverTestTimeout): t.Errorf("timeout after %v", wait.ForeverTestTimeout) } + + // expect the underlying watcher to have been stopped as a result of the context cancellation + if !w.IsStopped() { + t.Errorf("expected underlying watcher to be stopped") + } +} + +func expectPodEvent(t *testing.T, dw *decoratedWatcher, watchType watch.EventType) { + select { + case e := <-dw.ResultChan(): + pod, ok := e.Object.(*example.Pod) + if !ok { + t.Fatalf("Should received object of type *api.Pod, get type (%T)", e.Object) + } + if pod.Annotations["decorated"] != "true" { + t.Fatalf("pod.Annotations[\"decorated\"], want=%s, get=%s", "true", pod.Labels["decorated"]) + } + if e.Type != watchType { + t.Fatalf("expected type %s, got %s", watchType, e.Type) + } + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("timeout after %v", wait.ForeverTestTimeout) + } +} + +func expectErrorEvent(t *testing.T, dw *decoratedWatcher) { + select { + case e := <-dw.ResultChan(): + _, ok := e.Object.(*metav1.Status) + if !ok { + t.Fatalf("Should received object of type *metav1.Status, get type (%T)", e.Object) + } + if e.Type != watch.Error { + t.Fatalf("expected type %s, got %s", watch.Error, e.Type) + } + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("timeout after %v", wait.ForeverTestTimeout) + } } diff --git a/pkg/registry/generic/registry/store.go b/pkg/registry/generic/registry/store.go index d40214f9d..0fe82ed0c 100644 --- a/pkg/registry/generic/registry/store.go +++ b/pkg/registry/generic/registry/store.go @@ -1226,7 +1226,7 @@ func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate return nil, err } if e.Decorator != nil { - return newDecoratedWatcher(w, e.Decorator), nil + return newDecoratedWatcher(ctx, w, e.Decorator), nil } return w, nil } @@ -1239,7 +1239,7 @@ func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate return nil, err } if e.Decorator != nil { - return newDecoratedWatcher(w, e.Decorator), nil + return newDecoratedWatcher(ctx, w, e.Decorator), nil } return w, nil }