cli-utils/pkg/apply/taskrunner/runner.go

230 lines
7.2 KiB
Go

// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package taskrunner
import (
"context"
"fmt"
"time"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/object"
)
// NewTaskStatusRunner returns a new TaskStatusRunner.
func NewTaskStatusRunner(identifiers object.ObjMetadataSet, statusPoller poller.Poller) *TaskStatusRunner {
return &TaskStatusRunner{
Identifiers: identifiers,
StatusPoller: statusPoller,
}
}
// TaskStatusRunner is a taskRunner that executes a set of
// tasks while at the same time uses the statusPoller to
// keep track of the status of the resources.
type TaskStatusRunner struct {
Identifiers object.ObjMetadataSet
StatusPoller poller.Poller
}
// Options defines properties that is passed along to
// the statusPoller.
type Options struct {
PollInterval time.Duration
EmitStatusEvents bool
}
// Run executes the tasks in the taskqueue, with the statusPoller running in the
// background.
//
// The tasks run in a loop where a single goroutine will process events from
// three different channels.
// - taskQueue is read to allow updating the task queue at runtime.
// - statusChannel is read to allow updates to the resource cache and triggering
// validation of wait conditions.
// - eventChannel is written to with events based on status updates, if
// emitStatusEvents is true.
func (tsr *TaskStatusRunner) Run(
ctx context.Context,
taskContext *TaskContext,
taskQueue chan Task,
opts Options,
) error {
// Give the poller its own context and run it in the background.
// If taskStatusRunner.Run is cancelled, baseRunner.run will exit early,
// causing the poller to be cancelled.
statusCtx, cancelFunc := context.WithCancel(context.Background())
statusChannel := tsr.StatusPoller.Poll(statusCtx, tsr.Identifiers, polling.PollOptions{
PollInterval: opts.PollInterval,
})
// complete stops the statusPoller, drains the statusChannel, and returns
// the provided error.
// Run this before returning!
// Avoid using defer, otherwise the statusPoller will hang. It needs to be
// drained synchronously before return, instead of asynchronously after.
complete := func(err error) error {
cancelFunc()
for range statusChannel {
}
return err
}
// Find and start the first task in the queue.
currentTask, done := nextTask(taskQueue, taskContext)
if done {
return complete(nil)
}
// abort is used to signal that something has failed, and
// the task processing should end as soon as is possible. Only
// wait tasks can be interrupted, so for all other tasks we need
// to wait for the currently running one to finish before we can
// exit.
abort := false
var abortReason error
// We do this so we can set the doneCh to a nil channel after
// it has been closed. This is needed to avoid a busy loop.
doneCh := ctx.Done()
for {
select {
// This processes status events from a channel, most likely
// driven by the StatusPoller. All normal resource status update
// events are passed through to the eventChannel. This means
// that listeners of the eventChannel will get updates on status
// even while other tasks (like apply tasks) are running.
case statusEvent, ok := <-statusChannel:
// If the statusChannel has closed or we are preparing
// to abort the task processing, we just ignore all
// statusEvents.
// TODO(mortent): Check if a closed statusChannel might
// create a busy loop here.
if !ok || abort {
continue
}
// An error event on the statusChannel means the StatusPoller
// has encountered a problem so it can't continue. This means
// the statusChannel will be closed soon.
if statusEvent.Type == pollevent.ErrorEvent {
abort = true
abortReason = fmt.Errorf("polling for status failed: %v",
statusEvent.Error)
currentTask.Cancel(taskContext)
continue
}
if opts.EmitStatusEvents {
// Forward all normal events to the eventChannel
taskContext.SendEvent(event.Event{
Type: event.StatusType,
StatusEvent: event.StatusEvent{
Identifier: statusEvent.Resource.Identifier,
PollResourceInfo: statusEvent.Resource,
Resource: statusEvent.Resource.Resource,
Error: statusEvent.Error,
},
})
}
id := statusEvent.Resource.Identifier
// Update the cache to track the latest resource spec & status.
// Status is computed from the resource on-demand.
// Warning: Resource may be nil!
taskContext.ResourceCache().Put(id, cache.ResourceStatus{
Resource: statusEvent.Resource.Resource,
Status: statusEvent.Resource.Status,
StatusMessage: statusEvent.Resource.Message,
})
// send a status update to the running task, but only if the status
// has changed and the task is tracking the object.
if currentTask.Identifiers().Contains(id) {
currentTask.StatusUpdate(taskContext, id)
}
// A message on the taskChannel means that the current task
// has either completed or failed.
// If it has failed, we return the error.
// If the abort flag is true, which means something
// else has gone wrong and we are waiting for the current task to
// finish, we exit.
// If everything is ok, we fetch and start the next task.
case msg := <-taskContext.TaskChannel():
taskContext.SendEvent(event.Event{
Type: event.ActionGroupType,
ActionGroupEvent: event.ActionGroupEvent{
GroupName: currentTask.Name(),
Action: currentTask.Action(),
Type: event.Finished,
},
})
if msg.Err != nil {
return complete(
fmt.Errorf("task failed (action: %q, name: %q): %w",
currentTask.Action(), currentTask.Name(), msg.Err))
}
if abort {
return complete(abortReason)
}
currentTask, done = nextTask(taskQueue, taskContext)
// If there are no more tasks, we are done. So just
// return.
if done {
return complete(nil)
}
// The doneCh will be closed if the passed in context is cancelled.
// If so, we just set the abort flag and wait for the currently running
// task to complete before we exit.
case <-doneCh:
doneCh = nil // Set doneCh to nil so we don't enter a busy loop.
abort = true
abortReason = ctx.Err() // always non-nil when doneCh is closed
currentTask.Cancel(taskContext)
}
}
}
// nextTask fetches the latest task from the taskQueue and
// starts it. If the taskQueue is empty, it the second
// return value will be true.
func nextTask(taskQueue chan Task, taskContext *TaskContext) (Task, bool) {
var tsk Task
select {
// If there is any tasks left in the queue, this
// case statement will be executed.
case t := <-taskQueue:
tsk = t
default:
// Only happens when the channel is empty.
return nil, true
}
taskContext.SendEvent(event.Event{
Type: event.ActionGroupType,
ActionGroupEvent: event.ActionGroupEvent{
GroupName: tsk.Name(),
Action: tsk.Action(),
Type: event.Started,
},
})
tsk.Start(taskContext)
return tsk, false
}
// TaskResult is the type returned from tasks once they have completed
// or failed. If it has failed or timed out, the Err property will be
// set.
type TaskResult struct {
Err error
}