mirror of https://github.com/fluxcd/cli-utils.git
Merge pull request #511 from karlkfi/karl-inject-context
fix: Merge task runner impls into taskStatusRunner
commit eef250b780
@@ -230,9 +230,10 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
     // Create a new TaskStatusRunner to execute the taskQueue.
     klog.V(4).Infoln("applier building TaskStatusRunner...")
     allIds := object.UnstructuredSetToObjMetadataSet(append(applyObjs, pruneObjs...))
-    runner := taskrunner.NewTaskStatusRunner(allIds, a.StatusPoller, resourceCache)
+    runner := taskrunner.NewTaskStatusRunner(allIds, a.StatusPoller)
+    taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
     klog.V(4).Infoln("applier running TaskStatusRunner...")
-    err = runner.Run(ctx, taskQueue.ToChannel(), eventChannel, taskrunner.Options{
+    err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
         PollInterval:     options.PollInterval,
         UseCache:         true,
         EmitStatusEvents: options.EmitStatusEvents,
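The caller now owns the TaskContext and injects it into the runner, instead of the runner constructing one internally. A minimal sketch of the new calling convention, using only names that appear in the hunk above:

    resourceCache := cache.NewResourceCacheMap()
    runner := taskrunner.NewTaskStatusRunner(allIds, a.StatusPoller)
    taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
    err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
        PollInterval:     options.PollInterval,
        UseCache:         true,
        EmitStatusEvents: options.EmitStatusEvents,
    })

The destroyer below gets the same treatment, so both entry points share one runner implementation.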
@@ -157,10 +157,10 @@ func (d *Destroyer) Run(ctx context.Context, inv inventory.InventoryInfo, option
     klog.V(4).Infoln("destroyer building TaskStatusRunner...")
     deleteIds := object.UnstructuredSetToObjMetadataSet(deleteObjs)
     resourceCache := cache.NewResourceCacheMap()
-    runner := taskrunner.NewTaskStatusRunner(deleteIds, d.StatusPoller, resourceCache)
+    runner := taskrunner.NewTaskStatusRunner(deleteIds, d.StatusPoller)
+    taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
     klog.V(4).Infoln("destroyer running TaskStatusRunner...")
-    // TODO(seans): Make the poll interval configurable like the applier.
-    err = runner.Run(ctx, taskQueue.ToChannel(), eventChannel, taskrunner.Options{
+    err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
         UseCache:         true,
         PollInterval:     options.PollInterval,
         EmitStatusEvents: options.EmitStatusEvents,
@@ -102,15 +102,15 @@ func (ie InitEvent) String() string {
     return fmt.Sprintf("InitEvent{ ActionGroups: %s }", ie.ActionGroups)
 }
 
-//go:generate stringer -type=ResourceAction
+//go:generate stringer -type=ResourceAction -linecomment
 type ResourceAction int
 
 const (
-    ApplyAction ResourceAction = iota
-    PruneAction
-    DeleteAction
-    WaitAction
-    InventoryAction
+    ApplyAction ResourceAction = iota // Apply
+    PruneAction                       // Prune
+    DeleteAction                      // Delete
+    WaitAction                        // Wait
+    InventoryAction                   // Inventory
 )
 
 type ActionGroupList []ActionGroup
@@ -1,4 +1,4 @@
-// Code generated by "stringer -type=ResourceAction"; DO NOT EDIT.
+// Code generated by "stringer -type=ResourceAction -linecomment"; DO NOT EDIT.
 
 package event
 
@@ -15,9 +15,9 @@ func _() {
     _ = x[InventoryAction-4]
 }
 
-const _ResourceAction_name = "ApplyActionPruneActionDeleteActionWaitActionInventoryAction"
+const _ResourceAction_name = "ApplyPruneDeleteWaitInventory"
 
-var _ResourceAction_index = [...]uint8{0, 11, 22, 34, 44, 59}
+var _ResourceAction_index = [...]uint8{0, 5, 10, 16, 20, 29}
 
 func (i ResourceAction) String() string {
     if i < 0 || i >= ResourceAction(len(_ResourceAction_index)-1) {
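With the -linecomment flag, stringer uses each constant's trailing line comment as its string value instead of the constant's Go identifier, which is what shrinks the generated name table above. A quick illustration (assuming the event package above is imported):

    fmt.Println(event.ApplyAction) // prints "Apply" (previously "ApplyAction")
    fmt.Println(event.WaitAction)  // prints "Wait"  (previously "WaitAction")

The index table {0, 5, 10, 16, 20, 29} records the byte offsets of each value inside the concatenated string "ApplyPruneDeleteWaitInventory"; String() slices _ResourceAction_name between two adjacent offsets.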
@@ -17,11 +17,10 @@ import (
 )
 
 // NewTaskStatusRunner returns a new TaskStatusRunner.
-func NewTaskStatusRunner(identifiers object.ObjMetadataSet, statusPoller poller.Poller, cache cache.ResourceCache) *taskStatusRunner {
+func NewTaskStatusRunner(identifiers object.ObjMetadataSet, statusPoller poller.Poller) *taskStatusRunner {
     return &taskStatusRunner{
         identifiers:  identifiers,
         statusPoller: statusPoller,
-        baseRunner:   newBaseRunner(cache),
     }
 }
 
@@ -31,8 +30,6 @@ func NewTaskStatusRunner(identifiers object.ObjMetadataSet, statusPoller poller.
 type taskStatusRunner struct {
     identifiers  object.ObjMetadataSet
     statusPoller poller.Poller
-
-    baseRunner *baseRunner
 }
 
 // Options defines properties that is passed along to
@@ -43,86 +40,8 @@ type Options struct {
     EmitStatusEvents bool
 }
 
-// Run starts the execution of the taskqueue. It will start the
-// statusPoller and then pass the statusChannel to the baseRunner
-// that does most of the work.
-func (tsr *taskStatusRunner) Run(ctx context.Context, taskQueue chan Task,
-    eventChannel chan event.Event, options Options) error {
-    // Give the poller its own context and run it in the background.
-    // If taskStatusRunner.Run is cancelled, baseRunner.run will exit early,
-    // causing the poller to be cancelled.
-    statusCtx, cancelFunc := context.WithCancel(context.Background())
-    statusChannel := tsr.statusPoller.Poll(statusCtx, tsr.identifiers, polling.Options{
-        PollInterval: options.PollInterval,
-        UseCache:     options.UseCache,
-    })
-
-    o := baseOptions{
-        emitStatusEvents: options.EmitStatusEvents,
-    }
-    err := tsr.baseRunner.run(ctx, taskQueue, statusChannel, eventChannel, o)
-    // cancel the statusPoller by cancelling the context.
-    cancelFunc()
-    // drain the statusChannel to make sure the lack of a consumer
-    // doesn't block the shutdown of the statusPoller.
-    for range statusChannel {
-    }
-    return err
-}
-
-// NewTaskRunner returns a new taskRunner. It can process taskqueues
-// that does not contain any wait tasks.
-// TODO: Do we need this abstraction layer now that baseRunner doesn't need a collector?
-func NewTaskRunner() *taskRunner {
-    return &taskRunner{
-        baseRunner: newBaseRunner(cache.NewResourceCacheMap()),
-    }
-}
-
-// taskRunner is a simplified taskRunner that does not support
-// wait tasks and does not provide any status updates for the
-// resources. This is useful in situations where we are not interested
-// in status, for example during dry-run.
-type taskRunner struct {
-    baseRunner *baseRunner
-}
-
-// Run starts the execution of the task queue. It delegates the
-// work to the baseRunner, but gives it as nil channel as the statusChannel.
-func (tr *taskRunner) Run(ctx context.Context, taskQueue chan Task,
-    eventChannel chan event.Event) error {
-    var nilStatusChannel chan pollevent.Event
-    o := baseOptions{
-        // The taskRunner doesn't poll for status, so there are not
-        // statusEvents to emit.
-        emitStatusEvents: false,
-    }
-    return tr.baseRunner.run(ctx, taskQueue, nilStatusChannel, eventChannel, o)
-}
-
-// newBaseRunner returns a new baseRunner using the provided cache.
-func newBaseRunner(cache cache.ResourceCache) *baseRunner {
-    return &baseRunner{
-        cache: cache,
-    }
-}
-
-// baseRunner provides the basic task runner functionality.
-//
-// The cache can be used by tasks to retrieve the last known resource state.
-//
-// This is not meant to be used directly. It is used by the taskRunner and
-// taskStatusRunner.
-type baseRunner struct {
-    cache cache.ResourceCache
-}
-
-type baseOptions struct {
-    // emitStatusEvents enables emitting events on the eventChannel
-    emitStatusEvents bool
-}
-
-// run executes the tasks in the taskqueue.
+// Run executes the tasks in the taskqueue, with the statusPoller running in the
+// background.
 //
 // The tasks run in a loop where a single goroutine will process events from
 // three different channels.
@@ -131,18 +50,37 @@ type baseOptions struct {
 // validation of wait conditions.
 // - eventChannel is written to with events based on status updates, if
 //   emitStatusEvents is true.
-func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
-    statusChannel <-chan pollevent.Event, eventChannel chan event.Event,
-    o baseOptions) error {
-    // taskContext is passed into all tasks when they are started. It
-    // provides access to the eventChannel and the taskChannel, and
-    // also provides a way to pass data between tasks.
-    taskContext := NewTaskContext(eventChannel, b.cache)
+func (tsr *taskStatusRunner) Run(
+    ctx context.Context,
+    taskContext *TaskContext,
+    taskQueue chan Task,
+    opts Options,
+) error {
+    // Give the poller its own context and run it in the background.
+    // If taskStatusRunner.Run is cancelled, baseRunner.run will exit early,
+    // causing the poller to be cancelled.
+    statusCtx, cancelFunc := context.WithCancel(context.Background())
+    statusChannel := tsr.statusPoller.Poll(statusCtx, tsr.identifiers, polling.Options{
+        PollInterval: opts.PollInterval,
+        UseCache:     opts.UseCache,
+    })
+
+    // complete stops the statusPoller, drains the statusChannel, and returns
+    // the provided error.
+    // Run this before returning!
+    // Avoid using defer, otherwise the statusPoller will hang. It needs to be
+    // drained synchronously before return, instead of asynchronously after.
+    complete := func(err error) error {
+        cancelFunc()
+        for range statusChannel {
+        }
+        return err
+    }
 
     // Find and start the first task in the queue.
-    currentTask, done := b.nextTask(taskQueue, taskContext)
+    currentTask, done := nextTask(taskQueue, taskContext)
     if done {
-        return nil
+        return complete(nil)
     }
 
     // abort is used to signal that something has failed, and
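The complete closure above captures a general shutdown pattern for a channel-producing goroutine: cancel its context first, then drain its channel so the producer is never left blocked on a send. A self-contained sketch of the same pattern (hypothetical producer, not code from this repo):

    package main

    import (
        "context"
        "fmt"
    )

    // poll mimics the statusPoller: it emits events with blocking sends and
    // closes the channel only after all sends are done and ctx is cancelled.
    func poll(ctx context.Context, n int) <-chan int {
        ch := make(chan int)
        go func() {
            defer close(ch)
            for i := 0; i < n; i++ {
                ch <- i // blocking send; a vanished consumer would strand this goroutine
            }
            <-ctx.Done()
        }()
        return ch
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        ch := poll(ctx, 5)
        fmt.Println(<-ch, <-ch) // consume only part of the stream

        // Shut down: cancel first, then drain synchronously so the producer's
        // remaining sends complete and its goroutine can exit.
        cancel()
        for range ch {
        }
    }

As the diff's comment stresses, the drain runs synchronously before every return (hence the complete() wrapper at each exit point) rather than being deferred.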
@@ -185,7 +123,7 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
             continue
         }
 
-        if o.emitStatusEvents {
+        if opts.EmitStatusEvents {
             // Forward all normal events to the eventChannel
             taskContext.SendEvent(event.Event{
                 Type: event.StatusType,
@@ -231,16 +169,18 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
                 },
             })
             if msg.Err != nil {
-                return msg.Err
+                return complete(
+                    fmt.Errorf("task failed (action: %q, name: %q): %w",
+                        currentTask.Action(), currentTask.Name(), msg.Err))
             }
             if abort {
-                return abortReason
+                return complete(abortReason)
             }
-            currentTask, done = b.nextTask(taskQueue, taskContext)
+            currentTask, done = nextTask(taskQueue, taskContext)
             // If there are no more tasks, we are done. So just
             // return.
             if done {
-                return nil
+                return complete(nil)
             }
         // The doneCh will be closed if the passed in context is cancelled.
         // If so, we just set the abort flag and wait for the currently running
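Because the task error is wrapped with %w, callers can still match the underlying cause with the standard errors helpers; for example:

    wrapped := fmt.Errorf("task failed (action: %q, name: %q): %w",
        "Apply", "apply-0", context.DeadlineExceeded)
    fmt.Println(errors.Is(wrapped, context.DeadlineExceeded)) // true

So adding the action/name context changes only the message text, which is exactly what the updated test expectations below assert against.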
@@ -257,8 +197,7 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
 // nextTask fetches the latest task from the taskQueue and
 // starts it. If the taskQueue is empty, it the second
 // return value will be true.
-func (b *baseRunner) nextTask(taskQueue chan Task,
-    taskContext *TaskContext) (Task, bool) {
+func nextTask(taskQueue chan Task, taskContext *TaskContext) (Task, bool) {
     var tsk Task
     select {
     // If there is any tasks left in the queue, this
@@ -14,6 +14,7 @@ import (
     "k8s.io/apimachinery/pkg/runtime/schema"
     "sigs.k8s.io/cli-utils/pkg/apply/cache"
     "sigs.k8s.io/cli-utils/pkg/apply/event"
+    "sigs.k8s.io/cli-utils/pkg/kstatus/polling"
     pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
     "sigs.k8s.io/cli-utils/pkg/kstatus/status"
     "sigs.k8s.io/cli-utils/pkg/object"
@@ -270,13 +271,18 @@ func TestBaseRunner(t *testing.T) {
 
     for tn, tc := range testCases {
         t.Run(tn, func(t *testing.T) {
-            runner := newBaseRunner(cache.NewResourceCacheMap())
-            eventChannel := make(chan event.Event)
             taskQueue := make(chan Task, len(tc.tasks))
             for _, tsk := range tc.tasks {
                 taskQueue <- tsk
             }
 
+            ids := object.ObjMetadataSet{} // unused by fake poller
+            poller := newFakePoller(tc.statusEvents)
+            eventChannel := make(chan event.Event)
+            resourceCache := cache.NewResourceCacheMap()
+            taskContext := NewTaskContext(eventChannel, resourceCache)
+            runner := NewTaskStatusRunner(ids, poller)
+
             // Use a WaitGroup to make sure changes in the goroutines
             // are visible to the main goroutine.
             var wg sync.WaitGroup
@@ -286,10 +292,8 @@ func TestBaseRunner(t *testing.T) {
         go func() {
             defer wg.Done()
 
-            <-time.NewTimer(tc.statusEventsDelay).C
-            for _, se := range tc.statusEvents {
-                statusChannel <- se
-            }
+            time.Sleep(tc.statusEventsDelay)
+            poller.Start()
         }()
 
         var events []event.Event
@@ -302,15 +306,14 @@ func TestBaseRunner(t *testing.T) {
             }
         }()
 
-        err := runner.run(context.Background(), taskQueue, statusChannel,
-            eventChannel, baseOptions{emitStatusEvents: true})
+        opts := Options{EmitStatusEvents: true}
+        ctx := context.Background()
+        err := runner.Run(ctx, taskContext, taskQueue, opts)
         close(statusChannel)
         close(eventChannel)
         wg.Wait()
 
-        if err != nil {
-            t.Errorf("expected no error, but got %v", err)
-        }
+        assert.NoError(t, err)
 
         if want, got := len(tc.expectedEventTypes), len(events); want != got {
             t.Errorf("expected %d events, but got %d", want, got)
@@ -358,7 +361,7 @@ func TestBaseRunnerCancellation(t *testing.T) {
                 },
             },
             contextTimeout: 2 * time.Second,
-            expectedError:  context.Canceled,
+            expectedError:  context.DeadlineExceeded,
             expectedEventTypes: []event.Type{
                 event.ActionGroupType,
                 event.ApplyType,
@@ -377,7 +380,7 @@ func TestBaseRunnerCancellation(t *testing.T) {
                 },
             },
             contextTimeout: 2 * time.Second,
-            expectedError:  context.Canceled,
+            expectedError:  context.DeadlineExceeded,
             expectedEventTypes: []event.Type{
                 event.ActionGroupType,
                 event.WaitType, // pending
@@ -387,6 +390,7 @@ func TestBaseRunnerCancellation(t *testing.T) {
         "error while custom task is running": {
             tasks: []Task{
                 &fakeApplyTask{
+                    name: "apply-0",
                     resultEvent: event.Event{
                         Type: event.ApplyType,
                     },
@@ -394,6 +398,7 @@ func TestBaseRunnerCancellation(t *testing.T) {
                     err: testError,
                 },
                 &fakeApplyTask{
+                    name: "prune-0",
                     resultEvent: event.Event{
                         Type: event.PruneType,
                     },
@@ -401,7 +406,7 @@ func TestBaseRunnerCancellation(t *testing.T) {
                 },
             },
             contextTimeout: 30 * time.Second,
-            expectedError:  testError,
+            expectedError:  fmt.Errorf(`task failed (action: "Apply", name: "apply-0"): %w`, testError),
             expectedEventTypes: []event.Type{
                 event.ActionGroupType,
                 event.ApplyType,
|
@ -427,7 +432,7 @@ func TestBaseRunnerCancellation(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
contextTimeout: 30 * time.Second,
|
contextTimeout: 30 * time.Second,
|
||||||
expectedError: testError,
|
expectedError: fmt.Errorf("polling for status failed: %w", testError),
|
||||||
expectedEventTypes: []event.Type{
|
expectedEventTypes: []event.Type{
|
||||||
event.ActionGroupType,
|
event.ActionGroupType,
|
||||||
event.WaitType, // pending
|
event.WaitType, // pending
|
||||||
|
|
@@ -438,14 +443,18 @@ func TestBaseRunnerCancellation(t *testing.T) {
 
     for tn, tc := range testCases {
         t.Run(tn, func(t *testing.T) {
-            runner := newBaseRunner(cache.NewResourceCacheMap())
-            eventChannel := make(chan event.Event)
-
             taskQueue := make(chan Task, len(tc.tasks))
             for _, tsk := range tc.tasks {
                 taskQueue <- tsk
             }
 
+            ids := object.ObjMetadataSet{} // unused by fake poller
+            poller := newFakePoller(tc.statusEvents)
+            eventChannel := make(chan event.Event)
+            resourceCache := cache.NewResourceCacheMap()
+            taskContext := NewTaskContext(eventChannel, resourceCache)
+            runner := NewTaskStatusRunner(ids, poller)
+
             // Use a WaitGroup to make sure changes in the goroutines
             // are visible to the main goroutine.
             var wg sync.WaitGroup
||||||
|
|
@ -455,10 +464,8 @@ func TestBaseRunnerCancellation(t *testing.T) {
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
<-time.NewTimer(tc.statusEventsDelay).C
|
time.Sleep(tc.statusEventsDelay)
|
||||||
for _, se := range tc.statusEvents {
|
poller.Start()
|
||||||
statusChannel <- se
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var events []event.Event
|
var events []event.Event
|
||||||
|
|
@ -473,18 +480,17 @@ func TestBaseRunnerCancellation(t *testing.T) {
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), tc.contextTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), tc.contextTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
err := runner.run(ctx, taskQueue, statusChannel, eventChannel,
|
|
||||||
baseOptions{emitStatusEvents: false})
|
opts := Options{EmitStatusEvents: true}
|
||||||
|
err := runner.Run(ctx, taskContext, taskQueue, opts)
|
||||||
close(statusChannel)
|
close(statusChannel)
|
||||||
close(eventChannel)
|
close(eventChannel)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
if tc.expectedError == nil && err != nil {
|
if tc.expectedError != nil {
|
||||||
t.Errorf("expected no error, but got %v", err)
|
assert.EqualError(t, err, tc.expectedError.Error())
|
||||||
}
|
} else {
|
||||||
|
assert.NoError(t, err)
|
||||||
if tc.expectedError != nil && err == nil {
|
|
||||||
t.Errorf("expected error %v, but didn't get one", tc.expectedError)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if want, got := len(tc.expectedEventTypes), len(events); want != got {
|
if want, got := len(tc.expectedEventTypes), len(events); want != got {
|
||||||
|
|
@@ -533,3 +539,35 @@ func (f *fakeApplyTask) Start(taskContext *TaskContext) {
 func (f *fakeApplyTask) Cancel(_ *TaskContext) {}
 
 func (f *fakeApplyTask) StatusUpdate(_ *TaskContext, _ object.ObjMetadata) {}
+
+type fakePoller struct {
+    start  chan struct{}
+    events []pollevent.Event
+}
+
+func newFakePoller(statusEvents []pollevent.Event) *fakePoller {
+    return &fakePoller{
+        events: statusEvents,
+        start:  make(chan struct{}),
+    }
+}
+
+// Start events being sent on the status channel
+func (f *fakePoller) Start() {
+    close(f.start)
+}
+
+func (f *fakePoller) Poll(ctx context.Context, _ object.ObjMetadataSet, _ polling.Options) <-chan pollevent.Event {
+    eventChannel := make(chan pollevent.Event)
+    go func() {
+        defer close(eventChannel)
+        // wait until started to send the events
+        <-f.start
+        for _, f := range f.events {
+            eventChannel <- f
+        }
+        // wait until cancelled to close the event channel and exit
+        <-ctx.Done()
+    }()
+    return eventChannel
+}
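In the updated tests, fakePoller stands in for the real StatusPoller, and Start gates when the canned status events flow, replacing the old hand-fed statusChannel. The wiring, assembled from the test hunks above:

    poller := newFakePoller(tc.statusEvents)
    runner := NewTaskStatusRunner(ids, poller)
    go func() {
        defer wg.Done()
        time.Sleep(tc.statusEventsDelay)
        poller.Start() // unblocks Poll's goroutine, which then sends tc.statusEvents
    }()
    err := runner.Run(ctx, taskContext, taskQueue, opts)

One detail worth noting: closing f.start acts as a broadcast, so every goroutine blocked on <-f.start is released by a single Start call, and Poll goroutines started after Start proceed immediately.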