Refactor waiting loop to use resourceVersion (#1301)

* WIP: Refactor waiting loop to use resourceVersion

* Remove testing println

* Fix error return in DeleteService

* Add more tests to reflect review feedback
This commit is contained in:
David Simansky 2021-05-17 19:17:25 +02:00 committed by GitHub
parent 66edd00fa8
commit 9f6ec3194f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 229 additions and 74 deletions

View File

@ -16,6 +16,7 @@ package v1
import ( import (
"context" "context"
"fmt"
"time" "time"
apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -241,25 +242,27 @@ func (c *knEventingClient) GetBroker(ctx context.Context, name string) (*eventin
} }
// WatchBroker is used to create watcher object // WatchBroker is used to create watcher object
func (c *knEventingClient) WatchBroker(ctx context.Context, name string, timeout time.Duration) (watch.Interface, error) { func (c *knEventingClient) WatchBroker(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcher(ctx, c.client.Brokers(c.namespace).Watch, c.client.RESTClient(), c.namespace, "brokers", name, timeout) return wait.NewWatcherWithVersion(ctx, c.client.Brokers(c.namespace).Watch, c.client.RESTClient(), c.namespace, "brokers", name, initialVersion, timeout)
} }
// DeleteBroker is used to delete an instance of broker and wait for completion until given timeout // DeleteBroker is used to delete an instance of broker and wait for completion until given timeout
// For `timeout == 0` delete is performed async without any wait // For `timeout == 0` delete is performed async without any wait
func (c *knEventingClient) DeleteBroker(ctx context.Context, name string, timeout time.Duration) error { func (c *knEventingClient) DeleteBroker(ctx context.Context, name string, timeout time.Duration) error {
broker, err := c.GetBroker(ctx, name)
if err != nil {
return err
}
if broker.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't delete broker '%s' because it has been already marked for deletion", name)
}
if timeout == 0 { if timeout == 0 {
return c.deleteBroker(ctx, name, apis_v1.DeletePropagationBackground) return c.deleteBroker(ctx, name, apis_v1.DeletePropagationBackground)
} }
waitC := make(chan error) waitC := make(chan error)
watcher, err := c.WatchBroker(ctx, name, timeout)
if err != nil {
return nil
}
defer watcher.Stop()
go func() { go func() {
waitForEvent := wait.NewWaitForEvent("broker", func(evt *watch.Event) bool { return evt.Type == watch.Deleted }) waitForEvent := wait.NewWaitForEvent("broker", c.WatchBroker, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback()) err, _ := waitForEvent.Wait(ctx, name, broker.ResourceVersion, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err waitC <- err
}() }()
err = c.deleteBroker(ctx, name, apis_v1.DeletePropagationForeground) err = c.deleteBroker(ctx, name, apis_v1.DeletePropagationForeground)

View File

@ -245,13 +245,21 @@ func TestBrokerDelete(t *testing.T) {
var name = "fooBroker" var name = "fooBroker"
server, client := setup() server, client := setup()
server.AddReactor("get", "brokers",
func(a client_testing.Action) (bool, runtime.Object, error) {
name := a.(client_testing.GetAction).GetName()
if name == "notFound" {
return true, nil, errors.NewNotFound(eventingv1.Resource("broker"), "notFound")
}
return false, nil, nil
})
server.AddReactor("delete", "brokers", server.AddReactor("delete", "brokers",
func(a client_testing.Action) (bool, runtime.Object, error) { func(a client_testing.Action) (bool, runtime.Object, error) {
name := a.(client_testing.DeleteAction).GetName() name := a.(client_testing.DeleteAction).GetName()
if name == "errorBroker" { if name == "errorBroker" {
return true, nil, fmt.Errorf("error while deleting broker %s", name) return true, nil, fmt.Errorf("error while deleting broker %s", name)
} }
return true, nil, nil return false, nil, nil
}) })
err := client.DeleteBroker(context.Background(), name, 0) err := client.DeleteBroker(context.Background(), name, 0)
@ -259,6 +267,10 @@ func TestBrokerDelete(t *testing.T) {
err = client.DeleteBroker(context.Background(), "errorBroker", 0) err = client.DeleteBroker(context.Background(), "errorBroker", 0)
assert.ErrorContains(t, err, "errorBroker", 0) assert.ErrorContains(t, err, "errorBroker", 0)
err = client.DeleteBroker(context.Background(), "notFound", 0)
assert.ErrorContains(t, err, "not found", 0)
assert.ErrorContains(t, err, "notFound", 0)
} }
func TestBrokerDeleteWithWait(t *testing.T) { func TestBrokerDeleteWithWait(t *testing.T) {

View File

@ -197,12 +197,12 @@ func (cl *knServingClient) GetService(ctx context.Context, name string) (*servin
return service, nil return service, nil
} }
func (cl *knServingClient) WatchService(ctx context.Context, name string, timeout time.Duration) (watch.Interface, error) { func (cl *knServingClient) WatchServiceWithVersion(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcher(ctx, cl.client.Services(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "services", name, timeout) return wait.NewWatcherWithVersion(ctx, cl.client.Services(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "services", name, initialVersion, timeout)
} }
func (cl *knServingClient) WatchRevision(ctx context.Context, name string, timeout time.Duration) (watch.Interface, error) { func (cl *knServingClient) WatchRevisionWithVersion(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcher(ctx, cl.client.Revisions(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "revision", name, timeout) return wait.NewWatcherWithVersion(ctx, cl.client.Revisions(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "revision", name, initialVersion, timeout)
} }
// List services // List services
@ -314,24 +314,28 @@ func (cl *knServingClient) ApplyService(ctx context.Context, modifiedService *se
// Param `timeout` represents a duration to wait for a delete op to finish. // Param `timeout` represents a duration to wait for a delete op to finish.
// For `timeout == 0` delete is performed async without any wait. // For `timeout == 0` delete is performed async without any wait.
func (cl *knServingClient) DeleteService(ctx context.Context, serviceName string, timeout time.Duration) error { func (cl *knServingClient) DeleteService(ctx context.Context, serviceName string, timeout time.Duration) error {
service, err := cl.GetService(ctx, serviceName)
if err != nil {
return err
}
if service.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't delete service '%s' because it has been already marked for deletion", serviceName)
}
if timeout == 0 { if timeout == 0 {
return cl.deleteService(ctx, serviceName, v1.DeletePropagationBackground) return cl.deleteService(ctx, serviceName, v1.DeletePropagationBackground)
} }
waitC := make(chan error) waitC := make(chan error)
watcher, err := cl.WatchService(ctx, serviceName, timeout)
if err != nil {
return nil
}
defer watcher.Stop()
go func() { go func() {
waitForEvent := wait.NewWaitForEvent("service", func(evt *watch.Event) bool { return evt.Type == watch.Deleted }) waitForEvent := wait.NewWaitForEvent("service", cl.WatchServiceWithVersion, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, watcher, serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback()) err, _ := waitForEvent.Wait(ctx, serviceName, service.ResourceVersion, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err waitC <- err
}() }()
err = cl.deleteService(ctx, serviceName, v1.DeletePropagationForeground) err = cl.deleteService(ctx, serviceName, v1.DeletePropagationForeground)
if err != nil { if err != nil {
return err return err
} }
return <-waitC return <-waitC
} }
@ -350,13 +354,16 @@ func (cl *knServingClient) deleteService(ctx context.Context, serviceName string
// Wait for a service to become ready, but not longer than provided timeout // Wait for a service to become ready, but not longer than provided timeout
func (cl *knServingClient) WaitForService(ctx context.Context, name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) { func (cl *knServingClient) WaitForService(ctx context.Context, name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) {
watcher, err := cl.WatchService(ctx, name, timeout) waitForReady := wait.NewWaitForReady("service", cl.WatchServiceWithVersion, serviceConditionExtractor)
service, err := cl.GetService(ctx, name)
if err != nil { if err != nil {
return err, timeout if apierrors.IsNotFound(err) {
return waitForReady.Wait(ctx, name, "", wait.Options{Timeout: &timeout}, msgCallback)
}
return err, 0
} }
defer watcher.Stop() return waitForReady.Wait(ctx, name, service.ResourceVersion, wait.Options{Timeout: &timeout}, msgCallback)
waitForReady := wait.NewWaitForReady("service", serviceConditionExtractor)
return waitForReady.Wait(ctx, watcher, name, wait.Options{Timeout: &timeout}, msgCallback)
} }
// Get the configuration for a service // Get the configuration for a service
@ -469,14 +476,9 @@ func (cl *knServingClient) DeleteRevision(ctx context.Context, name string, time
return cl.deleteRevision(ctx, name) return cl.deleteRevision(ctx, name)
} }
waitC := make(chan error) waitC := make(chan error)
watcher, err := cl.WatchRevision(ctx, name, timeout)
if err != nil {
return err
}
defer watcher.Stop()
go func() { go func() {
waitForEvent := wait.NewWaitForEvent("revision", func(evt *watch.Event) bool { return evt.Type == watch.Deleted }) waitForEvent := wait.NewWaitForEvent("revision", cl.WatchRevisionWithVersion, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback()) err, _ := waitForEvent.Wait(ctx, name, revision.ResourceVersion, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err waitC <- err
}() }()
err = cl.deleteRevision(ctx, name) err = cl.deleteRevision(ctx, name)

View File

@ -192,6 +192,15 @@ func TestDeleteService(t *testing.T) {
nonExistingServiceName = "no-service" nonExistingServiceName = "no-service"
) )
serving.AddReactor("get", "services",
func(a clienttesting.Action) (bool, runtime.Object, error) {
name := a.(clienttesting.GetAction).GetName()
if name == serviceName {
// Don't handle existing service, just continue to next
return false, nil, nil
}
return true, nil, errors.NewNotFound(servingv1.Resource("service"), name)
})
serving.AddReactor("delete", "services", serving.AddReactor("delete", "services",
func(a clienttesting.Action) (bool, runtime.Object, error) { func(a clienttesting.Action) (bool, runtime.Object, error) {
name := a.(clienttesting.DeleteAction).GetName() name := a.(clienttesting.DeleteAction).GetName()
@ -201,7 +210,7 @@ func TestDeleteService(t *testing.T) {
if name == serviceName { if name == serviceName {
return true, nil, nil return true, nil, nil
} }
return true, nil, errors.NewNotFound(servingv1.Resource("service"), name) return false, nil, nil
}) })
serving.AddWatchReactor("services", serving.AddWatchReactor("services",
func(a clienttesting.Action) (bool, watch.Interface, error) { func(a clienttesting.Action) (bool, watch.Interface, error) {
@ -222,6 +231,7 @@ func TestDeleteService(t *testing.T) {
t.Run("trying to delete non-existing service returns error", func(t *testing.T) { t.Run("trying to delete non-existing service returns error", func(t *testing.T) {
err := client.DeleteService(context.Background(), nonExistingServiceName, time.Duration(10)*time.Second) err := client.DeleteService(context.Background(), nonExistingServiceName, time.Duration(10)*time.Second)
println(err.Error())
assert.ErrorContains(t, err, "not found") assert.ErrorContains(t, err, "not found")
assert.ErrorContains(t, err, nonExistingServiceName) assert.ErrorContains(t, err, nonExistingServiceName)
}) })

View File

@ -69,16 +69,16 @@ func newTickerPollInterval(d time.Duration) *tickerPollInterval {
return &tickerPollInterval{time.NewTicker(d)} return &tickerPollInterval{time.NewTicker(d)}
} }
// NewWatcher makes a watch.Interface on the given resource in the client, // NewWatcherWithVersion makes a watch.Interface on the given resource in the client,
// falling back to polling if the server does not support Watch. // falling back to polling if the server does not support Watch.
func NewWatcher(ctx context.Context, watchFunc watchF, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) { func NewWatcherWithVersion(ctx context.Context, watchFunc watchF, c rest.Interface, ns string, resource string, name string, initialResourceVersion string, timeout time.Duration) (watch.Interface, error) {
native, err := nativeWatch(ctx, watchFunc, name, timeout) native, err := nativeWatchWithVersion(ctx, watchFunc, name, initialResourceVersion, timeout)
if err == nil { if err == nil {
return native, nil return native, nil
} }
polling := &pollingWatcher{ polling := &pollingWatcher{
c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{}, c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{},
newTickerPollInterval(pollInterval), nativePoll(ctx, c, ns, resource, name)} newTickerPollInterval(time.Second), nativePoll(ctx, c, ns, resource, name)}
polling.start() polling.start()
return polling, nil return polling, nil
} }
@ -161,9 +161,10 @@ func (w *pollingWatcher) Stop() {
close(w.done) close(w.done)
} }
func nativeWatch(ctx context.Context, watchFunc watchF, name string, timeout time.Duration) (watch.Interface, error) { func nativeWatchWithVersion(ctx context.Context, watchFunc watchF, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
opts := v1.ListOptions{ opts := v1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), ResourceVersion: initialVersion,
FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
} }
opts.Watch = true opts.Watch = true
addWatchTimeout(&opts, timeout) addWatchTimeout(&opts, timeout)

View File

@ -28,14 +28,16 @@ import (
// Callbacks and configuration used while waiting // Callbacks and configuration used while waiting
type waitForReadyConfig struct { type waitForReadyConfig struct {
watchMaker WatchMaker
conditionsExtractor ConditionsExtractor conditionsExtractor ConditionsExtractor
kind string kind string
} }
// Callbacks and configuration used while waiting for event // Callbacks and configuration used while waiting for event
type waitForEvent struct { type waitForEvent struct {
eventDone EventDone watchMaker WatchMaker
kind string eventDone EventDone
kind string
} }
// EventDone is a marker to stop actual waiting on given event state // EventDone is a marker to stop actual waiting on given event state
@ -48,7 +50,7 @@ type Wait interface {
// Wait on resource the resource with this name // Wait on resource the resource with this name
// and write event messages for unknown event to the status writer. // and write event messages for unknown event to the status writer.
// Returns an error (if any) and the overall time it took to wait // Returns an error (if any) and the overall time it took to wait
Wait(ctx context.Context, watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) Wait(ctx context.Context, name string, initialVersion string, options Options, msgCallback MessageCallback) (error, time.Duration)
} }
type Options struct { type Options struct {
@ -61,7 +63,7 @@ type Options struct {
} }
// Create watch which is used when waiting for Ready condition // Create watch which is used when waiting for Ready condition
type WatchMaker func(name string, timeout time.Duration) (watch.Interface, error) type WatchMaker func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error)
// Extract conditions from a runtime object // Extract conditions from a runtime object
type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error) type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error)
@ -70,19 +72,21 @@ type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error)
type MessageCallback func(durationSinceState time.Duration, message string) type MessageCallback func(durationSinceState time.Duration, message string)
// NewWaitForReady waits until the condition is set to Ready == True // NewWaitForReady waits until the condition is set to Ready == True
func NewWaitForReady(kind string, extractor ConditionsExtractor) Wait { func NewWaitForReady(kind string, watchMaker WatchMaker, extractor ConditionsExtractor) Wait {
return &waitForReadyConfig{ return &waitForReadyConfig{
kind: kind, kind: kind,
watchMaker: watchMaker,
conditionsExtractor: extractor, conditionsExtractor: extractor,
} }
} }
// NewWaitForEvent creates a Wait object which waits until a specific event (i.e. when // NewWaitForEvent creates a Wait object which waits until a specific event (i.e. when
// the EventDone function returns true) // the EventDone function returns true)
func NewWaitForEvent(kind string, eventDone EventDone) Wait { func NewWaitForEvent(kind string, watchMaker WatchMaker, eventDone EventDone) Wait {
return &waitForEvent{ return &waitForEvent{
kind: kind, kind: kind,
eventDone: eventDone, watchMaker: watchMaker,
eventDone: eventDone,
} }
} }
@ -109,14 +113,13 @@ func NoopMessageCallback() MessageCallback {
// (e.g. "service"), `timeout` is a timeout after which the watch should be cancelled if no // (e.g. "service"), `timeout` is a timeout after which the watch should be cancelled if no
// target state has been entered yet and `out` is used for printing out status messages // target state has been entered yet and `out` is used for printing out status messages
// msgCallback gets called for every event with an 'Ready' condition == UNKNOWN with the event's message. // msgCallback gets called for every event with an 'Ready' condition == UNKNOWN with the event's message.
func (w *waitForReadyConfig) Wait(ctx context.Context, watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) { func (w *waitForReadyConfig) Wait(ctx context.Context, name string, initialVersion string, options Options, msgCallback MessageCallback) (error, time.Duration) {
timeout := options.timeoutWithDefault() timeout := options.timeoutWithDefault()
timeoutTimer := time.NewTimer(timeout) timeoutTimer := time.NewTimer(timeout)
defer timeoutTimer.Stop() defer timeoutTimer.Stop()
for { for {
start := time.Now() start := time.Now()
retry, timeoutReached, err := w.waitForReadyCondition(ctx, watcher, start, timeoutTimer, options.errorWindowWithDefault(), msgCallback) retry, timeoutReached, err := w.waitForReadyCondition(ctx, name, initialVersion, start, timeoutTimer, options.errorWindowWithDefault(), options, msgCallback)
if err != nil { if err != nil {
return err, time.Since(start) return err, time.Since(start)
@ -140,10 +143,14 @@ func (w *waitForReadyConfig) Wait(ctx context.Context, watcher watch.Interface,
// An errorWindow can be specified which takes into account of intermediate "false" ready conditions. So before returning // An errorWindow can be specified which takes into account of intermediate "false" ready conditions. So before returning
// an error, this methods waits for the errorWindow duration and if an "True" or "Unknown" event arrives in the meantime // an error, this methods waits for the errorWindow duration and if an "True" or "Unknown" event arrives in the meantime
// for the "Ready" condition, then the method continues to wait. // for the "Ready" condition, then the method continues to wait.
func (w *waitForReadyConfig) waitForReadyCondition( func (w *waitForReadyConfig) waitForReadyCondition(ctx context.Context, name string, initialVersion string, start time.Time,
ctx context.Context, watcher watch.Interface, start time.Time, timeoutTimer *time.Timer, errorWindow time.Duration, msgCallback MessageCallback, timeoutTimer *time.Timer, errorWindow time.Duration, options Options, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) {
) (retry bool, timeoutReached bool, err error) {
watcher, err := w.watchMaker(ctx, name, initialVersion, options.timeoutWithDefault())
if err != nil {
return false, false, err
}
defer watcher.Stop()
// channel used to transport the error that has been received // channel used to transport the error that has been received
errChan := make(chan error) errChan := make(chan error)
@ -170,8 +177,7 @@ func (w *waitForReadyConfig) waitForReadyCondition(
return false, false, err return false, false, err
case event, ok := <-watcher.ResultChan(): case event, ok := <-watcher.ResultChan():
if !ok || event.Object == nil { if !ok || event.Object == nil {
// retry only if the channel is still open return true, false, nil
return ok, false, nil
} }
// Check whether resource is in sync already (meta.generation == status.observedGeneration) // Check whether resource is in sync already (meta.generation == status.observedGeneration)
@ -240,7 +246,13 @@ func (w *waitForReadyConfig) waitForReadyCondition(
} }
// Wait until the expected EventDone is satisfied // Wait until the expected EventDone is satisfied
func (w *waitForEvent) Wait(ctx context.Context, watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) { func (w *waitForEvent) Wait(ctx context.Context, name string, initialVersion string, options Options, msgCallback MessageCallback) (error, time.Duration) {
watcher, err := w.watchMaker(ctx, name, initialVersion, options.timeoutWithDefault())
if err != nil {
return err, 0
}
defer watcher.Stop()
timeout := options.timeoutWithDefault() timeout := options.timeoutWithDefault()
start := time.Now() start := time.Now()
// channel used to transport the error // channel used to transport the error

View File

@ -17,6 +17,7 @@ package wait
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"testing" "testing"
"time" "time"
@ -39,9 +40,13 @@ type waitForReadyTestCase struct {
func TestWaitCancellation(t *testing.T) { func TestWaitCancellation(t *testing.T) {
fakeWatchApi := NewFakeWatch([]watch.Event{}) fakeWatchApi := NewFakeWatch([]watch.Event{})
fakeWatchApi.Start() fakeWatchApi.Start()
wfe := NewWaitForEvent("foobar", func(e *watch.Event) bool { wfe := NewWaitForEvent("foobar",
return false func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
}) return fakeWatchApi, nil
},
func(e *watch.Event) bool {
return false
})
timeout := time.Second * 5 timeout := time.Second * 5
@ -51,7 +56,7 @@ func TestWaitCancellation(t *testing.T) {
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
cancel() cancel()
}() }()
err, _ := wfe.Wait(ctx, fakeWatchApi, "foobar", Options{Timeout: &timeout}, NoopMessageCallback()) err, _ := wfe.Wait(ctx, "foobar", "", Options{Timeout: &timeout}, NoopMessageCallback())
assert.Assert(t, errors.Is(err, context.Canceled)) assert.Assert(t, errors.Is(err, context.Canceled))
ctx, cancel = context.WithCancel(context.Background()) ctx, cancel = context.WithCancel(context.Background())
@ -62,10 +67,13 @@ func TestWaitCancellation(t *testing.T) {
}() }()
wfr := NewWaitForReady( wfr := NewWaitForReady(
"blub", "blub",
func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return fakeWatchApi, nil
},
func(obj runtime.Object) (apis.Conditions, error) { func(obj runtime.Object) (apis.Conditions, error) {
return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil
}) })
err, _ = wfr.Wait(ctx, fakeWatchApi, "foobar", Options{Timeout: &timeout}, NoopMessageCallback()) err, _ = wfr.Wait(ctx, "foobar", "", Options{Timeout: &timeout}, NoopMessageCallback())
assert.Assert(t, errors.Is(err, context.Canceled)) assert.Assert(t, errors.Is(err, context.Canceled))
} }
@ -73,15 +81,17 @@ func TestAddWaitForReady(t *testing.T) {
for i, tc := range prepareTestCases("test-service") { for i, tc := range prepareTestCases("test-service") {
fakeWatchApi := NewFakeWatch(tc.events) fakeWatchApi := NewFakeWatch(tc.events)
waitForReady := NewWaitForReady( waitForReady := NewWaitForReady(
"blub", "blub",
func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return fakeWatchApi, nil
},
func(obj runtime.Object) (apis.Conditions, error) { func(obj runtime.Object) (apis.Conditions, error) {
return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil
}) })
fakeWatchApi.Start() fakeWatchApi.Start()
var msgs []string var msgs []string
err, _ := waitForReady.Wait(context.Background(), fakeWatchApi, "foobar", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) { err, _ := waitForReady.Wait(context.Background(), "foobar", "", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) {
msgs = append(msgs, msg) msgs = append(msgs, msg)
}) })
close(fakeWatchApi.eventChan) close(fakeWatchApi.eventChan)
@ -101,23 +111,128 @@ func TestAddWaitForReady(t *testing.T) {
// check messages // check messages
assert.Assert(t, cmp.DeepEqual(tc.messagesExpected, msgs), "%d: Messages expected to be equal", i) assert.Assert(t, cmp.DeepEqual(tc.messagesExpected, msgs), "%d: Messages expected to be equal", i)
if fakeWatchApi.StopCalled != 0 { if fakeWatchApi.StopCalled != 1 {
t.Errorf("%d: Exactly zero 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled) t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled)
} }
} }
} }
func TestAddWaitForReadyWithChannelClose(t *testing.T) {
for i, tc := range prepareTestCases("test-service") {
fakeWatchApi := NewFakeWatch(tc.events)
counter := 0
waitForReady := NewWaitForReady(
"blub",
func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
if counter == 0 {
close(fakeWatchApi.eventChan)
counter++
return fakeWatchApi, nil
}
fakeWatchApi.eventChan = make(chan watch.Event)
fakeWatchApi.Start()
return fakeWatchApi, nil
},
func(obj runtime.Object) (apis.Conditions, error) {
return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil
})
var msgs []string
err, _ := waitForReady.Wait(context.Background(), "foobar", "", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) {
msgs = append(msgs, msg)
})
close(fakeWatchApi.eventChan)
if tc.errorText == "" && err != nil {
t.Errorf("%d: Error received %v", i, err)
continue
}
if tc.errorText != "" {
if err == nil {
t.Errorf("%d: No error but expected one", i)
} else {
assert.ErrorContains(t, err, tc.errorText)
}
}
// check messages
assert.Assert(t, cmp.DeepEqual(tc.messagesExpected, msgs), "%d: Messages expected to be equal", i)
if fakeWatchApi.StopCalled != 2 {
t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled)
}
}
}
func TestWaitTimeout(t *testing.T) {
fakeWatchApi := NewFakeWatch([]watch.Event{})
timeout := time.Second * 3
wfe := NewWaitForEvent("foobar",
func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return fakeWatchApi, nil
},
func(e *watch.Event) bool {
return false
})
err, _ := wfe.Wait(context.Background(), "foobar", "", Options{Timeout: &timeout}, NoopMessageCallback())
assert.ErrorContains(t, err, "not ready")
assert.Assert(t, fakeWatchApi.StopCalled == 1)
fakeWatchApi = NewFakeWatch([]watch.Event{})
wfr := NewWaitForReady(
"blub",
func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return fakeWatchApi, nil
},
func(obj runtime.Object) (apis.Conditions, error) {
return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil
})
err, _ = wfr.Wait(context.Background(), "foobar", "", Options{Timeout: &timeout}, NoopMessageCallback())
assert.ErrorContains(t, err, "not ready")
assert.Assert(t, fakeWatchApi.StopCalled == 1)
}
func TestWaitWatchError(t *testing.T) {
timeout := time.Second * 3
wfe := NewWaitForEvent("foobar",
func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return nil, fmt.Errorf("error creating watcher")
},
func(e *watch.Event) bool {
return false
})
err, _ := wfe.Wait(context.Background(), "foobar", "", Options{Timeout: &timeout}, NoopMessageCallback())
assert.ErrorContains(t, err, "error creating watcher")
wfr := NewWaitForReady(
"blub",
func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return nil, fmt.Errorf("error creating watcher")
},
func(obj runtime.Object) (apis.Conditions, error) {
return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil
})
err, _ = wfr.Wait(context.Background(), "foobar", "", Options{Timeout: &timeout}, NoopMessageCallback())
assert.ErrorContains(t, err, "error creating watcher")
}
func TestAddWaitForDelete(t *testing.T) { func TestAddWaitForDelete(t *testing.T) {
for i, tc := range prepareDeleteTestCases("test-service") { for i, tc := range prepareDeleteTestCases("test-service") {
fakeWatchAPI := NewFakeWatch(tc.events) fakeWatchAPI := NewFakeWatch(tc.events)
waitForEvent := NewWaitForEvent( waitForEvent := NewWaitForEvent(
"blub", "blub",
func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return fakeWatchAPI, nil
},
func(evt *watch.Event) bool { return evt.Type == watch.Deleted }) func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
fakeWatchAPI.Start() fakeWatchAPI.Start()
err, _ := waitForEvent.Wait(context.Background(), fakeWatchAPI, "foobar", Options{Timeout: &tc.timeout}, NoopMessageCallback()) err, _ := waitForEvent.Wait(context.Background(), "foobar", "", Options{Timeout: &tc.timeout}, NoopMessageCallback())
close(fakeWatchAPI.eventChan) close(fakeWatchAPI.eventChan)
if tc.errorText == "" && err != nil { if tc.errorText == "" && err != nil {
@ -132,8 +247,8 @@ func TestAddWaitForDelete(t *testing.T) {
} }
} }
if fakeWatchAPI.StopCalled != 0 { if fakeWatchAPI.StopCalled != 1 {
t.Errorf("%d: Exactly zero 'stop' should be called, but got %d", i, fakeWatchAPI.StopCalled) t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchAPI.StopCalled)
} }
} }
} }
@ -142,10 +257,10 @@ func TestAddWaitForDelete(t *testing.T) {
func prepareTestCases(name string) []waitForReadyTestCase { func prepareTestCases(name string) []waitForReadyTestCase {
return []waitForReadyTestCase{ return []waitForReadyTestCase{
errorTest(name), errorTest(name),
tc(peNormal, name, time.Second, ""), tc(peNormal, name, 5*time.Second, ""),
tc(peWrongGeneration, name, 1*time.Second, "timeout"), tc(peWrongGeneration, name, 5*time.Second, "timeout"),
tc(peTimeout, name, time.Second, "timeout"), tc(peTimeout, name, 5*time.Second, "timeout"),
tc(peReadyFalseWithinErrorWindow, name, time.Second, ""), tc(peReadyFalseWithinErrorWindow, name, 5*time.Second, ""),
} }
} }
@ -164,7 +279,7 @@ func errorTest(name string) waitForReadyTestCase {
return waitForReadyTestCase{ return waitForReadyTestCase{
events: events, events: events,
timeout: 3 * time.Second, timeout: 5 * time.Second,
errorText: "FakeError", errorText: "FakeError",
messagesExpected: []string{"msg1", "Test Error"}, messagesExpected: []string{"msg1", "Test Error"},
} }