mirror of https://github.com/fluxcd/cli-utils.git
Add global destroyer timeout
- Destroyer context added, with cancel test. - Fix Applier cancel test not using specified timeouts. - Added a few logs to help with debugging test failures
This commit is contained in:
parent
8b12ecd594
commit
31679a650f
|
|
@ -4,6 +4,7 @@
|
|||
package destroy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
|
@ -117,7 +118,7 @@ func (r *DestroyRunner) RunE(cmd *cobra.Command, args []string) error {
|
|||
// Run the destroyer. It will return a channel where we can receive updates
|
||||
// to keep track of progress and any issues.
|
||||
printStatusEvents := r.deleteTimeout != time.Duration(0)
|
||||
ch := d.Run(inv, apply.DestroyerOptions{
|
||||
ch := d.Run(context.Background(), inv, apply.DestroyerOptions{
|
||||
DeleteTimeout: r.deleteTimeout,
|
||||
DeletePropagationPolicy: deletePropPolicy,
|
||||
InventoryPolicy: inventoryPolicy,
|
||||
|
|
|
|||
|
|
@ -139,12 +139,9 @@ func (r *PreviewRunner) RunE(cmd *cobra.Command, args []string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Create a context
|
||||
ctx := context.Background()
|
||||
|
||||
// Run the applier. It will return a channel where we can receive updates
|
||||
// to keep track of progress and any issues.
|
||||
ch = a.Run(ctx, inv, objs, apply.Options{
|
||||
ch = a.Run(context.Background(), inv, objs, apply.Options{
|
||||
EmitStatusEvents: false,
|
||||
NoPrune: noPrune,
|
||||
DryRunStrategy: drs,
|
||||
|
|
@ -156,7 +153,7 @@ func (r *PreviewRunner) RunE(cmd *cobra.Command, args []string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ch = d.Run(inv, apply.DestroyerOptions{
|
||||
ch = d.Run(context.Background(), inv, apply.DestroyerOptions{
|
||||
InventoryPolicy: inventoryPolicy,
|
||||
DryRunStrategy: drs,
|
||||
})
|
||||
|
|
|
|||
|
|
@ -584,8 +584,12 @@ func TestApplierCancel(t *testing.T) {
|
|||
},
|
||||
clusterObjs: object.UnstructuredSet{},
|
||||
options: Options{
|
||||
NoPrune: true,
|
||||
InventoryPolicy: inventory.InventoryPolicyMustMatch,
|
||||
// EmitStatusEvents required to test event output
|
||||
EmitStatusEvents: true,
|
||||
NoPrune: true,
|
||||
InventoryPolicy: inventory.InventoryPolicyMustMatch,
|
||||
// ReconcileTimeout required to enable WaitTasks
|
||||
ReconcileTimeout: 1 * time.Minute,
|
||||
},
|
||||
statusEvents: []pollevent.Event{
|
||||
{
|
||||
|
|
@ -721,8 +725,12 @@ func TestApplierCancel(t *testing.T) {
|
|||
},
|
||||
clusterObjs: object.UnstructuredSet{},
|
||||
options: Options{
|
||||
NoPrune: true,
|
||||
InventoryPolicy: inventory.InventoryPolicyMustMatch,
|
||||
// EmitStatusEvents required to test event output
|
||||
EmitStatusEvents: true,
|
||||
NoPrune: true,
|
||||
InventoryPolicy: inventory.InventoryPolicyMustMatch,
|
||||
// ReconcileTimeout required to enable WaitTasks
|
||||
ReconcileTimeout: 1 * time.Minute,
|
||||
},
|
||||
statusEvents: []pollevent.Event{
|
||||
{
|
||||
|
|
@ -853,9 +861,6 @@ func TestApplierCancel(t *testing.T) {
|
|||
|
||||
for tn, tc := range testCases {
|
||||
t.Run(tn, func(t *testing.T) {
|
||||
expectedTimeout := 2 * time.Second
|
||||
testTimeout := expectedTimeout + 10*time.Second
|
||||
|
||||
poller := newFakePoller(tc.statusEvents)
|
||||
|
||||
applier := newTestApplier(t,
|
||||
|
|
@ -866,19 +871,14 @@ func TestApplierCancel(t *testing.T) {
|
|||
)
|
||||
|
||||
// Context for Applier.Run
|
||||
runCtx, runCancel, isTimedOut := withTimeout(context.Background(), expectedTimeout)
|
||||
runCtx, runCancel := context.WithTimeout(context.Background(), tc.runTimeout)
|
||||
defer runCancel() // cleanup
|
||||
|
||||
// Context for this test (in case Applier.Run never closes the event channel)
|
||||
testCtx, testCancel := context.WithTimeout(context.Background(), testTimeout)
|
||||
testCtx, testCancel := context.WithTimeout(context.Background(), tc.testTimeout)
|
||||
defer testCancel() // cleanup
|
||||
|
||||
eventChannel := applier.Run(runCtx, tc.invInfo.toWrapped(), tc.resources, Options{
|
||||
EmitStatusEvents: true,
|
||||
// ReconcileTimeout needs to block long enough to cancel the run,
|
||||
// otherwise the WaitTask is skipped.
|
||||
ReconcileTimeout: 1 * time.Minute,
|
||||
})
|
||||
eventChannel := applier.Run(runCtx, tc.invInfo.toWrapped(), tc.resources, tc.options)
|
||||
|
||||
// Start sending status events
|
||||
poller.Start()
|
||||
|
|
@ -891,8 +891,8 @@ func TestApplierCancel(t *testing.T) {
|
|||
case <-testCtx.Done():
|
||||
// Test timed out
|
||||
runCancel()
|
||||
t.Fatalf("Applier.Run failed to respond to cancellation (expected: %s, timeout: %s)", expectedTimeout, testTimeout)
|
||||
return
|
||||
t.Errorf("Applier.Run failed to respond to cancellation (expected: %s, timeout: %s)", tc.runTimeout, tc.testTimeout)
|
||||
break loop
|
||||
|
||||
case e, ok := <-eventChannel:
|
||||
if !ok {
|
||||
|
|
@ -922,9 +922,9 @@ func TestApplierCancel(t *testing.T) {
|
|||
// Validate that the expected timeout was the cause of the run completion.
|
||||
// just in case something else cancelled the run
|
||||
if tc.expectRunTimeout {
|
||||
assert.True(t, isTimedOut.Get(), "Applier.Run exited, but not by expected timeout")
|
||||
assert.Equal(t, context.DeadlineExceeded, runCtx.Err(), "Applier.Run exited, but not by expected timeout")
|
||||
} else {
|
||||
assert.False(t, isTimedOut.Get(), "Applier.Run exited, but was unexpectedly cancelled by timeout")
|
||||
assert.Nil(t, runCtx.Err(), "Applier.Run exited, but not by expected timeout")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,9 +10,7 @@ import (
|
|||
"io/ioutil"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
|
@ -28,6 +26,7 @@ import (
|
|||
clienttesting "k8s.io/client-go/testing"
|
||||
"k8s.io/klog/v2"
|
||||
cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
|
||||
"sigs.k8s.io/cli-utils/pkg/apply/info"
|
||||
"sigs.k8s.io/cli-utils/pkg/apply/poller"
|
||||
"sigs.k8s.io/cli-utils/pkg/common"
|
||||
"sigs.k8s.io/cli-utils/pkg/inventory"
|
||||
|
|
@ -63,18 +62,79 @@ func (i inventoryInfo) toWrapped() inventory.InventoryInfo {
|
|||
func newTestApplier(
|
||||
t *testing.T,
|
||||
invInfo inventoryInfo,
|
||||
resourceSet object.UnstructuredSet,
|
||||
resources object.UnstructuredSet,
|
||||
clusterObjs object.UnstructuredSet,
|
||||
statusPoller poller.Poller,
|
||||
) *Applier {
|
||||
tf := cmdtesting.NewTestFactory().WithNamespace(invInfo.namespace)
|
||||
tf := newTestFactory(t, invInfo, resources, clusterObjs)
|
||||
defer tf.Cleanup()
|
||||
|
||||
mapper, err := tf.ToRESTMapper()
|
||||
if !assert.NoError(t, err) {
|
||||
t.FailNow()
|
||||
infoHelper := &fakeInfoHelper{
|
||||
factory: tf,
|
||||
}
|
||||
|
||||
invClient := newTestInventory(t, tf, infoHelper)
|
||||
|
||||
applier, err := NewApplier(tf, invClient, statusPoller)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Inject the fakeInfoHelper to allow generating Info
|
||||
// objects that use the FakeRESTClient as the UnstructuredClient.
|
||||
applier.infoHelper = infoHelper
|
||||
|
||||
return applier
|
||||
}
|
||||
|
||||
func newTestDestroyer(
|
||||
t *testing.T,
|
||||
invInfo inventoryInfo,
|
||||
clusterObjs object.UnstructuredSet,
|
||||
statusPoller poller.Poller,
|
||||
) *Destroyer {
|
||||
tf := newTestFactory(t, invInfo, object.UnstructuredSet{}, clusterObjs)
|
||||
defer tf.Cleanup()
|
||||
|
||||
infoHelper := &fakeInfoHelper{
|
||||
factory: tf,
|
||||
}
|
||||
|
||||
invClient := newTestInventory(t, tf, infoHelper)
|
||||
|
||||
destroyer, err := NewDestroyer(tf, invClient, statusPoller)
|
||||
require.NoError(t, err)
|
||||
|
||||
return destroyer
|
||||
}
|
||||
|
||||
func newTestInventory(
|
||||
t *testing.T,
|
||||
tf *cmdtesting.TestFactory,
|
||||
infoHelper info.InfoHelper,
|
||||
) inventory.InventoryClient {
|
||||
// Use an InventoryClient with a fakeInfoHelper to allow generating Info
|
||||
// objects that use the FakeRESTClient as the UnstructuredClient.
|
||||
invClient, err := inventory.ClusterInventoryClientFactory{}.NewInventoryClient(tf)
|
||||
require.NoError(t, err)
|
||||
|
||||
// TODO(mortent): This is not great, but at least this keeps the
|
||||
// ugliness in the test code until we can find a way to wire it
|
||||
// up so to avoid it.
|
||||
invClient.(*inventory.ClusterInventoryClient).InfoHelper = infoHelper
|
||||
|
||||
return invClient
|
||||
}
|
||||
|
||||
func newTestFactory(
|
||||
t *testing.T,
|
||||
invInfo inventoryInfo,
|
||||
resourceSet object.UnstructuredSet,
|
||||
clusterObjs object.UnstructuredSet,
|
||||
) *cmdtesting.TestFactory {
|
||||
tf := cmdtesting.NewTestFactory().WithNamespace(invInfo.namespace)
|
||||
|
||||
mapper, err := tf.ToRESTMapper()
|
||||
require.NoError(t, err)
|
||||
|
||||
objMap := make(map[object.ObjMetadata]resourceInfo)
|
||||
for _, r := range resourceSet {
|
||||
objMeta := object.UnstructuredToObjMetaOrDie(r)
|
||||
|
|
@ -112,26 +172,7 @@ func newTestApplier(
|
|||
tf.UnstructuredClient = newFakeRESTClient(t, handlers)
|
||||
tf.FakeDynamicClient = fakeDynamicClient(t, mapper, objs...)
|
||||
|
||||
// Use an InventoryClient with a fakeInfoHelper to allow generating Info
|
||||
// objects that use the FakeRESTClient as the UnstructuredClient.
|
||||
invClient, err := inventory.ClusterInventoryClientFactory{}.NewInventoryClient(tf)
|
||||
require.NoError(t, err)
|
||||
infoHelper := &fakeInfoHelper{
|
||||
factory: tf,
|
||||
}
|
||||
// TODO(mortent): This is not great, but at least this keeps the
|
||||
// ugliness in the test code until we can find a way to wire it
|
||||
// up so to avoid it.
|
||||
invClient.(*inventory.ClusterInventoryClient).InfoHelper = infoHelper
|
||||
|
||||
applier, err := NewApplier(tf, invClient, statusPoller)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Inject the fakeInfoHelper to allow generating Info
|
||||
// objects that use the FakeRESTClient as the UnstructuredClient.
|
||||
applier.infoHelper = infoHelper
|
||||
|
||||
return applier
|
||||
return tf
|
||||
}
|
||||
|
||||
type resourceInfo struct {
|
||||
|
|
@ -210,6 +251,30 @@ func (g *genericHandler) handle(t *testing.T, req *http.Request) (*http.Response
|
|||
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: bodyRC}, true, nil
|
||||
}
|
||||
|
||||
if req.URL.Path == singlePath && req.Method == http.MethodDelete {
|
||||
if r.exists {
|
||||
bodyRC := ioutil.NopCloser(bytes.NewReader(toJSONBytes(t, r.resource)))
|
||||
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: bodyRC}, true, nil
|
||||
}
|
||||
|
||||
// We're not testing DeletePropagationOrphan, so StatusOK should be
|
||||
// safe. Otherwise, the status might be StatusAccepted.
|
||||
// https://github.com/kubernetes/apiserver/blob/v0.22.2/pkg/endpoints/handlers/delete.go#L140
|
||||
status := http.StatusOK
|
||||
|
||||
// Return Status object, if resource doesn't exist.
|
||||
result := &metav1.Status{
|
||||
Status: metav1.StatusSuccess,
|
||||
Code: int32(status),
|
||||
Details: &metav1.StatusDetails{
|
||||
Name: r.resource.GetName(),
|
||||
Kind: r.resource.GetKind(),
|
||||
},
|
||||
}
|
||||
bodyRC := ioutil.NopCloser(bytes.NewReader(toJSONBytes(t, result)))
|
||||
return &http.Response{StatusCode: status, Header: cmdtesting.DefaultHeader(), Body: bodyRC}, true, nil
|
||||
}
|
||||
|
||||
if req.URL.Path == allPath && req.Method == http.MethodPost {
|
||||
bodyRC := ioutil.NopCloser(bytes.NewReader(toJSONBytes(t, r.resource)))
|
||||
return &http.Response{StatusCode: http.StatusCreated, Header: cmdtesting.DefaultHeader(), Body: bodyRC}, true, nil
|
||||
|
|
@ -450,49 +515,3 @@ func toJSONBytes(t *testing.T, obj runtime.Object) []byte {
|
|||
}
|
||||
return objBytes
|
||||
}
|
||||
|
||||
type atomicBool struct {
|
||||
value bool
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func (b *atomicBool) Set(value bool) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
b.value = value
|
||||
}
|
||||
|
||||
func (b *atomicBool) Get() bool {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
return b.value
|
||||
}
|
||||
|
||||
// withTimeout functions like context.WithTimeout, except it also returns an
|
||||
// atomicBool, which describes whether the timeout has occurred.
|
||||
// If the context is cancelled by the timeout, the atomicBool will be true.
|
||||
// If the context is cancelled before the timeout, the atomicBool will be false.
|
||||
func withTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc, *atomicBool) {
|
||||
timedOut := &atomicBool{}
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
cancelCh := make(chan struct{})
|
||||
go func() {
|
||||
timeout := time.NewTimer(timeout)
|
||||
defer timeout.Stop()
|
||||
// wait for timeout or cancelCh to be closed
|
||||
select {
|
||||
case <-timeout.C:
|
||||
// Cancelled by timeout
|
||||
timedOut.Set(true)
|
||||
cancel()
|
||||
case <-cancelCh:
|
||||
// Cancelled by caller
|
||||
}
|
||||
}()
|
||||
cancelFn := func() {
|
||||
// Cancelled by caller
|
||||
close(cancelCh)
|
||||
cancel()
|
||||
}
|
||||
return ctx, cancelFn, timedOut
|
||||
}
|
||||
|
|
|
|||
|
|
@ -89,7 +89,7 @@ func setDestroyerDefaults(o *DestroyerOptions) {
|
|||
// Run performs the destroy step. Passes the inventory object. This
|
||||
// happens asynchronously on progress and any errors are reported
|
||||
// back on the event channel.
|
||||
func (d *Destroyer) Run(inv inventory.InventoryInfo, options DestroyerOptions) <-chan event.Event {
|
||||
func (d *Destroyer) Run(ctx context.Context, inv inventory.InventoryInfo, options DestroyerOptions) <-chan event.Event {
|
||||
eventChannel := make(chan event.Event)
|
||||
setDestroyerDefaults(&options)
|
||||
go func() {
|
||||
|
|
@ -153,7 +153,7 @@ func (d *Destroyer) Run(inv inventory.InventoryInfo, options DestroyerOptions) <
|
|||
runner := taskrunner.NewTaskStatusRunner(deleteIds, d.statusPoller, resourceCache)
|
||||
klog.V(4).Infoln("destroyer running TaskStatusRunner...")
|
||||
// TODO(seans): Make the poll interval configurable like the applier.
|
||||
err = runner.Run(context.Background(), taskQueue.ToChannel(), eventChannel, taskrunner.Options{
|
||||
err = runner.Run(ctx, taskQueue.ToChannel(), eventChannel, taskrunner.Options{
|
||||
UseCache: true,
|
||||
PollInterval: options.PollInterval,
|
||||
EmitStatusEvents: options.EmitStatusEvents,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,348 @@
|
|||
// Copyright 2020 The Kubernetes Authors.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package apply
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"sigs.k8s.io/cli-utils/pkg/apply/event"
|
||||
"sigs.k8s.io/cli-utils/pkg/inventory"
|
||||
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
|
||||
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
|
||||
"sigs.k8s.io/cli-utils/pkg/object"
|
||||
"sigs.k8s.io/cli-utils/pkg/testutil"
|
||||
)
|
||||
|
||||
func TestDestroyerCancel(t *testing.T) {
|
||||
testCases := map[string]struct {
|
||||
// inventory input to destroyer
|
||||
invInfo inventoryInfo
|
||||
// objects in the cluster
|
||||
clusterObjs object.UnstructuredSet
|
||||
// options input to destroyer.Run
|
||||
options DestroyerOptions
|
||||
// timeout for destroyer.Run
|
||||
runTimeout time.Duration
|
||||
// timeout for the test
|
||||
testTimeout time.Duration
|
||||
// fake input events from the status poller
|
||||
statusEvents []pollevent.Event
|
||||
// expected output status events (async)
|
||||
expectedStatusEvents []testutil.ExpEvent
|
||||
// expected output events
|
||||
expectedEvents []testutil.ExpEvent
|
||||
// true if runTimeout is expected to have caused cancellation
|
||||
expectRunTimeout bool
|
||||
}{
|
||||
"cancelled by caller while waiting for deletion": {
|
||||
expectRunTimeout: true,
|
||||
runTimeout: 2 * time.Second,
|
||||
testTimeout: 30 * time.Second,
|
||||
invInfo: inventoryInfo{
|
||||
name: "abc-123",
|
||||
namespace: "test",
|
||||
id: "test",
|
||||
set: object.ObjMetadataSet{
|
||||
testutil.ToIdentifier(t, resources["deployment"]),
|
||||
},
|
||||
},
|
||||
clusterObjs: object.UnstructuredSet{
|
||||
testutil.Unstructured(t, resources["deployment"], testutil.AddOwningInv(t, "test")),
|
||||
},
|
||||
options: DestroyerOptions{
|
||||
EmitStatusEvents: true,
|
||||
// DeleteTimeout needs to block long enough to cancel the run,
|
||||
// otherwise the WaitTask is skipped.
|
||||
DeleteTimeout: 1 * time.Minute,
|
||||
},
|
||||
statusEvents: []pollevent.Event{
|
||||
{
|
||||
EventType: pollevent.ResourceUpdateEvent,
|
||||
Resource: &pollevent.ResourceStatus{
|
||||
Identifier: testutil.ToIdentifier(t, resources["deployment"]),
|
||||
Status: status.InProgressStatus,
|
||||
Resource: testutil.Unstructured(t, resources["deployment"], testutil.AddOwningInv(t, "test")),
|
||||
},
|
||||
},
|
||||
{
|
||||
EventType: pollevent.ResourceUpdateEvent,
|
||||
Resource: &pollevent.ResourceStatus{
|
||||
Identifier: testutil.ToIdentifier(t, resources["deployment"]),
|
||||
Status: status.InProgressStatus,
|
||||
Resource: testutil.Unstructured(t, resources["deployment"], testutil.AddOwningInv(t, "test")),
|
||||
},
|
||||
},
|
||||
// Resource never becomes NotFound, blocking destroyer.Run from exiting
|
||||
},
|
||||
expectedStatusEvents: []testutil.ExpEvent{
|
||||
{
|
||||
EventType: event.StatusType,
|
||||
StatusEvent: &testutil.ExpStatusEvent{
|
||||
Identifier: testutil.ToIdentifier(t, resources["deployment"]),
|
||||
Status: status.InProgressStatus,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedEvents: []testutil.ExpEvent{
|
||||
{
|
||||
// InitTask
|
||||
EventType: event.InitType,
|
||||
InitEvent: &testutil.ExpInitEvent{},
|
||||
},
|
||||
{
|
||||
// PruneTask start
|
||||
EventType: event.ActionGroupType,
|
||||
ActionGroupEvent: &testutil.ExpActionGroupEvent{
|
||||
Action: event.DeleteAction,
|
||||
GroupName: "prune-0",
|
||||
Type: event.Started,
|
||||
},
|
||||
},
|
||||
{
|
||||
// Delete Deployment
|
||||
EventType: event.DeleteType,
|
||||
DeleteEvent: &testutil.ExpDeleteEvent{
|
||||
GroupName: "prune-0",
|
||||
Operation: event.Deleted,
|
||||
Identifier: testutil.ToIdentifier(t, resources["deployment"]),
|
||||
Error: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
// PruneTask finished
|
||||
EventType: event.ActionGroupType,
|
||||
ActionGroupEvent: &testutil.ExpActionGroupEvent{
|
||||
Action: event.DeleteAction,
|
||||
GroupName: "prune-0",
|
||||
Type: event.Finished,
|
||||
},
|
||||
},
|
||||
{
|
||||
// WaitTask start
|
||||
EventType: event.ActionGroupType,
|
||||
ActionGroupEvent: &testutil.ExpActionGroupEvent{
|
||||
Action: event.WaitAction,
|
||||
GroupName: "wait-0",
|
||||
Type: event.Started,
|
||||
},
|
||||
},
|
||||
// Deployment never becomes NotFound.
|
||||
// WaitTask is expected to be cancelled before DeleteTimeout.
|
||||
{
|
||||
// WaitTask finished
|
||||
EventType: event.ActionGroupType,
|
||||
ActionGroupEvent: &testutil.ExpActionGroupEvent{
|
||||
Action: event.WaitAction,
|
||||
GroupName: "wait-0",
|
||||
Type: event.Finished, // TODO: add Cancelled event type
|
||||
},
|
||||
},
|
||||
// Inventory cannot be deleted, because the objects still exist,
|
||||
// even tho they've been deleted (ex: blocked by finalizer).
|
||||
},
|
||||
},
|
||||
"completed with timeout": {
|
||||
expectRunTimeout: false,
|
||||
runTimeout: 10 * time.Second,
|
||||
testTimeout: 30 * time.Second,
|
||||
invInfo: inventoryInfo{
|
||||
name: "abc-123",
|
||||
namespace: "test",
|
||||
id: "test",
|
||||
set: object.ObjMetadataSet{
|
||||
testutil.ToIdentifier(t, resources["deployment"]),
|
||||
},
|
||||
},
|
||||
clusterObjs: object.UnstructuredSet{
|
||||
testutil.Unstructured(t, resources["deployment"], testutil.AddOwningInv(t, "test")),
|
||||
},
|
||||
options: DestroyerOptions{
|
||||
EmitStatusEvents: true,
|
||||
// DeleteTimeout needs to block long enough for completion
|
||||
DeleteTimeout: 1 * time.Minute,
|
||||
},
|
||||
statusEvents: []pollevent.Event{
|
||||
{
|
||||
EventType: pollevent.ResourceUpdateEvent,
|
||||
Resource: &pollevent.ResourceStatus{
|
||||
Identifier: testutil.ToIdentifier(t, resources["deployment"]),
|
||||
Status: status.InProgressStatus,
|
||||
Resource: testutil.Unstructured(t, resources["deployment"], testutil.AddOwningInv(t, "test")),
|
||||
},
|
||||
},
|
||||
{
|
||||
EventType: pollevent.ResourceUpdateEvent,
|
||||
Resource: &pollevent.ResourceStatus{
|
||||
Identifier: testutil.ToIdentifier(t, resources["deployment"]),
|
||||
Status: status.NotFoundStatus,
|
||||
},
|
||||
},
|
||||
// Resource becoming NotFound should unblock destroyer.Run WaitTask
|
||||
},
|
||||
expectedStatusEvents: []testutil.ExpEvent{
|
||||
{
|
||||
EventType: event.StatusType,
|
||||
StatusEvent: &testutil.ExpStatusEvent{
|
||||
Identifier: testutil.ToIdentifier(t, resources["deployment"]),
|
||||
Status: status.InProgressStatus,
|
||||
},
|
||||
},
|
||||
{
|
||||
EventType: event.StatusType,
|
||||
StatusEvent: &testutil.ExpStatusEvent{
|
||||
Identifier: testutil.ToIdentifier(t, resources["deployment"]),
|
||||
Status: status.NotFoundStatus,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedEvents: []testutil.ExpEvent{
|
||||
{
|
||||
// InitTask
|
||||
EventType: event.InitType,
|
||||
InitEvent: &testutil.ExpInitEvent{},
|
||||
},
|
||||
{
|
||||
// PruneTask start
|
||||
EventType: event.ActionGroupType,
|
||||
ActionGroupEvent: &testutil.ExpActionGroupEvent{
|
||||
Action: event.DeleteAction,
|
||||
GroupName: "prune-0",
|
||||
Type: event.Started,
|
||||
},
|
||||
},
|
||||
{
|
||||
// Delete Deployment
|
||||
EventType: event.DeleteType,
|
||||
DeleteEvent: &testutil.ExpDeleteEvent{
|
||||
GroupName: "prune-0",
|
||||
Operation: event.Deleted,
|
||||
Identifier: testutil.ToIdentifier(t, resources["deployment"]),
|
||||
Error: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
// PruneTask finished
|
||||
EventType: event.ActionGroupType,
|
||||
ActionGroupEvent: &testutil.ExpActionGroupEvent{
|
||||
Action: event.DeleteAction,
|
||||
GroupName: "prune-0",
|
||||
Type: event.Finished,
|
||||
},
|
||||
},
|
||||
{
|
||||
// WaitTask start
|
||||
EventType: event.ActionGroupType,
|
||||
ActionGroupEvent: &testutil.ExpActionGroupEvent{
|
||||
Action: event.WaitAction,
|
||||
GroupName: "wait-0",
|
||||
Type: event.Started,
|
||||
},
|
||||
},
|
||||
// Deployment becomes NotFound.
|
||||
{
|
||||
// WaitTask finished
|
||||
EventType: event.ActionGroupType,
|
||||
ActionGroupEvent: &testutil.ExpActionGroupEvent{
|
||||
Action: event.WaitAction,
|
||||
GroupName: "wait-0",
|
||||
Type: event.Finished,
|
||||
},
|
||||
},
|
||||
{
|
||||
// DeleteInvTask start
|
||||
EventType: event.ActionGroupType,
|
||||
ActionGroupEvent: &testutil.ExpActionGroupEvent{
|
||||
Action: event.InventoryAction,
|
||||
GroupName: "delete-inventory-0",
|
||||
Type: event.Started,
|
||||
},
|
||||
},
|
||||
{
|
||||
// DeleteInvTask finished
|
||||
EventType: event.ActionGroupType,
|
||||
ActionGroupEvent: &testutil.ExpActionGroupEvent{
|
||||
Action: event.InventoryAction,
|
||||
GroupName: "delete-inventory-0",
|
||||
Type: event.Finished,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for tn, tc := range testCases {
|
||||
t.Run(tn, func(t *testing.T) {
|
||||
poller := newFakePoller(tc.statusEvents)
|
||||
|
||||
invInfo := tc.invInfo.toWrapped()
|
||||
|
||||
destroyer := newTestDestroyer(t,
|
||||
tc.invInfo,
|
||||
// Add the inventory to the cluster (to allow deletion)
|
||||
append(tc.clusterObjs, inventory.InvInfoToConfigMap(invInfo)),
|
||||
poller,
|
||||
)
|
||||
|
||||
// Context for Destroyer.Run
|
||||
runCtx, runCancel := context.WithTimeout(context.Background(), tc.runTimeout)
|
||||
defer runCancel() // cleanup
|
||||
|
||||
// Context for this test (in case Destroyer.Run never closes the event channel)
|
||||
testCtx, testCancel := context.WithTimeout(context.Background(), tc.testTimeout)
|
||||
defer testCancel() // cleanup
|
||||
|
||||
eventChannel := destroyer.Run(runCtx, invInfo, tc.options)
|
||||
|
||||
// Start sending status events
|
||||
poller.Start()
|
||||
|
||||
var events []event.Event
|
||||
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-testCtx.Done():
|
||||
// Test timed out
|
||||
runCancel()
|
||||
t.Errorf("Destroyer.Run failed to respond to cancellation (expected: %s, timeout: %s)", tc.runTimeout, tc.testTimeout)
|
||||
break loop
|
||||
|
||||
case e, ok := <-eventChannel:
|
||||
if !ok {
|
||||
// Event channel closed
|
||||
testCancel()
|
||||
break loop
|
||||
}
|
||||
events = append(events, e)
|
||||
}
|
||||
}
|
||||
|
||||
// Convert events to test events for comparison
|
||||
receivedEvents := testutil.EventsToExpEvents(events)
|
||||
|
||||
// Validate & remove expected status events
|
||||
for _, e := range tc.expectedStatusEvents {
|
||||
var removed int
|
||||
receivedEvents, removed = testutil.RemoveEqualEvents(receivedEvents, e)
|
||||
if removed < 1 {
|
||||
t.Errorf("Expected status event not received: %#v", e)
|
||||
}
|
||||
}
|
||||
|
||||
// Validate the rest of the events
|
||||
testutil.AssertEqual(t, receivedEvents, tc.expectedEvents)
|
||||
|
||||
// Validate that the expected timeout was the cause of the run completion.
|
||||
// just in case something else cancelled the run
|
||||
if tc.expectRunTimeout {
|
||||
assert.Equal(t, context.DeadlineExceeded, runCtx.Err(), "Destroyer.Run exited, but not by expected timeout")
|
||||
} else {
|
||||
assert.Nil(t, runCtx.Err(), "Destroyer.Run exited, but not by expected timeout")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -9,6 +9,7 @@ import (
|
|||
"sort"
|
||||
"time"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
"sigs.k8s.io/cli-utils/pkg/apply/cache"
|
||||
"sigs.k8s.io/cli-utils/pkg/apply/event"
|
||||
"sigs.k8s.io/cli-utils/pkg/apply/poller"
|
||||
|
|
@ -50,6 +51,9 @@ type Options struct {
|
|||
// that does most of the work.
|
||||
func (tsr *taskStatusRunner) Run(ctx context.Context, taskQueue chan Task,
|
||||
eventChannel chan event.Event, options Options) error {
|
||||
// Give the poller its own context and run it in the background.
|
||||
// If taskStatusRunner.Run is cancelled, baseRunner.run will exit early,
|
||||
// causing the poller to be cancelled.
|
||||
statusCtx, cancelFunc := context.WithCancel(context.Background())
|
||||
statusChannel := tsr.statusPoller.Poll(statusCtx, tsr.identifiers, polling.Options{
|
||||
PollInterval: options.PollInterval,
|
||||
|
|
@ -253,6 +257,7 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
|
|||
case <-doneCh:
|
||||
doneCh = nil // Set doneCh to nil so we don't enter a busy loop.
|
||||
abort = true
|
||||
klog.V(3).Info("taskrunner cancelled by caller")
|
||||
completeIfWaitTask(currentTask, taskContext)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -281,7 +281,7 @@ func (cic *ClusterInventoryClient) getClusterInventoryObjsByLabel(inv InventoryI
|
|||
return nil, err
|
||||
}
|
||||
labelSelector := fmt.Sprintf("%s=%s", common.InventoryLabel, label)
|
||||
klog.V(4).Infof("prune inventory object fetch: %s/%s/%s", groupResource, namespace, labelSelector)
|
||||
klog.V(4).Infof("inventory object fetch by label (group: %q, namespace: %q, selector: %q)", groupResource, namespace, labelSelector)
|
||||
builder := cic.builderFunc()
|
||||
retrievedInventoryInfos, err := builder.
|
||||
Unstructured().
|
||||
|
|
@ -316,6 +316,7 @@ func (cic *ClusterInventoryClient) getClusterInventoryObjsByName(inv InventoryIn
|
|||
return nil, err
|
||||
}
|
||||
|
||||
klog.V(4).Infof("inventory object fetch by name (namespace: %q, name: %q)", inv.Namespace(), inv.Name())
|
||||
res, err := helper.Get(inv.Namespace(), inv.Name())
|
||||
if err != nil && !apierrors.IsNotFound(err) {
|
||||
return nil, err
|
||||
|
|
|
|||
|
|
@ -174,7 +174,7 @@ func applyAndDestroyTest(c client.Client, invConfig InventoryConfig, inventoryNa
|
|||
destroyer := invConfig.DestroyerFactoryFunc()
|
||||
|
||||
options := apply.DestroyerOptions{InventoryPolicy: inventory.AdoptIfNoInventory}
|
||||
destroyerEvents := runCollect(destroyer.Run(inventoryInfo, options))
|
||||
destroyerEvents := runCollect(destroyer.Run(context.TODO(), inventoryInfo, options))
|
||||
|
||||
expEvents = []testutil.ExpEvent{
|
||||
{
|
||||
|
|
|
|||
|
|
@ -175,7 +175,7 @@ func crdTest(_ client.Client, invConfig InventoryConfig, inventoryName, namespac
|
|||
By("destroy the resources, including the crd")
|
||||
destroyer := invConfig.DestroyerFactoryFunc()
|
||||
options := apply.DestroyerOptions{InventoryPolicy: inventory.AdoptIfNoInventory}
|
||||
destroyerEvents := runCollect(destroyer.Run(inv, options))
|
||||
destroyerEvents := runCollect(destroyer.Run(context.TODO(), inv, options))
|
||||
|
||||
expEvents = []testutil.ExpEvent{
|
||||
{
|
||||
|
|
|
|||
|
|
@ -243,7 +243,7 @@ func dependsOnTest(c client.Client, invConfig InventoryConfig, inventoryName, na
|
|||
By("destroy resources in opposite order")
|
||||
destroyer := invConfig.DestroyerFactoryFunc()
|
||||
options := apply.DestroyerOptions{InventoryPolicy: inventory.AdoptIfNoInventory}
|
||||
destroyerEvents := runCollect(destroyer.Run(inv, options))
|
||||
destroyerEvents := runCollect(destroyer.Run(context.TODO(), inv, options))
|
||||
|
||||
expEvents = []testutil.ExpEvent{
|
||||
{
|
||||
|
|
|
|||
|
|
@ -216,7 +216,7 @@ func mutationTest(c client.Client, invConfig InventoryConfig, inventoryName, nam
|
|||
By("destroy resources in opposite order")
|
||||
destroyer := invConfig.DestroyerFactoryFunc()
|
||||
options := apply.DestroyerOptions{InventoryPolicy: inventory.AdoptIfNoInventory}
|
||||
destroyerEvents := runCollect(destroyer.Run(inv, options))
|
||||
destroyerEvents := runCollect(destroyer.Run(context.TODO(), inv, options))
|
||||
|
||||
expEvents = []testutil.ExpEvent{
|
||||
{
|
||||
|
|
|
|||
|
|
@ -254,7 +254,7 @@ func pruneRetrieveErrorTest(c client.Client, invConfig InventoryConfig, inventor
|
|||
destroyer := invConfig.DestroyerFactoryFunc()
|
||||
|
||||
options := apply.DestroyerOptions{InventoryPolicy: inventory.AdoptIfNoInventory}
|
||||
destroyerEvents := runCollect(destroyer.Run(inv, options))
|
||||
destroyerEvents := runCollect(destroyer.Run(context.TODO(), inv, options))
|
||||
|
||||
expEvents3 := []testutil.ExpEvent{
|
||||
{
|
||||
|
|
|
|||
Loading…
Reference in New Issue