Fix closing of decorated watcher channel on timeout
Kubernetes-commit: bd2d63dd57e6011bfa3218e59e27fddaa295426b
This commit is contained in:
parent
1ee6a1337f
commit
584c18160b
|
@ -29,8 +29,8 @@ type decoratedWatcher struct {
|
|||
resultCh chan watch.Event
|
||||
}
|
||||
|
||||
func newDecoratedWatcher(w watch.Interface, decorator func(runtime.Object)) *decoratedWatcher {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
func newDecoratedWatcher(ctx context.Context, w watch.Interface, decorator func(runtime.Object)) *decoratedWatcher {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
d := &decoratedWatcher{
|
||||
w: w,
|
||||
decorator: decorator,
|
||||
|
@ -41,14 +41,18 @@ func newDecoratedWatcher(w watch.Interface, decorator func(runtime.Object)) *dec
|
|||
return d
|
||||
}
|
||||
|
||||
// run decorates watch events from the underlying watcher until its result channel
|
||||
// is closed or the passed in context is done.
|
||||
// When run() returns, decoratedWatcher#resultCh is closed.
|
||||
func (d *decoratedWatcher) run(ctx context.Context) {
|
||||
var recv, send watch.Event
|
||||
var ok bool
|
||||
defer close(d.resultCh)
|
||||
for {
|
||||
select {
|
||||
case recv, ok = <-d.w.ResultChan():
|
||||
// The underlying channel may be closed after timeout.
|
||||
if !ok {
|
||||
// The underlying channel was closed, cancel our context
|
||||
d.cancel()
|
||||
return
|
||||
}
|
||||
|
@ -61,20 +65,24 @@ func (d *decoratedWatcher) run(ctx context.Context) {
|
|||
}
|
||||
select {
|
||||
case d.resultCh <- send:
|
||||
if send.Type == watch.Error {
|
||||
d.cancel()
|
||||
}
|
||||
// propagated event successfully
|
||||
case <-ctx.Done():
|
||||
// context timed out or was cancelled, stop the underlying watcher
|
||||
d.w.Stop()
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
// context timed out or was cancelled, stop the underlying watcher
|
||||
d.w.Stop()
|
||||
close(d.resultCh)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *decoratedWatcher) Stop() {
|
||||
// stop the underlying watcher
|
||||
d.w.Stop()
|
||||
// cancel our context
|
||||
d.cancel()
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -30,24 +31,78 @@ import (
|
|||
func TestDecoratedWatcher(t *testing.T) {
|
||||
w := watch.NewFake()
|
||||
decorator := func(obj runtime.Object) {
|
||||
pod := obj.(*example.Pod)
|
||||
pod.Annotations = map[string]string{"decorated": "true"}
|
||||
if pod, ok := obj.(*example.Pod); ok {
|
||||
pod.Annotations = map[string]string{"decorated": "true"}
|
||||
}
|
||||
}
|
||||
dw := newDecoratedWatcher(w, decorator)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
dw := newDecoratedWatcher(ctx, w, decorator)
|
||||
defer dw.Stop()
|
||||
|
||||
go w.Add(&example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
go func() {
|
||||
w.Error(&metav1.Status{Status: "Failure"})
|
||||
w.Add(&example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
w.Error(&metav1.Status{Status: "Failure"})
|
||||
w.Modify(&example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
w.Error(&metav1.Status{Status: "Failure"})
|
||||
w.Delete(&example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
}()
|
||||
|
||||
expectErrorEvent(t, dw) // expect error is plumbed and doesn't force close the watcher
|
||||
expectPodEvent(t, dw, watch.Added)
|
||||
expectErrorEvent(t, dw) // expect error is plumbed and doesn't force close the watcher
|
||||
expectPodEvent(t, dw, watch.Modified)
|
||||
expectErrorEvent(t, dw) // expect error is plumbed and doesn't force close the watcher
|
||||
expectPodEvent(t, dw, watch.Deleted)
|
||||
|
||||
// cancel the passed-in context to simulate request timeout
|
||||
cancel()
|
||||
|
||||
// expect the decorated channel to be closed
|
||||
select {
|
||||
case e := <-dw.ResultChan():
|
||||
pod, ok := e.Object.(*example.Pod)
|
||||
if !ok {
|
||||
t.Errorf("Should received object of type *api.Pod, get type (%T)", e.Object)
|
||||
return
|
||||
}
|
||||
if pod.Annotations["decorated"] != "true" {
|
||||
t.Errorf("pod.Annotations[\"decorated\"], want=%s, get=%s", "true", pod.Labels["decorated"])
|
||||
case e, ok := <-dw.ResultChan():
|
||||
if ok {
|
||||
t.Errorf("expected result chan closed, got %#v", e)
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("timeout after %v", wait.ForeverTestTimeout)
|
||||
}
|
||||
|
||||
// expect the underlying watcher to have been stopped as a result of the context cancellation
|
||||
if !w.IsStopped() {
|
||||
t.Errorf("expected underlying watcher to be stopped")
|
||||
}
|
||||
}
|
||||
|
||||
func expectPodEvent(t *testing.T, dw *decoratedWatcher, watchType watch.EventType) {
|
||||
select {
|
||||
case e := <-dw.ResultChan():
|
||||
pod, ok := e.Object.(*example.Pod)
|
||||
if !ok {
|
||||
t.Fatalf("Should received object of type *api.Pod, get type (%T)", e.Object)
|
||||
}
|
||||
if pod.Annotations["decorated"] != "true" {
|
||||
t.Fatalf("pod.Annotations[\"decorated\"], want=%s, get=%s", "true", pod.Labels["decorated"])
|
||||
}
|
||||
if e.Type != watchType {
|
||||
t.Fatalf("expected type %s, got %s", watchType, e.Type)
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
func expectErrorEvent(t *testing.T, dw *decoratedWatcher) {
|
||||
select {
|
||||
case e := <-dw.ResultChan():
|
||||
_, ok := e.Object.(*metav1.Status)
|
||||
if !ok {
|
||||
t.Fatalf("Should received object of type *metav1.Status, get type (%T)", e.Object)
|
||||
}
|
||||
if e.Type != watch.Error {
|
||||
t.Fatalf("expected type %s, got %s", watch.Error, e.Type)
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1226,7 +1226,7 @@ func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate
|
|||
return nil, err
|
||||
}
|
||||
if e.Decorator != nil {
|
||||
return newDecoratedWatcher(w, e.Decorator), nil
|
||||
return newDecoratedWatcher(ctx, w, e.Decorator), nil
|
||||
}
|
||||
return w, nil
|
||||
}
|
||||
|
@ -1239,7 +1239,7 @@ func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate
|
|||
return nil, err
|
||||
}
|
||||
if e.Decorator != nil {
|
||||
return newDecoratedWatcher(w, e.Decorator), nil
|
||||
return newDecoratedWatcher(ctx, w, e.Decorator), nil
|
||||
}
|
||||
return w, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue