mirror of https://github.com/fluxcd/cli-utils.git
Change the behavior of handling a WaitTask timeout
Today, when a WaitTask timeout happens, the WaitTask sends the TimeoutError on the TaskChannel. After receiving the TimeoutError, `baseRunner.run` terminates immediately by returning the error to its caller (Applier.Run or Destroyer.Run). The caller then sends the error onto the EventChannel and terminates. With this PR, when a WaitTask timeout happens, the WaitTask sends a WaitType Event including the TimeoutError on the EventChannel, and then sends an empty TaskResult on the TaskChannel. An empty TaskResult indicates that the task finished successfully, and therefore `baseRunner.run` continues instead of terminating. The motivation for this change is to make sure that cli-utils only terminates on fatal errors (such as inventory-related errors and ApplyOptions creation errors). A WaitTask timeout may not always mean a fatal error (it may happen because the StatusPoller has not finished polling everything, or because some, but not all, of the resources have not reached the desired status), and therefore should not terminate cli-utils.
This commit is contained in:
parent
6f5ee6f2ef
commit
87877d28d1
|
@ -21,6 +21,7 @@ const (
|
|||
StatusType
|
||||
PruneType
|
||||
DeleteType
|
||||
WaitType
|
||||
)
|
||||
|
||||
// Event is the type of the objects that will be returned through
|
||||
|
@ -58,6 +59,9 @@ type Event struct {
|
|||
// DeleteEvent contains information about object that have been
|
||||
// deleted.
|
||||
DeleteEvent DeleteEvent
|
||||
|
||||
// WaitEvent contains information about any errors encountered in a WaitTask.
|
||||
WaitEvent WaitEvent
|
||||
}
|
||||
|
||||
type InitEvent struct {
|
||||
|
@ -85,6 +89,11 @@ type ErrorEvent struct {
|
|||
Err error
|
||||
}
|
||||
|
||||
// WaitEvent contains information about any errors encountered in a WaitTask.
// It is carried in the WaitEvent field of an Event whose Type is WaitType.
type WaitEvent struct {
|
||||
// GroupName identifies the wait task that produced the event; the
// timeout handler in WaitTask fills it with the task's Name().
GroupName string
|
||||
// Error is the error reported by the wait task. On a timeout the
// WaitTask stores a *TimeoutError here.
Error error
|
||||
}
|
||||
|
||||
//go:generate stringer -type=ActionGroupEventType
|
||||
type ActionGroupEventType int
|
||||
|
||||
|
|
|
@ -222,8 +222,9 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
|
|||
}
|
||||
}
|
||||
// A message on the taskChannel means that the current task
|
||||
// has either completed or failed. If it has failed, we return
|
||||
// the error. If the abort flag is true, which means something
|
||||
// has either completed or failed.
|
||||
// If it has failed, we return the error.
|
||||
// If the abort flag is true, which means something
|
||||
// else has gone wrong and we are waiting for the current task to
|
||||
// finish, we exit.
|
||||
// If everything is ok, we fetch and start the next task.
|
||||
|
@ -238,7 +239,6 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
|
|||
},
|
||||
}
|
||||
if msg.Err != nil {
|
||||
b.amendTimeoutError(taskContext, msg.Err)
|
||||
return msg.Err
|
||||
}
|
||||
if abort {
|
||||
|
@ -262,24 +262,6 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
|
|||
}
|
||||
}
|
||||
|
||||
// amendTimeoutError fills in the TimedOutResources field of err when err is
// a *TimeoutError. For every identifier listed in the error it looks up the
// cached result in the task context's resource cache and records the
// identifier, status and status message of each resource whose status does
// not meet the error's condition. Errors of any other type are left untouched.
func (b *baseRunner) amendTimeoutError(taskContext *TaskContext, err error) {
|
||||
// A direct type assertion: only a bare *TimeoutError matches, not an
// error that wraps one.
if timeoutErr, ok := err.(*TimeoutError); ok {
|
||||
var timedOutResources []TimedOutResource
|
||||
for _, id := range timeoutErr.Identifiers {
|
||||
result := taskContext.ResourceCache().Get(id)
|
||||
// Resources that already satisfy the condition did not time out.
if timeoutErr.Condition.Meets(result.Status) {
|
||||
continue
|
||||
}
|
||||
timedOutResources = append(timedOutResources, TimedOutResource{
|
||||
Identifier: id,
|
||||
Status: result.Status,
|
||||
Message: result.StatusMessage,
|
||||
})
|
||||
}
|
||||
// Overwrites any previous value; stays nil when every resource met
// the condition.
timeoutErr.TimedOutResources = timedOutResources
|
||||
}
|
||||
}
|
||||
|
||||
// completeIfWaitTask checks if the current task is a wait task. If so,
|
||||
// we invoke the complete function to complete it.
|
||||
func completeIfWaitTask(currentTask Task, taskContext *TaskContext) {
|
||||
|
|
|
@ -45,9 +45,8 @@ func TestBaseRunner(t *testing.T) {
|
|||
statusEventsDelay time.Duration
|
||||
statusEvents []pollevent.Event
|
||||
expectedEventTypes []event.Type
|
||||
expectedError error
|
||||
expectedTimedOutResources []TimedOutResource
|
||||
expectedErrorMsg string
|
||||
expectedTimeoutErrorMsg string
|
||||
}{
|
||||
"wait task runs until condition is met": {
|
||||
tasks: []Task{
|
||||
|
@ -112,9 +111,11 @@ func TestBaseRunner(t *testing.T) {
|
|||
},
|
||||
},
|
||||
expectedEventTypes: []event.Type{
|
||||
event.ActionGroupType,
|
||||
event.StatusType,
|
||||
event.WaitType,
|
||||
event.ActionGroupType,
|
||||
},
|
||||
expectedError: &TimeoutError{},
|
||||
expectedTimedOutResources: []TimedOutResource{
|
||||
{
|
||||
Identifier: depID,
|
||||
|
@ -122,7 +123,7 @@ func TestBaseRunner(t *testing.T) {
|
|||
Message: "resource not cached",
|
||||
},
|
||||
},
|
||||
expectedErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent",
|
||||
expectedTimeoutErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent",
|
||||
},
|
||||
"wait task times out eventually (InProgress)": {
|
||||
tasks: []Task{
|
||||
|
@ -147,16 +148,19 @@ func TestBaseRunner(t *testing.T) {
|
|||
},
|
||||
},
|
||||
expectedEventTypes: []event.Type{
|
||||
event.ActionGroupType,
|
||||
event.StatusType,
|
||||
event.StatusType,
|
||||
event.WaitType,
|
||||
event.ActionGroupType,
|
||||
},
|
||||
expectedError: &TimeoutError{},
|
||||
expectedTimedOutResources: []TimedOutResource{
|
||||
{
|
||||
Identifier: depID,
|
||||
Status: status.InProgressStatus,
|
||||
},
|
||||
},
|
||||
expectedErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent",
|
||||
expectedTimeoutErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent",
|
||||
},
|
||||
"tasks run in order": {
|
||||
tasks: []Task{
|
||||
|
@ -244,18 +248,13 @@ func TestBaseRunner(t *testing.T) {
|
|||
close(eventChannel)
|
||||
wg.Wait()
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.IsType(t, tc.expectedError, err)
|
||||
if timeoutError, ok := err.(*TimeoutError); ok {
|
||||
assert.ElementsMatch(t, tc.expectedTimedOutResources,
|
||||
timeoutError.TimedOutResources)
|
||||
assert.Equal(t, timeoutError.Error(), tc.expectedErrorMsg)
|
||||
}
|
||||
return
|
||||
} else if err != nil {
|
||||
if err != nil {
|
||||
t.Errorf("expected no error, but got %v", err)
|
||||
}
|
||||
|
||||
for _, event := range events {
|
||||
t.Log(event)
|
||||
}
|
||||
if want, got := len(tc.expectedEventTypes), len(events); want != got {
|
||||
t.Errorf("expected %d events, but got %d", want, got)
|
||||
}
|
||||
|
@ -265,6 +264,14 @@ func TestBaseRunner(t *testing.T) {
|
|||
t.Errorf("expected event type %s, but got %s",
|
||||
want, got)
|
||||
}
|
||||
if e.Type == event.WaitType {
|
||||
err := e.WaitEvent.Error
|
||||
if timeoutError, ok := err.(*TimeoutError); ok {
|
||||
assert.ElementsMatch(t, tc.expectedTimedOutResources,
|
||||
timeoutError.TimedOutResources)
|
||||
assert.Equal(t, timeoutError.Error(), tc.expectedTimeoutErrorMsg)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -102,7 +102,7 @@ func (w *WaitTask) Start(taskContext *TaskContext) {
|
|||
|
||||
// setTimer creates the timer with the timeout value taken from
|
||||
// the WaitTask struct. Once the timer expires, it will send
|
||||
// a message on the TaskChannel provided in the taskContext.
|
||||
// a message on the EventChannel provided in the taskContext.
|
||||
func (w *WaitTask) setTimer(taskContext *TaskContext) {
|
||||
timer := time.NewTimer(w.Timeout)
|
||||
go func() {
|
||||
|
@ -111,16 +111,23 @@ func (w *WaitTask) setTimer(taskContext *TaskContext) {
|
|||
// Timeout is cancelled.
|
||||
<-timer.C
|
||||
select {
|
||||
// We only send the taskResult if no one has gotten
|
||||
// We only send the TimeoutError to the eventChannel if no one has gotten
|
||||
// to the token first.
|
||||
case <-w.token:
|
||||
taskContext.TaskChannel() <- TaskResult{
|
||||
Err: &TimeoutError{
|
||||
Identifiers: w.Ids,
|
||||
Timeout: w.Timeout,
|
||||
Condition: w.Condition,
|
||||
err := &TimeoutError{
|
||||
Identifiers: w.Ids,
|
||||
Timeout: w.Timeout,
|
||||
Condition: w.Condition,
|
||||
}
|
||||
amendTimeoutError(taskContext, err)
|
||||
taskContext.EventChannel() <- event.Event{
|
||||
Type: event.WaitType,
|
||||
WaitEvent: event.WaitEvent{
|
||||
GroupName: w.Name(),
|
||||
Error: err,
|
||||
},
|
||||
}
|
||||
taskContext.TaskChannel() <- TaskResult{}
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
@ -130,6 +137,24 @@ func (w *WaitTask) setTimer(taskContext *TaskContext) {
|
|||
}
|
||||
}
|
||||
|
||||
// amendTimeoutError populates the TimedOutResources field of err when err is
// a *TimeoutError; any other error is ignored.
//
// Each identifier in timeoutErr.Identifiers is looked up in the resource
// cache of taskContext. Resources whose cached status meets
// timeoutErr.Condition are skipped; the remaining ones are recorded with
// their identifier, cached status and status message.
func amendTimeoutError(taskContext *TaskContext, err error) {
|
||||
// A direct type assertion: only a bare *TimeoutError matches, not an
// error that wraps one.
if timeoutErr, ok := err.(*TimeoutError); ok {
|
||||
var timedOutResources []TimedOutResource
|
||||
for _, id := range timeoutErr.Identifiers {
|
||||
result := taskContext.ResourceCache().Get(id)
|
||||
// Resources that already satisfy the condition did not time out.
if timeoutErr.Condition.Meets(result.Status) {
|
||||
continue
|
||||
}
|
||||
timedOutResources = append(timedOutResources, TimedOutResource{
|
||||
Identifier: id,
|
||||
Status: result.Status,
|
||||
Message: result.StatusMessage,
|
||||
})
|
||||
}
|
||||
// Overwrites any previous value; stays nil when every resource met
// the condition.
timeoutErr.TimedOutResources = timedOutResources
|
||||
}
|
||||
}
|
||||
|
||||
// checkCondition checks whether the condition set in the task
|
||||
// is currently met given the status of resources in the cache.
|
||||
func (w *WaitTask) checkCondition(taskContext *TaskContext) bool {
|
||||
|
|
|
@ -15,7 +15,8 @@ import (
|
|||
)
|
||||
|
||||
func TestWaitTask_TimeoutTriggered(t *testing.T) {
|
||||
task := NewWaitTask("wait", object.ObjMetadataSet{}, AllCurrent,
|
||||
taskName := "wait"
|
||||
task := NewWaitTask(taskName, object.ObjMetadataSet{}, AllCurrent,
|
||||
2*time.Second, testutil.NewFakeRESTMapper())
|
||||
|
||||
eventChannel := make(chan event.Event)
|
||||
|
@ -28,9 +29,16 @@ func TestWaitTask_TimeoutTriggered(t *testing.T) {
|
|||
timer := time.NewTimer(3 * time.Second)
|
||||
|
||||
select {
|
||||
case res := <-taskContext.TaskChannel():
|
||||
if _, ok := IsTimeoutError(res.Err); !ok {
|
||||
t.Errorf("expected timeout error, but got %v", res.Err)
|
||||
case e := <-taskContext.EventChannel():
|
||||
if e.Type != event.WaitType {
|
||||
t.Errorf("expected a WaitType event, but got a %v event", e.Type)
|
||||
}
|
||||
if e.WaitEvent.GroupName != taskName {
|
||||
t.Errorf("expected WaitEvent.GroupName = %q, but got %q", taskName, e.WaitEvent.GroupName)
|
||||
}
|
||||
err := e.WaitEvent.Error
|
||||
if _, ok := IsTimeoutError(err); !ok {
|
||||
t.Errorf("expected timeout error, but got %v", err)
|
||||
}
|
||||
return
|
||||
case <-timer.C:
|
||||
|
|
|
@ -23,6 +23,7 @@ type ExpEvent struct {
|
|||
StatusEvent *ExpStatusEvent
|
||||
PruneEvent *ExpPruneEvent
|
||||
DeleteEvent *ExpDeleteEvent
|
||||
WaitEvent *ExpWaitEvent
|
||||
}
|
||||
|
||||
type ExpInitEvent struct {
|
||||
|
@ -67,6 +68,11 @@ type ExpDeleteEvent struct {
|
|||
Error error
|
||||
}
|
||||
|
||||
// ExpWaitEvent describes the expected contents of an event.WaitEvent when
// verifying emitted events in tests.
type ExpWaitEvent struct {
|
||||
// GroupName is the expected group name. isMatch only compares it when
// it is non-empty, so the empty string matches any group name.
GroupName string
|
||||
// Error is the expected error. When non-nil, isMatch compares it with
// the actual error using cmp.Equal and cmpopts.EquateErrors; when nil,
// the actual event must carry no error.
Error error
|
||||
}
|
||||
|
||||
func VerifyEvents(expEvents []ExpEvent, events []event.Event) error {
|
||||
if len(expEvents) == 0 && len(events) == 0 {
|
||||
return nil
|
||||
|
@ -234,6 +240,24 @@ func isMatch(ee ExpEvent, e event.Event) bool {
|
|||
return de.Error != nil
|
||||
}
|
||||
return de.Error == nil
|
||||
|
||||
case event.WaitType:
|
||||
wee := ee.WaitEvent
|
||||
if wee == nil {
|
||||
return true
|
||||
}
|
||||
we := e.WaitEvent
|
||||
|
||||
if wee.GroupName != "" {
|
||||
if wee.GroupName != we.GroupName {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if wee.Error != nil {
|
||||
return cmp.Equal(wee.Error, we.Error, cmpopts.EquateErrors())
|
||||
}
|
||||
return we.Error == nil
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
@ -317,6 +341,15 @@ func EventToExpEvent(e event.Event) ExpEvent {
|
|||
Error: e.DeleteEvent.Error,
|
||||
},
|
||||
}
|
||||
|
||||
case event.WaitType:
|
||||
return ExpEvent{
|
||||
EventType: event.WaitType,
|
||||
WaitEvent: &ExpWaitEvent{
|
||||
GroupName: e.WaitEvent.GroupName,
|
||||
Error: e.WaitEvent.Error,
|
||||
},
|
||||
}
|
||||
}
|
||||
return ExpEvent{}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue