Fix race conditions when creating watches (#1113)

* Fix a race condition between creating a watch and initiating the action that emits the event it is watching for

* update changelog

* add PR ID to changelog entry

* Fix merge in Changelog

* Fix table format in Changelog
This commit is contained in:
Andras Kerekes 2020-11-25 01:32:35 -08:00 committed by GitHub
parent 861f63d406
commit 83b8635d85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 52 additions and 49 deletions

View File

@ -17,6 +17,10 @@
|===
| | Description | PR
| 🐛
| Fix a race condition when using Kubernetes watches
| https://github.com/knative/client/pull/1113[#1113]
| 🐛
| Embed the namespace in request body while creating channels
| https://github.com/knative/client/pull/1117[#1117]

View File

@ -253,12 +253,17 @@ func (c *knEventingClient) DeleteBroker(name string, timeout time.Duration) erro
return c.deleteBroker(name, apis_v1.DeletePropagationBackground)
}
waitC := make(chan error)
watcher, err := c.WatchBroker(name, timeout)
if err != nil {
return nil
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("broker", c.WatchBroker, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("broker", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err := c.deleteBroker(name, apis_v1.DeletePropagationForeground)
err = c.deleteBroker(name, apis_v1.DeletePropagationForeground)
if err != nil {
return err
}

View File

@ -319,12 +319,17 @@ func (cl *knServingClient) DeleteService(serviceName string, timeout time.Durati
return cl.deleteService(serviceName, v1.DeletePropagationBackground)
}
waitC := make(chan error)
watcher, err := cl.WatchService(serviceName, timeout)
if err != nil {
return nil
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("service", cl.WatchService, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("service", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(watcher, serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err := cl.deleteService(serviceName, v1.DeletePropagationForeground)
err = cl.deleteService(serviceName, v1.DeletePropagationForeground)
if err != nil {
return err
}
@ -346,8 +351,13 @@ func (cl *knServingClient) deleteService(serviceName string, propagationPolicy v
// Wait for a service to become ready, but not longer than provided timeout
func (cl *knServingClient) WaitForService(name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) {
waitForReady := wait.NewWaitForReady("service", cl.WatchService, serviceConditionExtractor)
return waitForReady.Wait(name, wait.Options{Timeout: &timeout}, msgCallback)
watcher, err := cl.WatchService(name, timeout)
if err != nil {
return err, timeout
}
defer watcher.Stop()
waitForReady := wait.NewWaitForReady("service", serviceConditionExtractor)
return waitForReady.Wait(watcher, name, wait.Options{Timeout: &timeout}, msgCallback)
}
// Get the configuration for a service
@ -460,9 +470,14 @@ func (cl *knServingClient) DeleteRevision(name string, timeout time.Duration) er
return cl.deleteRevision(name)
}
waitC := make(chan error)
watcher, err := cl.WatchRevision(name, timeout)
if err != nil {
return err
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("revision", cl.WatchRevision, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("revision", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err = cl.deleteRevision(name)

View File

@ -27,14 +27,12 @@ import (
// Callbacks and configuration used while waiting
type waitForReadyConfig struct {
watchMaker WatchMaker
conditionsExtractor ConditionsExtractor
kind string
}
// Callbacks and configuration used while waiting for event
type waitForEvent struct {
watchMaker WatchMaker
eventDone EventDone
kind string
}
@ -49,7 +47,7 @@ type Wait interface {
// Wait on resource the resource with this name
// and write event messages for unknown event to the status writer.
// Returns an error (if any) and the overall time it took to wait
Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration)
Wait(watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration)
}
type Options struct {
@ -71,20 +69,18 @@ type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error)
type MessageCallback func(durationSinceState time.Duration, message string)
// NewWaitForReady waits until the condition is set to Ready == True
func NewWaitForReady(kind string, watchMaker WatchMaker, extractor ConditionsExtractor) Wait {
func NewWaitForReady(kind string, extractor ConditionsExtractor) Wait {
return &waitForReadyConfig{
kind: kind,
watchMaker: watchMaker,
conditionsExtractor: extractor,
}
}
// NewWaitForEvent creates a Wait object which waits until a specific event (i.e. when
// the EventDone function returns true)
func NewWaitForEvent(kind string, watchMaker WatchMaker, eventDone EventDone) Wait {
func NewWaitForEvent(kind string, eventDone EventDone) Wait {
return &waitForEvent{
kind: kind,
watchMaker: watchMaker,
eventDone: eventDone,
}
}
@ -112,13 +108,13 @@ func NoopMessageCallback() MessageCallback {
// (e.g. "service"), `timeout` is a timeout after which the watch should be cancelled if no
// target state has been entered yet and `out` is used for printing out status messages
// msgCallback gets called for every event with an 'Ready' condition == UNKNOWN with the event's message.
func (w *waitForReadyConfig) Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
func (w *waitForReadyConfig) Wait(watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
timeout := options.timeoutWithDefault()
floatingTimeout := timeout
for {
start := time.Now()
retry, timeoutReached, err := w.waitForReadyCondition(start, name, floatingTimeout, options.errorWindowWithDefault(), msgCallback)
retry, timeoutReached, err := w.waitForReadyCondition(watcher, start, name, floatingTimeout, options.errorWindowWithDefault(), msgCallback)
if err != nil {
return err, time.Since(start)
}
@ -141,13 +137,7 @@ func (w *waitForReadyConfig) Wait(name string, options Options, msgCallback Mess
// An errorWindow can be specified which takes into account of intermediate "false" ready conditions. So before returning
// an error, this methods waits for the errorWindow duration and if an "True" or "Unknown" event arrives in the meantime
// for the "Ready" condition, then the method continues to wait.
func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string, timeout time.Duration, errorWindow time.Duration, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) {
watcher, err := w.watchMaker(name, timeout)
if err != nil {
return false, false, err
}
defer watcher.Stop()
func (w *waitForReadyConfig) waitForReadyCondition(watcher watch.Interface, start time.Time, name string, timeout time.Duration, errorWindow time.Duration, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) {
// channel used to transport the error that has been received
errChan := make(chan error)
@ -239,13 +229,8 @@ func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string,
}
// Wait until the expected EventDone is satisfied
func (w *waitForEvent) Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
func (w *waitForEvent) Wait(watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
timeout := options.timeoutWithDefault()
watcher, err := w.watchMaker(name, timeout)
if err != nil {
return err, 0
}
defer watcher.Stop()
start := time.Now()
// channel used to transport the error
errChan := make(chan error)
@ -255,7 +240,7 @@ func (w *waitForEvent) Wait(name string, options Options, msgCallback MessageCal
select {
case <-timer.C:
return fmt.Errorf("timeout: %s '%s' not ready after %d seconds", w.kind, name, int(timeout/time.Second)), time.Since(start)
case err = <-errChan:
case err := <-errChan:
return err, time.Since(start)
case event := <-watcher.ResultChan():
if w.eventDone(&event) {

View File

@ -41,15 +41,12 @@ func TestAddWaitForReady(t *testing.T) {
waitForReady := NewWaitForReady(
"blub",
func(name string, timeout time.Duration) (watch.Interface, error) {
return fakeWatchApi, nil
},
func(obj runtime.Object) (apis.Conditions, error) {
return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil
})
fakeWatchApi.Start()
var msgs []string
err, _ := waitForReady.Wait("foobar", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) {
err, _ := waitForReady.Wait(fakeWatchApi, "foobar", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) {
msgs = append(msgs, msg)
})
close(fakeWatchApi.eventChan)
@ -69,8 +66,8 @@ func TestAddWaitForReady(t *testing.T) {
// check messages
assert.Assert(t, cmp.DeepEqual(tc.messagesExpected, msgs), "%d: Messages expected to be equal", i)
if fakeWatchApi.StopCalled != 1 {
t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled)
if fakeWatchApi.StopCalled != 0 {
t.Errorf("%d: Exactly zero 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled)
}
}
@ -82,13 +79,10 @@ func TestAddWaitForDelete(t *testing.T) {
waitForEvent := NewWaitForEvent(
"blub",
func(name string, timeout time.Duration) (watch.Interface, error) {
return fakeWatchAPI, nil
},
func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
fakeWatchAPI.Start()
err, _ := waitForEvent.Wait("foobar", Options{Timeout: &tc.timeout}, NoopMessageCallback())
err, _ := waitForEvent.Wait(fakeWatchAPI, "foobar", Options{Timeout: &tc.timeout}, NoopMessageCallback())
close(fakeWatchAPI.eventChan)
if tc.errorText == "" && err != nil {
@ -103,8 +97,8 @@ func TestAddWaitForDelete(t *testing.T) {
}
}
if fakeWatchAPI.StopCalled != 1 {
t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchAPI.StopCalled)
if fakeWatchAPI.StopCalled != 0 {
t.Errorf("%d: Exactly zero 'stop' should be called, but got %d", i, fakeWatchAPI.StopCalled)
}
}
}