feat: replace StatusPoller w/ StatusWatcher

- Add DefaultStatusWatcher that wraps DynamicClient and manages
  informers for a set of resource objects.
  - Supports two modes: root-scoped & namespace-scoped.
  - Root-scoped mode uses root-scoped informers to efficiency and
    performance.
  - Namespace-scoped mode uses namespace-scoped informers to
    minimize the permissions needed to run and the size of the
    in-memory object cache.
  - Automatic mode selects which mode to use based on whether the
    objects being watched are in one or multiple namespaces.
    This is the default mode, optimizing for performance.
  - If CRDs are being watched, the creation/deletion of CRDs can
    cause informers for those custom resources to be created/deleted.
  - In namespace-scope mode, if namespaces are being watched, the
    creation/deletion of namespaces can also trigger informers to
    be created/deleted.
  - All creates/updates/deletes to CRDs also cause RESTMapper reset.
  - Allow pods to be unschedulable for 15s before reporting the
    status as Failed. Any update resets the timer.
- Add BlindStatusWatcher for testing and disabling for dry-run.
- Add DynamicClusterReader that wraps DynamicClient.
  This is now used to look up generated resources
  (ex: Deployment > ReplicaSets > Pods).
- Add DefaultStatusReader which uses a DelegatingStatusReader to
  wrap a list of conventional and specific StatusReaders.
  This should make it easier to extend the list of StatusReaders.
- Move some pending WaitEvents to be optional in tests, now that
  StatusWatcher can resolve their status before the WaitTask starts.
- Add a new Thousand Deployments stress test (10x kind nodes)
- Add some new logs for easier debugging
- Add internal SyncEvent so that apply/delete tasks don't start
  until the StatusWatcher has finished initial synchronization.
  This helps avoid missing events from actions that happen while
  synchronization is incomplete.
- Filter optional pending WaitEvents when testing.

BREAKING CHANGE: Replace StatusPoller w/ StatusWatcher
BREAKING CHANGE: Remove PollInterval (obsolete with watcher)
This commit is contained in:
Karl Isenberg 2022-04-09 06:52:40 -07:00
parent 02d2092d8c
commit c46949360e
50 changed files with 3251 additions and 204 deletions

View File

@ -101,7 +101,9 @@ test-e2e-focus: "$(MYGOBIN)/ginkgo" "$(MYGOBIN)/kind"
.PHONY: test-stress
test-stress: "$(MYGOBIN)/ginkgo" "$(MYGOBIN)/kind"
kind delete cluster --name=cli-utils-e2e && kind create cluster --name=cli-utils-e2e --wait 5m
kind delete cluster --name=cli-utils-e2e && kind create cluster --name=cli-utils-e2e --wait 5m \
--config=./test/stress/kind-cluster.yaml
kubectl wait nodes --for=condition=ready --all --timeout=5m
"$(MYGOBIN)/ginkgo" -v ./test/stress/... -- -v 3
.PHONY: vet

View File

@ -45,8 +45,6 @@ func GetRunner(factory cmdutil.Factory, invFactory inventory.ClientFactory,
cmd.Flags().StringVar(&r.output, "output", printers.DefaultPrinter(),
fmt.Sprintf("Output format, must be one of %s", strings.Join(printers.SupportedPrinters(), ",")))
cmd.Flags().DurationVar(&r.period, "poll-period", 2*time.Second,
"Polling period for resource statuses.")
cmd.Flags().DurationVar(&r.reconcileTimeout, "reconcile-timeout", time.Duration(0),
"Timeout threshold for waiting for all resources to reach the Current status.")
cmd.Flags().BoolVar(&r.noPrune, "no-prune", r.noPrune,
@ -81,7 +79,6 @@ type Runner struct {
serverSideOptions common.ServerSideOptions
output string
period time.Duration
reconcileTimeout time.Duration
noPrune bool
prunePropagationPolicy string
@ -156,7 +153,6 @@ func (r *Runner) RunE(cmd *cobra.Command, args []string) error {
ch := a.Run(ctx, inv, objs, apply.ApplierOptions{
ServerSideOptions: r.serverSideOptions,
PollInterval: r.period,
ReconcileTimeout: r.reconcileTimeout,
// If we are not waiting for status, tell the applier to not
// emit the events.

View File

@ -20,18 +20,16 @@ import (
"sigs.k8s.io/cli-utils/pkg/apply/filter"
"sigs.k8s.io/cli-utils/pkg/apply/info"
"sigs.k8s.io/cli-utils/pkg/apply/mutator"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
"sigs.k8s.io/cli-utils/pkg/apply/prune"
"sigs.k8s.io/cli-utils/pkg/apply/solver"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/validation"
)
const defaultPollInterval = 2 * time.Second
// Applier performs the step of applying a set of resources into a cluster,
// conditionally waits for all of them to be fully reconciled and finally
// performs prune to clean up any resources that has been deleted.
@ -44,7 +42,7 @@ const defaultPollInterval = 2 * time.Second
// cluster, different sets of tasks might be needed.
type Applier struct {
pruner *prune.Pruner
statusPoller poller.Poller
statusWatcher watcher.StatusWatcher
invClient inventory.Client
client dynamic.Interface
openAPIGetter discovery.OpenAPISchemaInterface
@ -236,10 +234,14 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.Info, objects objec
// Create a new TaskStatusRunner to execute the taskQueue.
klog.V(4).Infoln("applier building TaskStatusRunner...")
allIds := object.UnstructuredSetToObjMetadataSet(append(applyObjs, pruneObjs...))
runner := taskrunner.NewTaskStatusRunner(allIds, a.statusPoller)
statusWatcher := a.statusWatcher
// Disable watcher for dry runs
if opts.DryRunStrategy.ClientOrServerDryRun() {
statusWatcher = watcher.BlindStatusWatcher{}
}
runner := taskrunner.NewTaskStatusRunner(allIds, statusWatcher)
klog.V(4).Infoln("applier running TaskStatusRunner...")
err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
PollInterval: options.PollInterval,
EmitStatusEvents: options.EmitStatusEvents,
})
if err != nil {
@ -259,10 +261,6 @@ type ApplierOptions struct {
// how long to wait.
ReconcileTimeout time.Duration
// PollInterval defines how often we should poll for the status
// of resources.
PollInterval time.Duration
// EmitStatusEvents defines whether status events should be
// emitted on the eventChannel to the caller.
EmitStatusEvents bool
@ -295,9 +293,6 @@ type ApplierOptions struct {
// setDefaults set the options to the default values if they
// have not been provided.
func setDefaults(o *ApplierOptions) {
if o.PollInterval == 0 {
o.PollInterval = defaultPollInterval
}
if o.PrunePropagationPolicy == "" {
o.PrunePropagationPolicy = metav1.DeletePropagationBackground
}

View File

@ -13,13 +13,10 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/cli-utils/pkg/apply/info"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
"sigs.k8s.io/cli-utils/pkg/apply/prune"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
)
type ApplierBuilder struct {
@ -31,7 +28,7 @@ type ApplierBuilder struct {
mapper meta.RESTMapper
restConfig *rest.Config
unstructuredClientForMapping func(*meta.RESTMapping) (resource.RESTClient, error)
statusPoller poller.Poller
statusWatcher watcher.StatusWatcher
}
// NewApplierBuilder returns a new ApplierBuilder.
@ -52,7 +49,7 @@ func (b *ApplierBuilder) Build() (*Applier, error) {
Client: bx.client,
Mapper: bx.mapper,
},
statusPoller: bx.statusPoller,
statusWatcher: bx.statusWatcher,
invClient: bx.invClient,
client: bx.client,
openAPIGetter: bx.discoClient,
@ -109,12 +106,8 @@ func (b *ApplierBuilder) finalize() (*ApplierBuilder, error) {
}
bx.unstructuredClientForMapping = bx.factory.UnstructuredClientForMapping
}
if bx.statusPoller == nil {
c, err := client.New(bx.restConfig, client.Options{Scheme: scheme.Scheme, Mapper: bx.mapper})
if err != nil {
return nil, fmt.Errorf("error creating client: %v", err)
}
bx.statusPoller = polling.NewStatusPoller(c, bx.mapper, polling.Options{})
if bx.statusWatcher == nil {
bx.statusWatcher = watcher.NewDefaultStatusWatcher(bx.client, bx.mapper)
}
return &bx, nil
}
@ -154,7 +147,7 @@ func (b *ApplierBuilder) WithUnstructuredClientForMapping(unstructuredClientForM
return b
}
func (b *ApplierBuilder) WithStatusPoller(statusPoller poller.Poller) *ApplierBuilder {
b.statusPoller = statusPoller
func (b *ApplierBuilder) WithStatusWatcher(statusWatcher watcher.StatusWatcher) *ApplierBuilder {
b.statusWatcher = statusWatcher
return b
}

View File

@ -18,6 +18,7 @@ import (
"sigs.k8s.io/cli-utils/pkg/inventory"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/multierror"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/validation"
@ -97,7 +98,7 @@ func TestApplier(t *testing.T) {
clusterObjs object.UnstructuredSet
// options input to applier.Run
options ApplierOptions
// fake input events from the status poller
// fake input events from the statusWatcher
statusEvents []pollevent.Event
// expected output status events (async)
expectedStatusEvents []testutil.ExpEvent
@ -1401,7 +1402,7 @@ func TestApplier(t *testing.T) {
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
poller := newFakePoller(tc.statusEvents)
statusWatcher := newFakeWatcher(tc.statusEvents)
// Only feed valid objects into the TestApplier.
// Invalid objects should not generate API requests.
@ -1418,7 +1419,7 @@ func TestApplier(t *testing.T) {
tc.invInfo,
validObjs,
tc.clusterObjs,
poller,
statusWatcher,
)
// Context for Applier.Run
@ -1463,7 +1464,7 @@ func TestApplier(t *testing.T) {
e.ActionGroupEvent.Action == event.PruneAction {
once.Do(func() {
// start events
poller.Start()
statusWatcher.Start()
})
}
}
@ -1519,7 +1520,7 @@ func TestApplierCancel(t *testing.T) {
runTimeout time.Duration
// timeout for the test
testTimeout time.Duration
// fake input events from the status poller
// fake input events from the statusWatcher
statusEvents []pollevent.Event
// expected output status events (async)
expectedStatusEvents []testutil.ExpEvent
@ -1854,13 +1855,13 @@ func TestApplierCancel(t *testing.T) {
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
poller := newFakePoller(tc.statusEvents)
statusWatcher := newFakeWatcher(tc.statusEvents)
applier := newTestApplier(t,
tc.invInfo,
tc.resources,
tc.clusterObjs,
poller,
statusWatcher,
)
// Context for Applier.Run
@ -1902,7 +1903,7 @@ func TestApplierCancel(t *testing.T) {
e.ActionGroupEvent.Action == event.PruneAction {
once.Do(func() {
// start events
poller.Start()
statusWatcher.Start()
})
}
}
@ -2046,7 +2047,7 @@ func TestReadAndPrepareObjects(t *testing.T) {
tc.resources,
tc.clusterObjs,
// no events needed for prepareObjects
newFakePoller([]pollevent.Event{}),
watcher.BlindStatusWatcher{},
)
applyObjs, pruneObjs, err := applier.prepareObjects(tc.invInfo.toWrapped(), tc.resources, ApplierOptions{})

View File

@ -53,11 +53,11 @@ func (rc *ResourceCacheMap) Get(id object.ObjMetadata) ResourceStatus {
defer rc.mu.RUnlock()
obj, found := rc.cache[id]
if klog.V(4).Enabled() {
if klog.V(6).Enabled() {
if found {
klog.Infof("resource cache hit: %s", id)
klog.V(6).Infof("resource cache hit: %s", id)
} else {
klog.Infof("resource cache miss: %s", id)
klog.V(6).Infof("resource cache miss: %s", id)
}
}
if !found {

View File

@ -27,12 +27,11 @@ import (
"k8s.io/klog/v2"
cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/jsonpath"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"
)
@ -74,7 +73,7 @@ func newTestApplier(
invInfo inventoryInfo,
resources object.UnstructuredSet,
clusterObjs object.UnstructuredSet,
statusPoller poller.Poller,
statusWatcher watcher.StatusWatcher,
) *Applier {
tf := newTestFactory(t, invInfo, resources, clusterObjs)
defer tf.Cleanup()
@ -88,7 +87,7 @@ func newTestApplier(
applier, err := NewApplierBuilder().
WithFactory(tf).
WithInventoryClient(invClient).
WithStatusPoller(statusPoller).
WithStatusWatcher(statusWatcher).
Build()
require.NoError(t, err)
@ -103,7 +102,7 @@ func newTestDestroyer(
t *testing.T,
invInfo inventoryInfo,
clusterObjs object.UnstructuredSet,
statusPoller poller.Poller,
statusWatcher watcher.StatusWatcher,
) *Destroyer {
tf := newTestFactory(t, invInfo, object.UnstructuredSet{}, clusterObjs)
defer tf.Cleanup()
@ -112,7 +111,7 @@ func newTestDestroyer(
destroyer, err := NewDestroyer(tf, invClient)
require.NoError(t, err)
destroyer.StatusPoller = statusPoller
destroyer.statusWatcher = statusWatcher
return destroyer
}
@ -345,27 +344,29 @@ func (n *nsHandler) handle(t *testing.T, req *http.Request) (*http.Response, boo
return nil, false, nil
}
type fakePoller struct {
type fakeWatcher struct {
start chan struct{}
events []pollevent.Event
}
func newFakePoller(statusEvents []pollevent.Event) *fakePoller {
return &fakePoller{
func newFakeWatcher(statusEvents []pollevent.Event) *fakeWatcher {
return &fakeWatcher{
events: statusEvents,
start: make(chan struct{}),
}
}
// Start events being sent on the status channel
func (f *fakePoller) Start() {
func (f *fakeWatcher) Start() {
close(f.start)
}
func (f *fakePoller) Poll(ctx context.Context, _ object.ObjMetadataSet, _ polling.PollOptions) <-chan pollevent.Event {
func (f *fakeWatcher) Watch(ctx context.Context, _ object.ObjMetadataSet, _ watcher.Options) <-chan pollevent.Event {
eventChannel := make(chan pollevent.Event)
go func() {
defer close(eventChannel)
// send sync event immediately
eventChannel <- pollevent.Event{Type: pollevent.SyncEvent}
// wait until started to send the events
<-f.start
for _, f := range f.events {

View File

@ -16,13 +16,12 @@ import (
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/filter"
"sigs.k8s.io/cli-utils/pkg/apply/info"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
"sigs.k8s.io/cli-utils/pkg/apply/prune"
"sigs.k8s.io/cli-utils/pkg/apply/solver"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/validation"
)
@ -38,25 +37,30 @@ func NewDestroyer(factory cmdutil.Factory, invClient inventory.Client) (*Destroy
if err != nil {
return nil, fmt.Errorf("error setting up PruneOptions: %w", err)
}
statusPoller, err := polling.NewStatusPollerFromFactory(factory, polling.Options{})
client, err := factory.DynamicClient()
if err != nil {
return nil, err
}
mapper, err := factory.ToRESTMapper()
if err != nil {
return nil, err
}
statusWatcher := watcher.NewDefaultStatusWatcher(client, mapper)
return &Destroyer{
pruner: pruner,
StatusPoller: statusPoller,
factory: factory,
invClient: invClient,
pruner: pruner,
statusWatcher: statusWatcher,
factory: factory,
invClient: invClient,
}, nil
}
// Destroyer performs the step of grabbing all the previous inventory objects and
// prune them. This also deletes all the previous inventory objects
type Destroyer struct {
pruner *prune.Pruner
StatusPoller poller.Poller
factory cmdutil.Factory
invClient inventory.Client
pruner *prune.Pruner
statusWatcher watcher.StatusWatcher
factory cmdutil.Factory
invClient inventory.Client
}
type DestroyerOptions struct {
@ -80,18 +84,11 @@ type DestroyerOptions struct {
// emitted on the eventChannel to the caller.
EmitStatusEvents bool
// PollInterval defines how often we should poll for the status
// of resources.
PollInterval time.Duration
// ValidationPolicy defines how to handle invalid objects.
ValidationPolicy validation.Policy
}
func setDestroyerDefaults(o *DestroyerOptions) {
if o.PollInterval == time.Duration(0) {
o.PollInterval = defaultPollInterval
}
if o.DeletePropagationPolicy == "" {
o.DeletePropagationPolicy = metav1.DeletePropagationBackground
}
@ -213,10 +210,14 @@ func (d *Destroyer) Run(ctx context.Context, invInfo inventory.Info, options Des
// Create a new TaskStatusRunner to execute the taskQueue.
klog.V(4).Infoln("destroyer building TaskStatusRunner...")
deleteIds := object.UnstructuredSetToObjMetadataSet(deleteObjs)
runner := taskrunner.NewTaskStatusRunner(deleteIds, d.StatusPoller)
statusWatcher := d.statusWatcher
// Disable watcher for dry runs
if opts.DryRunStrategy.ClientOrServerDryRun() {
statusWatcher = watcher.BlindStatusWatcher{}
}
runner := taskrunner.NewTaskStatusRunner(deleteIds, statusWatcher)
klog.V(4).Infoln("destroyer running TaskStatusRunner...")
err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
PollInterval: options.PollInterval,
EmitStatusEvents: options.EmitStatusEvents,
})
if err != nil {

View File

@ -311,7 +311,7 @@ func TestDestroyerCancel(t *testing.T) {
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
poller := newFakePoller(tc.statusEvents)
statusWatcher := newFakeWatcher(tc.statusEvents)
invInfo := tc.invInfo.toWrapped()
@ -319,7 +319,7 @@ func TestDestroyerCancel(t *testing.T) {
tc.invInfo,
// Add the inventory to the cluster (to allow deletion)
append(tc.clusterObjs, inventory.InvInfoToConfigMap(invInfo)),
poller,
statusWatcher,
)
// Context for Destroyer.Run
@ -357,7 +357,7 @@ func TestDestroyerCancel(t *testing.T) {
e.ActionGroupEvent.Action == event.WaitAction {
once.Do(func() {
// Start sending status events after waiting starts
poller.Start()
statusWatcher.Start()
})
}
}

View File

@ -6,21 +6,20 @@ package taskrunner
import (
"context"
"fmt"
"time"
"k8s.io/klog/v2"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"
)
// NewTaskStatusRunner returns a new TaskStatusRunner.
func NewTaskStatusRunner(identifiers object.ObjMetadataSet, statusPoller poller.Poller) *TaskStatusRunner {
func NewTaskStatusRunner(identifiers object.ObjMetadataSet, statusWatcher watcher.StatusWatcher) *TaskStatusRunner {
return &TaskStatusRunner{
Identifiers: identifiers,
StatusPoller: statusPoller,
Identifiers: identifiers,
StatusWatcher: statusWatcher,
}
}
@ -28,14 +27,13 @@ func NewTaskStatusRunner(identifiers object.ObjMetadataSet, statusPoller poller.
// tasks while at the same time uses the statusPoller to
// keep track of the status of the resources.
type TaskStatusRunner struct {
Identifiers object.ObjMetadataSet
StatusPoller poller.Poller
Identifiers object.ObjMetadataSet
StatusWatcher watcher.StatusWatcher
}
// Options defines properties that is passed along to
// the statusPoller.
type Options struct {
PollInterval time.Duration
EmitStatusEvents bool
}
@ -59,9 +57,7 @@ func (tsr *TaskStatusRunner) Run(
// If taskStatusRunner.Run is cancelled, baseRunner.run will exit early,
// causing the poller to be cancelled.
statusCtx, cancelFunc := context.WithCancel(context.Background())
statusChannel := tsr.StatusPoller.Poll(statusCtx, tsr.Identifiers, polling.PollOptions{
PollInterval: opts.PollInterval,
})
statusChannel := tsr.StatusWatcher.Watch(statusCtx, tsr.Identifiers, watcher.Options{})
// complete stops the statusPoller, drains the statusChannel, and returns
// the provided error.
@ -69,17 +65,17 @@ func (tsr *TaskStatusRunner) Run(
// Avoid using defer, otherwise the statusPoller will hang. It needs to be
// drained synchronously before return, instead of asynchronously after.
complete := func(err error) error {
klog.V(7).Info("Runner cancelled status watcher")
cancelFunc()
for range statusChannel {
for statusEvent := range statusChannel {
klog.V(7).Infof("Runner ignored status event: %v", statusEvent)
}
return err
}
// Find and start the first task in the queue.
currentTask, done := nextTask(taskQueue, taskContext)
if done {
return complete(nil)
}
// Wait until the StatusWatcher is sychronized to start the first task.
var currentTask Task
done := false
// abort is used to signal that something has failed, and
// the task processing should end as soon as is possible. Only
@ -106,18 +102,37 @@ func (tsr *TaskStatusRunner) Run(
// statusEvents.
// TODO(mortent): Check if a closed statusChannel might
// create a busy loop here.
if !ok || abort {
if !ok {
continue
}
// An error event on the statusChannel means the StatusPoller
if abort {
klog.V(7).Infof("Runner ignored status event: %v", statusEvent)
continue
}
klog.V(7).Infof("Runner received status event: %v", statusEvent)
// An error event on the statusChannel means the StatusWatcher
// has encountered a problem so it can't continue. This means
// the statusChannel will be closed soon.
if statusEvent.Type == pollevent.ErrorEvent {
abort = true
abortReason = fmt.Errorf("polling for status failed: %v",
statusEvent.Error)
currentTask.Cancel(taskContext)
if currentTask != nil {
currentTask.Cancel(taskContext)
}
continue
}
// The StatusWatcher is synchronized.
// Tasks may commence!
if statusEvent.Type == pollevent.SyncEvent {
// Find and start the first task in the queue.
currentTask, done = nextTask(taskQueue, taskContext)
if done {
return complete(nil)
}
continue
}
@ -147,8 +162,10 @@ func (tsr *TaskStatusRunner) Run(
// send a status update to the running task, but only if the status
// has changed and the task is tracking the object.
if currentTask.Identifiers().Contains(id) {
currentTask.StatusUpdate(taskContext, id)
if currentTask != nil {
if currentTask.Identifiers().Contains(id) {
currentTask.StatusUpdate(taskContext, id)
}
}
// A message on the taskChannel means that the current task
// has either completed or failed.
@ -187,7 +204,10 @@ func (tsr *TaskStatusRunner) Run(
doneCh = nil // Set doneCh to nil so we don't enter a busy loop.
abort = true
abortReason = ctx.Err() // always non-nil when doneCh is closed
currentTask.Cancel(taskContext)
klog.V(7).Infof("Runner aborting: %v", abortReason)
if currentTask != nil {
currentTask.Cancel(taskContext)
}
}
}
}

View File

@ -14,9 +14,9 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/testutil"
)
@ -276,12 +276,12 @@ func TestBaseRunner(t *testing.T) {
taskQueue <- tsk
}
ids := object.ObjMetadataSet{} // unused by fake poller
poller := newFakePoller(tc.statusEvents)
ids := object.ObjMetadataSet{} // unused by fake statusWatcher
statusWatcher := newFakeWatcher(tc.statusEvents)
eventChannel := make(chan event.Event)
resourceCache := cache.NewResourceCacheMap()
taskContext := NewTaskContext(eventChannel, resourceCache)
runner := NewTaskStatusRunner(ids, poller)
runner := NewTaskStatusRunner(ids, statusWatcher)
// Use a WaitGroup to make sure changes in the goroutines
// are visible to the main goroutine.
@ -293,7 +293,7 @@ func TestBaseRunner(t *testing.T) {
defer wg.Done()
time.Sleep(tc.statusEventsDelay)
poller.Start()
statusWatcher.Start()
}()
var events []event.Event
@ -413,7 +413,7 @@ func TestBaseRunnerCancellation(t *testing.T) {
event.ActionGroupType,
},
},
"error from status poller while wait task is running": {
"error from status watcher while wait task is running": {
tasks: []Task{
NewWaitTask("wait", object.ObjMetadataSet{depID}, AllCurrent,
20*time.Second, testutil.NewFakeRESTMapper()),
@ -448,12 +448,12 @@ func TestBaseRunnerCancellation(t *testing.T) {
taskQueue <- tsk
}
ids := object.ObjMetadataSet{} // unused by fake poller
poller := newFakePoller(tc.statusEvents)
ids := object.ObjMetadataSet{} // unused by fake statusWatcher
statusWatcher := newFakeWatcher(tc.statusEvents)
eventChannel := make(chan event.Event)
resourceCache := cache.NewResourceCacheMap()
taskContext := NewTaskContext(eventChannel, resourceCache)
runner := NewTaskStatusRunner(ids, poller)
runner := NewTaskStatusRunner(ids, statusWatcher)
// Use a WaitGroup to make sure changes in the goroutines
// are visible to the main goroutine.
@ -465,7 +465,7 @@ func TestBaseRunnerCancellation(t *testing.T) {
defer wg.Done()
time.Sleep(tc.statusEventsDelay)
poller.Start()
statusWatcher.Start()
}()
var events []event.Event
@ -540,27 +540,29 @@ func (f *fakeApplyTask) Cancel(_ *TaskContext) {}
func (f *fakeApplyTask) StatusUpdate(_ *TaskContext, _ object.ObjMetadata) {}
type fakePoller struct {
type fakeWatcher struct {
start chan struct{}
events []pollevent.Event
}
func newFakePoller(statusEvents []pollevent.Event) *fakePoller {
return &fakePoller{
func newFakeWatcher(statusEvents []pollevent.Event) *fakeWatcher {
return &fakeWatcher{
events: statusEvents,
start: make(chan struct{}),
}
}
// Start events being sent on the status channel
func (f *fakePoller) Start() {
func (f *fakeWatcher) Start() {
close(f.start)
}
func (f *fakePoller) Poll(ctx context.Context, _ object.ObjMetadataSet, _ polling.PollOptions) <-chan pollevent.Event {
func (f *fakeWatcher) Watch(ctx context.Context, _ object.ObjMetadataSet, _ watcher.Options) <-chan pollevent.Event {
eventChannel := make(chan pollevent.Event)
go func() {
defer close(eventChannel)
// send sync event immediately
eventChannel <- pollevent.Event{Type: pollevent.SyncEvent}
// wait until started to send the events
<-f.start
for _, f := range f.events {

View File

@ -149,6 +149,8 @@ func (w *WaitTask) startInner(taskContext *TaskContext) {
w.mu.Lock()
defer w.mu.Unlock()
klog.V(3).Infof("wait task progress: %d/%d", 0, len(w.Ids))
pending := object.ObjMetadataSet{}
for _, id := range w.Ids {
switch {
@ -181,6 +183,8 @@ func (w *WaitTask) startInner(taskContext *TaskContext) {
}
w.pending = pending
klog.V(3).Infof("wait task progress: %d/%d", len(w.Ids)-len(w.pending), len(w.Ids))
if len(pending) == 0 {
// all reconciled - clear pending and exit
klog.V(3).Infof("all objects reconciled or skipped (name: %q)", w.TaskName)
@ -345,9 +349,7 @@ func (w *WaitTask) StatusUpdate(taskContext *TaskContext, id object.ObjMetadata)
w.pending = w.pending.Remove(id)
w.failed = append(w.failed, id)
w.sendEvent(taskContext, id, event.ReconcileFailed)
default:
// can't be all reconciled now, so don't bother checking
return
// default - still pending
}
case !w.Ids.Contains(id):
// not in wait group - ignore
@ -380,10 +382,7 @@ func (w *WaitTask) StatusUpdate(taskContext *TaskContext, id object.ObjMetadata)
w.pending = append(w.pending, id)
w.sendEvent(taskContext, id, event.ReconcilePending)
}
// can't be all reconciled, so don't bother checking. A failed
// resource doesn't prevent a WaitTask from completing, so there
// must be at least one InProgress resource we are still waiting for.
return
// else - still failed
default:
// reconciled - check if unreconciled
if !w.reconciledByID(taskContext, id) {
@ -395,11 +394,12 @@ func (w *WaitTask) StatusUpdate(taskContext *TaskContext, id object.ObjMetadata)
}
w.pending = append(w.pending, id)
w.sendEvent(taskContext, id, event.ReconcilePending)
// can't be all reconciled now, so don't bother checking
return
}
// else - still reconciled
}
klog.V(3).Infof("wait task progress: %d/%d", len(w.Ids)-len(w.pending), len(w.Ids))
// If we no longer have any pending resources, the WaitTask
// can be completed.
if len(w.pending) == 0 {
@ -426,6 +426,6 @@ func (w *WaitTask) updateRESTMapper(taskContext *TaskContext) {
return
}
klog.V(5).Infof("resetting RESTMapper")
klog.V(3).Infof("Resetting RESTMapper")
meta.MaybeResetRESTMapper(w.Mapper)
}

View File

@ -1,20 +1,23 @@
// Copyright 2019 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Package kstatus contains libraries for computing status of kubernetes
// resources.
// Package kstatus contains libraries for computing status of Kubernetes
// resource objects.
//
// status
// Get status and/or conditions for resources based on resources already
// read from a cluster, i.e. it will not fetch resources from
// a cluster.
// Compute the status of Kubernetes resource objects.
//
// polling
// Poll the cluster for the state of the specified resources
// and compute the status for each as well as the aggregate
// status. The polling will go on until either all resources
// have reached the desired status or the polling is cancelled
// by the caller.
// A common use case for this would to be poll until all resources
// Poll the cluster for the state of the specified resource objects and compute
// the status for each as well as the aggregate status. The polling will
// continue until either all resources have reached the desired status or the
// polling is cancelled by the caller.
//
// watcher
// Watch the cluster for the state of the specified resource objects and compute
// the status for each. The watching will continue until cancelled by the
// caller.
//
// A common use case for this would to be poll/watch until all resource objects
// finish reconciling after apply.
package kstatus

View File

@ -0,0 +1,81 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package clusterreader
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// DynamicClusterReader is an implementation of the ClusterReader that delegates
// all calls directly to the underlying DynamicClient. No caching.
type DynamicClusterReader struct {
DynamicClient dynamic.Interface
Mapper meta.RESTMapper
}
func (n *DynamicClusterReader) Get(ctx context.Context, key client.ObjectKey, obj *unstructured.Unstructured) error {
mapping, err := n.Mapper.RESTMapping(obj.GroupVersionKind().GroupKind())
if err != nil {
return fmt.Errorf("failed to map object: %w", err)
}
serverObj, err := n.DynamicClient.Resource(mapping.Resource).
Namespace(key.Namespace).
Get(ctx, key.Name, metav1.GetOptions{})
if err != nil {
return err
}
serverObj.DeepCopyInto(obj)
return nil
}
func (n *DynamicClusterReader) ListNamespaceScoped(ctx context.Context, list *unstructured.UnstructuredList, namespace string, selector labels.Selector) error {
mapping, err := n.Mapper.RESTMapping(list.GroupVersionKind().GroupKind())
if err != nil {
return fmt.Errorf("failed to map object: %w", err)
}
serverObj, err := n.DynamicClient.Resource(mapping.Resource).
Namespace(namespace).
List(ctx, metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
return err
}
serverObj.DeepCopyInto(list)
return nil
}
func (n *DynamicClusterReader) ListClusterScoped(ctx context.Context, list *unstructured.UnstructuredList, selector labels.Selector) error {
mapping, err := n.Mapper.RESTMapping(list.GroupVersionKind().GroupKind())
if err != nil {
return fmt.Errorf("failed to map object: %w", err)
}
serverObj, err := n.DynamicClient.Resource(mapping.Resource).
List(ctx, metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
return err
}
serverObj.DeepCopyInto(list)
return nil
}
func (n *DynamicClusterReader) Sync(_ context.Context) error {
return nil
}

View File

@ -4,6 +4,8 @@
package event
import (
"fmt"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
@ -12,15 +14,19 @@ import (
// Type is the type that describes the type of an Event that is passed back to the caller
// as resources in the cluster are being polled.
//
//go:generate stringer -type=Type
//go:generate stringer -type=Type -linecomment
type Type int
const (
// ResourceUpdateEvent describes events related to a change in the status of one of the polled resources.
ResourceUpdateEvent Type = iota
ResourceUpdateEvent Type = iota // Update
// ErrorEvent signals that the engine has encountered an error that it can not recover from. The engine
// is shutting down and the event channel will be closed after this event.
ErrorEvent
ErrorEvent // Error
// SyncEvent signals that the engine has completed its initial
// synchronization, and the cache is primed. After this point, it's safe to
// assume that you won't miss events caused by your own subsequent actions.
SyncEvent // Sync
)
// Event defines that type that is passed back through the event channel to notify the caller of changes
@ -38,6 +44,16 @@ type Event struct {
Error error
}
// String returns a string suitable for logging
func (e Event) String() string {
if e.Error != nil {
return fmt.Sprintf("Event{ Type: %q, Resource: %v, Error: %q }",
e.Type, e.Resource, e.Error)
}
return fmt.Sprintf("Event{ Type: %q, Resource: %v }",
e.Type, e.Resource)
}
// ResourceStatus contains information about a resource after we have
// fetched it from the cluster and computed status.
type ResourceStatus struct {
@ -65,6 +81,16 @@ type ResourceStatus struct {
GeneratedResources ResourceStatuses
}
// String returns a string suitable for logging
func (rs ResourceStatus) String() string {
if rs.Error != nil {
return fmt.Sprintf("ResourceStatus{ Identifier: %q, Status: %q, Message: %q, Resource: %v, GeneratedResources: %v, Error: %q }",
rs.Identifier, rs.Status, rs.Message, rs.Resource, rs.GeneratedResources, rs.Error)
}
return fmt.Sprintf("ResourceStatus{ Identifier: %q, Status: %q, Message: %q, Resource: %v, GeneratedResources: %v }",
rs.Identifier, rs.Status, rs.Message, rs.Resource, rs.GeneratedResources)
}
type ResourceStatuses []*ResourceStatus
func (g ResourceStatuses) Len() int {

View File

@ -1,4 +1,4 @@
// Code generated by "stringer -type=Type"; DO NOT EDIT.
// Code generated by "stringer -type=Type -linecomment"; DO NOT EDIT.
package event
@ -10,11 +10,12 @@ func _() {
var x [1]struct{}
_ = x[ResourceUpdateEvent-0]
_ = x[ErrorEvent-1]
_ = x[SyncEvent-2]
}
const _Type_name = "ResourceUpdateEventErrorEvent"
const _Type_name = "UpdateErrorSync"
var _Type_index = [...]uint8{0, 19, 29}
var _Type_index = [...]uint8{0, 6, 11, 15}
func (i Type) String() string {
if i < 0 || i >= Type(len(_Type_index)-1) {

View File

@ -0,0 +1,78 @@
// Copyright 2021 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package statusreaders
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)
// NewDefaultStatusReader returns a DelegatingStatusReader that wraps a list of
// statusreaders to cover all built-in Kubernetes resources and other CRDs that
// follow known status conventions.
func NewDefaultStatusReader(mapper meta.RESTMapper) engine.StatusReader {
defaultStatusReader := NewGenericStatusReader(mapper, status.Compute)
replicaSetStatusReader := NewReplicaSetStatusReader(mapper, defaultStatusReader)
deploymentStatusReader := NewDeploymentResourceReader(mapper, replicaSetStatusReader)
statefulSetStatusReader := NewStatefulSetResourceReader(mapper, defaultStatusReader)
return &DelegatingStatusReader{
StatusReaders: []engine.StatusReader{
deploymentStatusReader,
statefulSetStatusReader,
replicaSetStatusReader,
defaultStatusReader,
},
}
}
type DelegatingStatusReader struct {
StatusReaders []engine.StatusReader
}
func (dsr *DelegatingStatusReader) Supports(gk schema.GroupKind) bool {
for _, sr := range dsr.StatusReaders {
if sr.Supports(gk) {
return true
}
}
return false
}
func (dsr *DelegatingStatusReader) ReadStatus(
ctx context.Context,
reader engine.ClusterReader,
id object.ObjMetadata,
) (*event.ResourceStatus, error) {
gk := id.GroupKind
for _, sr := range dsr.StatusReaders {
if sr.Supports(gk) {
return sr.ReadStatus(ctx, reader, id)
}
}
return nil, fmt.Errorf("no status reader supports this resource: %v", gk)
}
func (dsr *DelegatingStatusReader) ReadStatusForObject(
ctx context.Context,
reader engine.ClusterReader,
obj *unstructured.Unstructured,
) (*event.ResourceStatus, error) {
gk := obj.GroupVersionKind().GroupKind()
for _, sr := range dsr.StatusReaders {
if sr.Supports(gk) {
return sr.ReadStatusForObject(ctx, reader, obj)
}
}
return nil, fmt.Errorf("no status reader supports this resource: %v", gk)
}

View File

@ -49,7 +49,7 @@ const (
// How long a pod can be unscheduled before it is reported as
// unschedulable.
scheduleWindow = 15 * time.Second
ScheduleWindow = 15 * time.Second
)
// GetLegacyConditionsFn returns a function that can compute the status for the
@ -208,7 +208,7 @@ func deploymentConditions(u *unstructured.Unstructured) (*Result, error) {
// TODO spec.replicas zero case ??
if specReplicas > statusReplicas {
message := fmt.Sprintf("replicas: %d/%d", statusReplicas, specReplicas)
message := fmt.Sprintf("Replicas: %d/%d", statusReplicas, specReplicas)
return newInProgressStatus(tooFewReplicas, message), nil
}
@ -448,7 +448,7 @@ func podConditions(u *unstructured.Unstructured) (*Result, error) {
case "Pending":
c, found := getConditionWithStatus(objc.Status.Conditions, "PodScheduled", corev1.ConditionFalse)
if found && c.Reason == "Unschedulable" {
if time.Now().Add(-scheduleWindow).Before(u.GetCreationTimestamp().Time) {
if time.Now().Add(-ScheduleWindow).Before(u.GetCreationTimestamp().Time) {
// We give the pod 15 seconds to be scheduled before we report it
// as unschedulable.
return newInProgressStatus("PodNotScheduled", "Pod has not been scheduled"), nil

View File

@ -0,0 +1,34 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/object"
)
// BlindStatusWatcher sees nothing.
// BlindStatusWatcher sends no update or error events.
// BlindStatusWatcher waits patiently to be cancelled.
// BlindStatusWatcher implements the StatusWatcher interface.
type BlindStatusWatcher struct{}
var _ StatusWatcher = BlindStatusWatcher{}
// Watch nothing. See no changes.
func (w BlindStatusWatcher) Watch(ctx context.Context, _ object.ObjMetadataSet, _ Options) <-chan event.Event {
doneCh := ctx.Done()
eventCh := make(chan event.Event)
go func() {
// Send SyncEvent immediately.
eventCh <- event.Event{Type: event.SyncEvent}
// Block until the context is cancelled.
<-doneCh
// Signal to the caller there will be no more events.
close(eventCh)
}()
return eventCh
}

View File

@ -0,0 +1,176 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders"
"sigs.k8s.io/cli-utils/pkg/object"
)
// DefaultStatusWatcher reports on status updates to a set of objects.
//
// Use NewDefaultStatusWatcher to build a DefaultStatusWatcher with default settings.
type DefaultStatusWatcher struct {
// DynamicClient is used to watch of resource objects.
DynamicClient dynamic.Interface
// Mapper is used to map from GroupKind to GroupVersionKind.
Mapper meta.RESTMapper
// ResyncPeriod is how often the objects are retrieved to re-synchronize,
// in case any events were missed.
ResyncPeriod time.Duration
// StatusReader specifies a custom implementation of the
// engine.StatusReader interface that will be used to compute reconcile
// status for resource objects.
StatusReader engine.StatusReader
// ClusterReader is used to look up generated objects on-demand.
// Generated objects (ex: Deployment > ReplicaSet > Pod) are sometimes
// required for computing parent object status, to compensate for
// controllers that aren't following status conventions.
ClusterReader engine.ClusterReader
}
var _ StatusWatcher = &DefaultStatusWatcher{}
// NewDefaultStatusWatcher constructs a DynamicStatusWatcher with defaults
// chosen for general use. If you need different settings, consider building a
// DynamicStatusWatcher directly.
func NewDefaultStatusWatcher(dynamicClient dynamic.Interface, mapper meta.RESTMapper) *DefaultStatusWatcher {
return &DefaultStatusWatcher{
DynamicClient: dynamicClient,
Mapper: mapper,
ResyncPeriod: 1 * time.Hour,
StatusReader: statusreaders.NewDefaultStatusReader(mapper),
ClusterReader: &clusterreader.DynamicClusterReader{
DynamicClient: dynamicClient,
Mapper: mapper,
},
}
}
// Watch the cluster for changes made to the specified objects.
// Returns an event channel on which these updates (and errors) will be reported.
// Each update event includes the computed status of the object.
func (w *DefaultStatusWatcher) Watch(ctx context.Context, ids object.ObjMetadataSet, opts Options) <-chan event.Event {
strategy := opts.RESTScopeStrategy
if strategy == RESTScopeAutomatic {
strategy = autoSelectRESTScopeStrategy(ids)
}
var scope meta.RESTScope
var targets []GroupKindNamespace
switch strategy {
case RESTScopeRoot:
scope = meta.RESTScopeRoot
targets = rootScopeGKNs(ids)
klog.V(3).Infof("DynamicStatusWatcher starting in root-scoped mode (targets: %d)", len(targets))
case RESTScopeNamespace:
scope = meta.RESTScopeNamespace
targets = namespaceScopeGKNs(ids)
klog.V(3).Infof("DynamicStatusWatcher starting in namespace-scoped mode (targets: %d)", len(targets))
default:
return handleFatalError(fmt.Errorf("invalid RESTScopeStrategy: %v", strategy))
}
informer := &ObjectStatusReporter{
InformerFactory: NewDynamicInformerFactory(w.DynamicClient, w.ResyncPeriod),
Mapper: w.Mapper,
StatusReader: w.StatusReader,
ClusterReader: w.ClusterReader,
Targets: targets,
ObjectFilter: &AllowListObjectFilter{AllowList: ids},
RESTScope: scope,
}
return informer.Start(ctx)
}
func handleFatalError(err error) <-chan event.Event {
eventCh := make(chan event.Event)
go func() {
defer close(eventCh)
eventCh <- event.Event{
Type: event.ErrorEvent,
Error: err,
}
}()
return eventCh
}
func autoSelectRESTScopeStrategy(ids object.ObjMetadataSet) RESTScopeStrategy {
if len(uniqueNamespaces(ids)) > 1 {
return RESTScopeRoot
}
return RESTScopeNamespace
}
func rootScopeGKNs(ids object.ObjMetadataSet) []GroupKindNamespace {
gks := uniqueGKs(ids)
targets := make([]GroupKindNamespace, len(gks))
for i, gk := range gks {
targets[i] = GroupKindNamespace{
Group: gk.Group,
Kind: gk.Kind,
Namespace: "",
}
}
return targets
}
func namespaceScopeGKNs(ids object.ObjMetadataSet) []GroupKindNamespace {
return uniqueGKNs(ids)
}
// uniqueGKNs returns a set of unique GroupKindNamespaces from a set of object identifiers.
func uniqueGKNs(ids object.ObjMetadataSet) []GroupKindNamespace {
gknMap := make(map[GroupKindNamespace]struct{})
for _, id := range ids {
gkn := GroupKindNamespace{Group: id.GroupKind.Group, Kind: id.GroupKind.Kind, Namespace: id.Namespace}
gknMap[gkn] = struct{}{}
}
gknList := make([]GroupKindNamespace, 0, len(gknMap))
for gk := range gknMap {
gknList = append(gknList, gk)
}
return gknList
}
// uniqueGKs returns a set of unique GroupKinds from a set of object identifiers.
func uniqueGKs(ids object.ObjMetadataSet) []schema.GroupKind {
gkMap := make(map[schema.GroupKind]struct{})
for _, id := range ids {
gkn := schema.GroupKind{Group: id.GroupKind.Group, Kind: id.GroupKind.Kind}
gkMap[gkn] = struct{}{}
}
gkList := make([]schema.GroupKind, 0, len(gkMap))
for gk := range gkMap {
gkList = append(gkList, gk)
}
return gkList
}
func uniqueNamespaces(ids object.ObjMetadataSet) []string {
nsMap := make(map[string]struct{})
for _, id := range ids {
nsMap[id.Namespace] = struct{}{}
}
nsList := make([]string, 0, len(nsMap))
for ns := range nsMap {
nsList = append(nsList, ns)
}
return nsList
}

View File

@ -0,0 +1,890 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/apimachinery/pkg/watch"
dynamicfake "k8s.io/client-go/dynamic/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/klog/v2"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/testutil"
)
var deployment1Yaml = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
generation: 1
spec:
replicas: 1
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.19.6
ports:
- containerPort: 80
`
var deployment1InProgress1Yaml = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
generation: 1
spec:
replicas: 1
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.19.6
ports:
- containerPort: 80
status:
observedGeneration: 1
updatedReplicas: 0
readyReplicas: 0
availableReplicas: 0
replicas: 0
conditions:
- reason: NewReplicaSetAvailable
status: "True"
type: Progressing
message: ReplicaSet "nginx-1" is progressing.
- reason: MinimumReplicasUnavailable
type: Available
status: "False"
message: Deployment does not have minimum availability.
`
var deployment1InProgress2Yaml = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
generation: 1
spec:
replicas: 1
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.19.6
ports:
- containerPort: 80
status:
observedGeneration: 1
updatedReplicas: 1
readyReplicas: 0
availableReplicas: 0
replicas: 1
conditions:
- reason: NewReplicaSetAvailable
status: "True"
type: Progressing
message: ReplicaSet "nginx-1" is progressing.
- reason: MinimumReplicasUnavailable
type: Available
status: "False"
message: Deployment does not have minimum availability.
`
var deployment1CurrentYaml = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
generation: 1
spec:
replicas: 1
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.19.6
ports:
- containerPort: 80
status:
observedGeneration: 1
updatedReplicas: 1
readyReplicas: 1
availableReplicas: 1
replicas: 1
conditions:
- reason: NewReplicaSetAvailable
status: "True"
type: Progressing
message: ReplicaSet "nginx-1" has successfully progressed.
- reason: MinimumReplicasAvailable
type: Available
status: "True"
message: Deployment has minimum availability.
`
var replicaset1Yaml = `
apiVersion: apps/v1
kind: ReplicaSet
metadata:
name: nginx-1
generation: 1
labels:
app: nginx
spec:
replicas: 1
selector:
matchLabels:
app: nginx
`
var replicaset1InProgress1Yaml = `
apiVersion: apps/v1
kind: ReplicaSet
metadata:
name: nginx-1
generation: 1
labels:
app: nginx
spec:
replicas: 1
selector:
matchLabels:
app: nginx
status:
observedGeneration: 1
replicas: 0
readyReplicas: 0
availableReplicas: 0
fullyLabeledReplicas: 0
`
var replicaset1InProgress2Yaml = `
apiVersion: apps/v1
kind: ReplicaSet
metadata:
name: nginx-1
generation: 1
labels:
app: nginx
spec:
replicas: 1
selector:
matchLabels:
app: nginx
status:
observedGeneration: 1
replicas: 1
readyReplicas: 0
availableReplicas: 0
fullyLabeledReplicas: 1
`
var replicaset1CurrentYaml = `
apiVersion: apps/v1
kind: ReplicaSet
metadata:
name: nginx-1
generation: 1
labels:
app: nginx
spec:
replicas: 1
selector:
matchLabels:
app: nginx
status:
observedGeneration: 1
replicas: 1
readyReplicas: 1
availableReplicas: 1
fullyLabeledReplicas: 1
`
var pod1Yaml = `
apiVersion: v1
kind: Pod
metadata:
generation: 1
name: test
labels:
app: nginx
`
var pod1CurrentYaml = `
apiVersion: v1
kind: Pod
metadata:
generation: 1
name: test
labels:
app: nginx
status:
conditions:
- type: Ready
status: "True"
phase: Running
`
func yamlToUnstructured(t *testing.T, yml string) *unstructured.Unstructured {
m := make(map[string]interface{})
err := yaml.Unmarshal([]byte(yml), &m)
if err != nil {
t.Fatalf("error parsing yaml: %v", err)
return nil
}
return &unstructured.Unstructured{Object: m}
}
func TestDefaultStatusWatcher(t *testing.T) {
deployment1 := yamlToUnstructured(t, deployment1Yaml)
deployment1ID := object.UnstructuredToObjMetadata(deployment1)
deployment1InProgress1 := yamlToUnstructured(t, deployment1InProgress1Yaml)
deployment1InProgress2 := yamlToUnstructured(t, deployment1InProgress2Yaml)
deployment1Current := yamlToUnstructured(t, deployment1CurrentYaml)
replicaset1 := yamlToUnstructured(t, replicaset1Yaml)
replicaset1ID := object.UnstructuredToObjMetadata(replicaset1)
replicaset1InProgress1 := yamlToUnstructured(t, replicaset1InProgress1Yaml)
replicaset1InProgress2 := yamlToUnstructured(t, replicaset1InProgress2Yaml)
replicaset1Current := yamlToUnstructured(t, replicaset1CurrentYaml)
pod1 := yamlToUnstructured(t, pod1Yaml)
pod1ID := object.UnstructuredToObjMetadata(pod1)
pod1Current := yamlToUnstructured(t, pod1CurrentYaml)
fakeMapper := testutil.NewFakeRESTMapper(
appsv1.SchemeGroupVersion.WithKind("Deployment"),
appsv1.SchemeGroupVersion.WithKind("ReplicaSet"),
v1.SchemeGroupVersion.WithKind("Pod"),
)
deployment1GVR := getGVR(t, fakeMapper, deployment1)
replicaset1GVR := getGVR(t, fakeMapper, replicaset1)
pod1GVR := getGVR(t, fakeMapper, pod1)
// namespace2 := "ns-2"
// namespace3 := "ns-3"
pod2 := pod1.DeepCopy()
pod2.SetNamespace("ns-2")
pod2.SetName("pod-2")
pod2ID := object.UnstructuredToObjMetadata(pod2)
pod2Current := yamlToUnstructured(t, pod1CurrentYaml)
pod2Current.SetNamespace("ns-2")
pod2Current.SetName("pod-2")
pod2GVR := getGVR(t, fakeMapper, pod2)
pod3 := pod1.DeepCopy()
pod3.SetNamespace("ns-3")
pod3.SetName("pod-3")
pod3ID := object.UnstructuredToObjMetadata(pod3)
pod3Current := yamlToUnstructured(t, pod1CurrentYaml)
pod3Current.SetNamespace("ns-3")
pod3Current.SetName("pod-3")
pod3GVR := getGVR(t, fakeMapper, pod3)
testCases := []struct {
name string
ids object.ObjMetadataSet
opts Options
clusterUpdates []func(*dynamicfake.FakeDynamicClient)
expectedEvents []event.Event
}{
{
name: "single-namespace pod creation",
ids: object.ObjMetadataSet{
pod1ID,
},
clusterUpdates: []func(fakeClient *dynamicfake.FakeDynamicClient){
func(fakeClient *dynamicfake.FakeDynamicClient) {
// Empty cluster before synchronization.
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Create(pod1GVR, pod1, pod1.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Update(pod1GVR, pod1Current, pod1Current.GetNamespace()))
},
},
expectedEvents: []event.Event{
{
Type: event.SyncEvent,
},
{
Resource: &event.ResourceStatus{
Identifier: pod1ID,
Status: status.InProgressStatus,
Resource: pod1,
Message: "Pod phase not available",
GeneratedResources: nil,
},
},
{
Resource: &event.ResourceStatus{
Identifier: pod1ID,
Status: status.CurrentStatus,
Resource: pod1Current,
Message: "Pod is Ready",
GeneratedResources: nil,
},
},
},
},
{
name: "single-namespace replicaset creation",
ids: object.ObjMetadataSet{
replicaset1ID,
},
clusterUpdates: []func(fakeClient *dynamicfake.FakeDynamicClient){
func(fakeClient *dynamicfake.FakeDynamicClient) {
// Empty cluster before synchronization.
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Create(replicaset1GVR, replicaset1, replicaset1.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Update(replicaset1GVR, replicaset1InProgress1, replicaset1InProgress1.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Create(pod1GVR, pod1, pod1.GetNamespace()))
require.NoError(t, fakeClient.Tracker().Update(replicaset1GVR, replicaset1InProgress2, replicaset1InProgress2.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Update(pod1GVR, pod1Current, pod1Current.GetNamespace()))
require.NoError(t, fakeClient.Tracker().Update(replicaset1GVR, replicaset1Current, replicaset1Current.GetNamespace()))
},
},
expectedEvents: []event.Event{
{
Type: event.SyncEvent,
},
{
Resource: &event.ResourceStatus{
Identifier: replicaset1ID,
Status: status.InProgressStatus,
Resource: replicaset1,
Message: "Labelled: 0/1",
GeneratedResources: nil,
},
},
{
Resource: &event.ResourceStatus{
Identifier: replicaset1ID,
Status: status.InProgressStatus,
Resource: replicaset1InProgress1,
Message: "Labelled: 0/1",
GeneratedResources: nil,
},
},
{
Resource: &event.ResourceStatus{
Identifier: replicaset1ID,
Status: status.InProgressStatus,
Resource: replicaset1InProgress2,
Message: "Available: 0/1",
GeneratedResources: event.ResourceStatuses{
{
Identifier: pod1ID,
Status: status.InProgressStatus,
Resource: pod1,
Message: "Pod phase not available",
GeneratedResources: nil,
},
},
},
},
{
Resource: &event.ResourceStatus{
Identifier: replicaset1ID,
Status: status.CurrentStatus,
Resource: replicaset1Current,
Message: "ReplicaSet is available. Replicas: 1",
GeneratedResources: event.ResourceStatuses{
{
Identifier: pod1ID,
Status: status.CurrentStatus,
Resource: pod1Current,
Message: "Pod is Ready",
GeneratedResources: nil,
},
},
},
},
},
},
{
name: "single-namespace deployment creation",
ids: object.ObjMetadataSet{
deployment1ID,
},
clusterUpdates: []func(fakeClient *dynamicfake.FakeDynamicClient){
func(fakeClient *dynamicfake.FakeDynamicClient) {
// Empty cluster before synchronization.
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Create(deployment1GVR, deployment1, deployment1.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Create(replicaset1GVR, replicaset1, replicaset1.GetNamespace()))
require.NoError(t, fakeClient.Tracker().Update(replicaset1GVR, replicaset1InProgress1, replicaset1InProgress1.GetNamespace()))
require.NoError(t, fakeClient.Tracker().Update(deployment1GVR, deployment1InProgress1, deployment1InProgress1.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Create(pod1GVR, pod1, pod1.GetNamespace()))
require.NoError(t, fakeClient.Tracker().Update(replicaset1GVR, replicaset1InProgress2, replicaset1InProgress2.GetNamespace()))
require.NoError(t, fakeClient.Tracker().Update(deployment1GVR, deployment1InProgress2, deployment1InProgress2.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Update(pod1GVR, pod1Current, pod1Current.GetNamespace()))
require.NoError(t, fakeClient.Tracker().Update(replicaset1GVR, replicaset1Current, replicaset1Current.GetNamespace()))
require.NoError(t, fakeClient.Tracker().Update(deployment1GVR, deployment1Current, deployment1Current.GetNamespace()))
},
},
expectedEvents: []event.Event{
{
Type: event.SyncEvent,
},
{
Resource: &event.ResourceStatus{
Identifier: deployment1ID,
Status: status.InProgressStatus,
Resource: deployment1,
Message: "Replicas: 0/1",
GeneratedResources: nil,
},
},
{
Resource: &event.ResourceStatus{
Identifier: deployment1ID,
Status: status.InProgressStatus,
Resource: deployment1InProgress1,
Message: "Replicas: 0/1",
GeneratedResources: event.ResourceStatuses{
{
Identifier: replicaset1ID,
Status: status.InProgressStatus,
Resource: replicaset1InProgress1,
Message: "Labelled: 0/1",
GeneratedResources: nil,
},
},
},
},
{
Resource: &event.ResourceStatus{
Identifier: deployment1ID,
Status: status.InProgressStatus,
Resource: deployment1InProgress2,
Message: "Available: 0/1",
GeneratedResources: event.ResourceStatuses{
{
Identifier: replicaset1ID,
Status: status.InProgressStatus,
Resource: replicaset1InProgress2,
Message: "Available: 0/1",
GeneratedResources: event.ResourceStatuses{
{
Identifier: pod1ID,
Status: status.InProgressStatus,
Resource: pod1,
Message: "Pod phase not available",
GeneratedResources: nil,
},
},
},
},
},
},
{
Resource: &event.ResourceStatus{
Identifier: deployment1ID,
Status: status.CurrentStatus,
Resource: deployment1Current,
Message: "Deployment is available. Replicas: 1",
GeneratedResources: event.ResourceStatuses{
{
Identifier: replicaset1ID,
Status: status.CurrentStatus,
Resource: replicaset1Current,
Message: "ReplicaSet is available. Replicas: 1",
GeneratedResources: event.ResourceStatuses{
{
Identifier: pod1ID,
Status: status.CurrentStatus,
Resource: pod1Current,
Message: "Pod is Ready",
GeneratedResources: nil,
},
},
},
},
},
},
},
},
{
name: "single-namespace deployment deletion",
ids: object.ObjMetadataSet{
deployment1ID,
},
clusterUpdates: []func(fakeClient *dynamicfake.FakeDynamicClient){
func(fakeClient *dynamicfake.FakeDynamicClient) {
// Empty cluster before synchronization.
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Create(pod1GVR, pod1Current, pod1Current.GetNamespace()))
require.NoError(t, fakeClient.Tracker().Create(replicaset1GVR, replicaset1Current, replicaset1Current.GetNamespace()))
require.NoError(t, fakeClient.Tracker().Create(deployment1GVR, deployment1Current, deployment1Current.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Delete(pod1GVR, pod1Current.GetNamespace(), pod1Current.GetName()))
require.NoError(t, fakeClient.Tracker().Delete(replicaset1GVR, replicaset1Current.GetNamespace(), replicaset1Current.GetName()))
require.NoError(t, fakeClient.Tracker().Delete(deployment1GVR, deployment1Current.GetNamespace(), deployment1Current.GetName()))
},
},
expectedEvents: []event.Event{
{
Type: event.SyncEvent,
},
{
Resource: &event.ResourceStatus{
Identifier: deployment1ID,
Status: status.CurrentStatus,
Resource: deployment1Current,
Message: "Deployment is available. Replicas: 1",
GeneratedResources: event.ResourceStatuses{
{
Identifier: replicaset1ID,
Status: status.CurrentStatus,
Resource: replicaset1Current,
Message: "ReplicaSet is available. Replicas: 1",
GeneratedResources: event.ResourceStatuses{
{
Identifier: pod1ID,
Status: status.CurrentStatus,
Resource: pod1Current,
Message: "Pod is Ready",
GeneratedResources: nil,
},
},
},
},
},
},
{
Resource: &event.ResourceStatus{
Identifier: deployment1ID,
Status: status.NotFoundStatus,
Resource: nil,
Message: "Resource not found",
GeneratedResources: nil,
},
},
},
},
{
name: "multi-namespace pod creation with automatic scope",
opts: Options{
RESTScopeStrategy: RESTScopeAutomatic,
},
ids: object.ObjMetadataSet{
pod2ID,
pod3ID,
},
clusterUpdates: []func(fakeClient *dynamicfake.FakeDynamicClient){
func(fakeClient *dynamicfake.FakeDynamicClient) {
// Empty cluster before synchronization.
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Create(pod2GVR, pod2, pod2.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Create(pod3GVR, pod3, pod3.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Update(pod2GVR, pod2Current, pod2Current.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Update(pod3GVR, pod3Current, pod3Current.GetNamespace()))
},
},
expectedEvents: []event.Event{
{
Type: event.SyncEvent,
},
{
Resource: &event.ResourceStatus{
Identifier: pod2ID,
Status: status.InProgressStatus,
Resource: pod2,
Message: "Pod phase not available",
GeneratedResources: nil,
},
},
{
Resource: &event.ResourceStatus{
Identifier: pod3ID,
Status: status.InProgressStatus,
Resource: pod3,
Message: "Pod phase not available",
GeneratedResources: nil,
},
},
{
Resource: &event.ResourceStatus{
Identifier: pod2ID,
Status: status.CurrentStatus,
Resource: pod2Current,
Message: "Pod is Ready",
GeneratedResources: nil,
},
},
{
Resource: &event.ResourceStatus{
Identifier: pod3ID,
Status: status.CurrentStatus,
Resource: pod3Current,
Message: "Pod is Ready",
GeneratedResources: nil,
},
},
},
},
{
name: "multi-namespace pod creation with root scope",
opts: Options{
RESTScopeStrategy: RESTScopeRoot,
},
ids: object.ObjMetadataSet{
pod2ID,
pod3ID,
},
clusterUpdates: []func(fakeClient *dynamicfake.FakeDynamicClient){
func(fakeClient *dynamicfake.FakeDynamicClient) {
// Empty cluster before synchronization.
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Create(pod2GVR, pod2, pod2.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Create(pod3GVR, pod3, pod3.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Update(pod2GVR, pod2Current, pod2Current.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Update(pod3GVR, pod3Current, pod3Current.GetNamespace()))
},
},
expectedEvents: []event.Event{
{
Type: event.SyncEvent,
},
{
Resource: &event.ResourceStatus{
Identifier: pod2ID,
Status: status.InProgressStatus,
Resource: pod2,
Message: "Pod phase not available",
GeneratedResources: nil,
},
},
{
Resource: &event.ResourceStatus{
Identifier: pod3ID,
Status: status.InProgressStatus,
Resource: pod3,
Message: "Pod phase not available",
GeneratedResources: nil,
},
},
{
Resource: &event.ResourceStatus{
Identifier: pod2ID,
Status: status.CurrentStatus,
Resource: pod2Current,
Message: "Pod is Ready",
GeneratedResources: nil,
},
},
{
Resource: &event.ResourceStatus{
Identifier: pod3ID,
Status: status.CurrentStatus,
Resource: pod3Current,
Message: "Pod is Ready",
GeneratedResources: nil,
},
},
},
},
{
name: "multi-namespace pod creation with namespace scope",
opts: Options{
RESTScopeStrategy: RESTScopeNamespace,
},
ids: object.ObjMetadataSet{
pod2ID,
pod3ID,
},
clusterUpdates: []func(fakeClient *dynamicfake.FakeDynamicClient){
func(fakeClient *dynamicfake.FakeDynamicClient) {
// Empty cluster before synchronization.
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Create(pod2GVR, pod2, pod2.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Create(pod3GVR, pod3, pod3.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Update(pod2GVR, pod2Current, pod2Current.GetNamespace()))
},
func(fakeClient *dynamicfake.FakeDynamicClient) {
require.NoError(t, fakeClient.Tracker().Update(pod3GVR, pod3Current, pod3Current.GetNamespace()))
},
},
expectedEvents: []event.Event{
{
Type: event.SyncEvent,
},
{
Resource: &event.ResourceStatus{
Identifier: pod2ID,
Status: status.InProgressStatus,
Resource: pod2,
Message: "Pod phase not available",
GeneratedResources: nil,
},
},
{
Resource: &event.ResourceStatus{
Identifier: pod3ID,
Status: status.InProgressStatus,
Resource: pod3,
Message: "Pod phase not available",
GeneratedResources: nil,
},
},
{
Resource: &event.ResourceStatus{
Identifier: pod2ID,
Status: status.CurrentStatus,
Resource: pod2Current,
Message: "Pod is Ready",
GeneratedResources: nil,
},
},
{
Resource: &event.ResourceStatus{
Identifier: pod3ID,
Status: status.CurrentStatus,
Resource: pod3Current,
Message: "Pod is Ready",
GeneratedResources: nil,
},
},
},
},
}
testTimeout := 10 * time.Second
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
// log fakeClient calls
fakeClient.PrependReactor("*", "*", func(a clienttesting.Action) (bool, runtime.Object, error) {
klog.V(3).Infof("FakeDynamicClient: %T{ Verb: %q, Resource: %q, Namespace: %q }",
a, a.GetVerb(), a.GetResource().Resource, a.GetNamespace())
return false, nil, nil
})
fakeClient.PrependWatchReactor("*", func(a clienttesting.Action) (bool, watch.Interface, error) {
klog.V(3).Infof("FakeDynamicClient: %T{ Verb: %q, Resource: %q, Namespace: %q }",
a, a.GetVerb(), a.GetResource().Resource, a.GetNamespace())
return false, nil, nil
})
statusWatcher := NewDefaultStatusWatcher(fakeClient, fakeMapper)
eventCh := statusWatcher.Watch(ctx, tc.ids, tc.opts)
nextCh := make(chan struct{})
defer close(nextCh)
// Synchronize event consumption and production for predictable test results.
go func() {
for _, update := range tc.clusterUpdates {
<-nextCh
update(fakeClient)
}
// Wait for final event to be handled
<-nextCh
// Stop the watcher
cancel()
}()
// Trigger first server update
nextCh <- struct{}{}
receivedEvents := []event.Event{}
for e := range eventCh {
receivedEvents = append(receivedEvents, e)
// Trigger next server update
nextCh <- struct{}{}
}
testutil.AssertEqual(t, tc.expectedEvents, receivedEvents)
})
}
}
func getGVR(t *testing.T, mapper meta.RESTMapper, obj *unstructured.Unstructured) schema.GroupVersionResource {
gvk := obj.GroupVersionKind()
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
require.NoError(t, err)
return mapping.Resource
}

View File

@ -0,0 +1,39 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Package watcher is a library for computing the status of kubernetes resource
// objects based on watching object state from a cluster. It keeps watching
// until it is cancelled through the provided context. Updates on the status of
// objects are streamed back to the caller through a channel.
//
// Watching Resources
//
// In order to watch a set of resources objects, create a StatusWatcher
// and pass in the list of object identifiers to the Watch function.
//
// import (
// "sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
// )
//
// ids := []prune.ObjMetadata{
// {
// GroupKind: schema.GroupKind{
// Group: "apps",
// Kind: "Deployment",
// },
// Name: "dep",
// Namespace: "default",
// }
// }
//
// statusWatcher := watcher.NewDefaultStatusWatcher(dynamicClient, mapper)
// ctx, cancelFunc := context.WithCancel(context.Background())
// eventCh := statusWatcher.Watch(ctx, ids, watcher.Options{})
// for e := range eventCh {
// // Handle event
// if e.Type == event.ErrorEvent {
// cancelFunc()
// return e.Err
// }
// }
package watcher

View File

@ -0,0 +1,57 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
)
type DynamicInformerFactory struct {
Client dynamic.Interface
ResyncPeriod time.Duration
Indexers cache.Indexers
}
func NewDynamicInformerFactory(client dynamic.Interface, resyncPeriod time.Duration) *DynamicInformerFactory {
return &DynamicInformerFactory{
Client: client,
ResyncPeriod: resyncPeriod,
Indexers: cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
},
}
}
func (f *DynamicInformerFactory) NewInformer(ctx context.Context, mapping *meta.RESTMapping, namespace string) cache.SharedIndexInformer {
// Unstructured example output need `"apiVersion"` and `"kind"` set.
example := &unstructured.Unstructured{}
example.SetGroupVersionKind(mapping.GroupVersionKind)
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return f.Client.Resource(mapping.Resource).
Namespace(namespace).
List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return f.Client.Resource(mapping.Resource).
Namespace(namespace).
Watch(ctx, options)
},
},
example,
f.ResyncPeriod,
f.Indexers,
)
}

View File

@ -0,0 +1,166 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/testapigroup"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/test"
"k8s.io/apimachinery/pkg/watch"
dynamicfake "k8s.io/client-go/dynamic/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/cli-utils/pkg/testutil"
)
func TestResourceNotFoundError(t *testing.T) {
carpGVK := schema.GroupVersionKind{
Group: "foo",
Version: "v1",
Kind: "Carp",
}
exampleGR := schema.GroupResource{
Group: carpGVK.Group,
Resource: "carps",
}
namespace := "example-ns"
testCases := []struct {
name string
setup func(*dynamicfake.FakeDynamicClient)
errorHandler func(t *testing.T, err error)
}{
{
name: "List resource not found error",
setup: func(fakeClient *dynamicfake.FakeDynamicClient) {
fakeClient.PrependReactor("list", exampleGR.Resource, func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
listAction := action.(clienttesting.ListAction)
if listAction.GetNamespace() != namespace {
assert.Fail(t, "Received unexpected LIST namespace: %s", listAction.GetNamespace())
return false, nil, nil
}
// dynamicClient converts Status objects from the apiserver into errors.
// So we can just return the right error here to simulate an error from
// the apiserver.
name := "" // unused by LIST requests
// The apisevrer confusingly does not return apierrors.NewNotFound,
// which has a nice constant for its error message.
// err = apierrors.NewNotFound(exampleGR, name)
// Instead it uses apierrors.NewGenericServerResponse, which uses
// a hard-coded error message.
err = apierrors.NewGenericServerResponse(http.StatusNotFound, "list", exampleGR, name, "unused", -1, false)
return true, nil, err
})
},
errorHandler: func(t *testing.T, err error) {
switch {
case apierrors.IsNotFound(err):
// If we got this error, something changed in the apiserver or
// client. If the client changed, it might be safe to stop parsing
// the error string.
t.Errorf("Expected untyped NotFound error, but got typed NotFound error: %v", err)
case containsNotFoundMessage(err):
// This is the expected hack, because the Informer/Reflector
// doesn't wrap the error with "%w".
t.Logf("Received expected untyped NotFound error: %v", err)
default:
// If we got this error, the test is probably broken.
t.Errorf("Expected untyped NotFound error, but got a different error: %v", err)
}
},
},
{
name: "Watch resource not found error",
setup: func(fakeClient *dynamicfake.FakeDynamicClient) {
fakeClient.PrependWatchReactor(exampleGR.Resource, func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
// dynamicClient converts Status objects from the apiserver into errors.
// So we can just return the right error here to simulate an error from
// the apiserver.
name := "" // unused by LIST requests
// The apisevrer confusingly does not return apierrors.NewNotFound,
// which has a nice constant for its error message.
// err = apierrors.NewNotFound(exampleGR, name)
// Instead it uses apierrors.NewGenericServerResponse, which uses
// a hard-coded error message.
err = apierrors.NewGenericServerResponse(http.StatusNotFound, "list", exampleGR, name, "unused", -1, false)
return true, nil, err
})
},
errorHandler: func(t *testing.T, err error) {
switch {
case apierrors.IsNotFound(err):
// This is the expected behavior, because the
// Informer/Reflector DOES wrap watch errors
t.Logf("Received expected untyped NotFound error: %v", err)
case containsNotFoundMessage(err):
// If this happens, there was a regression.
// Watch errors are expected to be wrapped with "%w"
t.Errorf("Expected typed NotFound error, but got untyped NotFound error: %v", err)
default:
// If we got this error, the test is probably broken.
t.Errorf("Expected untyped NotFound error, but got a different error: %v", err)
}
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
scheme := runtime.NewScheme()
scheme.AddKnownTypes(metav1.SchemeGroupVersion, &metav1.Status{})
// Register foo/v1 Carp CRD
scheme.AddKnownTypes(carpGVK.GroupVersion(), &testapigroup.Carp{}, &testapigroup.CarpList{}, &test.List{})
// Fake client that only knows about the types registered to the scheme
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme)
// log fakeClient calls
fakeClient.PrependReactor("*", "*", func(a clienttesting.Action) (bool, runtime.Object, error) {
klog.V(3).Infof("FakeDynamicClient: %T{ Verb: %q, Resource: %q, Namespace: %q }",
a, a.GetVerb(), a.GetResource().Resource, a.GetNamespace())
return false, nil, nil
})
fakeClient.PrependWatchReactor("*", func(a clienttesting.Action) (bool, watch.Interface, error) {
klog.V(3).Infof("FakeDynamicClient: %T{ Verb: %q, Resource: %q, Namespace: %q }",
a, a.GetVerb(), a.GetResource().Resource, a.GetNamespace())
return false, nil, nil
})
tc.setup(fakeClient)
informerFactory := NewDynamicInformerFactory(fakeClient, 0) // disable re-sync
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
fakeMapper := testutil.NewFakeRESTMapper(carpGVK)
mapping, err := fakeMapper.RESTMapping(carpGVK.GroupKind())
require.NoError(t, err)
informer := informerFactory.NewInformer(ctx, mapping, namespace)
err = informer.SetWatchErrorHandler(func(_ *cache.Reflector, err error) {
tc.errorHandler(t, err)
// Stop the informer after the first error.
cancel()
})
require.NoError(t, err)
// Block until context cancel or timeout.
informer.Run(ctx.Done())
})
}
}

View File

@ -0,0 +1,119 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"fmt"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
)
// eventFunnel wraps a list of event channels and multiplexes them down to a
// single event channel. New input channels can be added at runtime, and the
// output channel will remain open until all input channels are closed.
type eventFunnel struct {
// ctx closure triggers shutdown
ctx context.Context
// outCh is the funnel that consumes all events from input channels
outCh chan event.Event
// doneCh is closed after outCh is closed.
// This allows blocking until done without consuming events.
doneCh chan struct{}
// counterCh is used to track the number of open input channels.
counterCh chan int
}
func newEventFunnel(ctx context.Context) *eventFunnel {
funnel := &eventFunnel{
ctx: ctx,
outCh: make(chan event.Event),
doneCh: make(chan struct{}),
counterCh: make(chan int),
}
// Wait until the context is done and all input channels are closed.
// Then close out and done channels to signal completion.
go func() {
defer func() {
// Don't close counterCh, otherwise AddInputChannel may panic.
close(funnel.outCh)
close(funnel.doneCh)
}()
ctxDoneCh := ctx.Done()
// Count input channels that have been added and not closed.
inputs := 0
for {
select {
case delta := <-funnel.counterCh:
inputs += delta
case <-ctxDoneCh:
// Stop waiting for context closure.
// Nil channel avoids busy waiting.
ctxDoneCh = nil
}
if ctxDoneCh == nil && inputs <= 0 {
// Context is closed and all input channels are closed.
break
}
}
}()
return funnel
}
// Add a new input channel to the multiplexer.
func (m *eventFunnel) AddInputChannel(inCh <-chan event.Event) error {
select {
case <-m.ctx.Done(): // skip, if context is closed
return &EventFunnelClosedError{ContextError: m.ctx.Err()}
case m.counterCh <- 1: // increment counter
}
// Create a multiplexer for each new event channel.
go m.drain(inCh, m.outCh)
return nil
}
// OutputChannel channel receives all events sent to input channels.
// This channel is closed after all input channels are closed.
func (m *eventFunnel) OutputChannel() <-chan event.Event {
return m.outCh
}
// Done channel is closed after the Output channel is closed.
// This allows blocking until done without consuming events.
// If no input channels have been added yet, the done channel will be nil.
func (m *eventFunnel) Done() <-chan struct{} {
return m.doneCh
}
// drain a single input channel to a single output channel.
func (m *eventFunnel) drain(inCh <-chan event.Event, outCh chan<- event.Event) {
defer func() {
m.counterCh <- -1 // decrement counter
}()
for event := range inCh {
outCh <- event
}
}
type EventFunnelClosedError struct {
ContextError error
}
func (e *EventFunnelClosedError) Error() string {
return fmt.Sprintf("event funnel closed: %v", e.ContextError)
}
func (e *EventFunnelClosedError) Is(err error) bool {
fcErr, ok := err.(*EventFunnelClosedError)
if !ok {
return false
}
return e.ContextError == fcErr.ContextError
}
func (e *EventFunnelClosedError) Unwrap() error {
return e.ContextError
}

View File

@ -0,0 +1,22 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"os"
"testing"
"github.com/onsi/gomega/format"
"k8s.io/klog/v2"
)
// TestMain executes the tests for this package, with optional logging.
// To see all logs, use:
// go test sigs.k8s.io/cli-utils/pkg/kstatus/watcher -v -args -v=5
func TestMain(m *testing.M) {
// increase from 4000 to handle long event lists
format.MaxLength = 10000
klog.InitFlags(nil)
os.Exit(m.Run())
}

View File

@ -0,0 +1,30 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/cli-utils/pkg/object"
)
// ObjectFilter allows for filtering objects.
type ObjectFilter interface {
// Filter returns true if the object should be skipped.
Filter(obj *unstructured.Unstructured) bool
}
// AllowListObjectFilter filters objects not in the allow list.
// AllowListObjectFilter implements ObjectFilter.
type AllowListObjectFilter struct {
AllowList object.ObjMetadataSet
}
var _ ObjectFilter = &AllowListObjectFilter{}
// Filter returns true if the object should be skipped, because it is NOT in the
// AllowList.
func (f *AllowListObjectFilter) Filter(obj *unstructured.Unstructured) bool {
id := object.UnstructuredToObjMetadata(obj)
return !f.AllowList.Contains(id)
}

View File

@ -0,0 +1,887 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)
// GroupKindNamespace identifies an informer target.
// When used as an informer target, the namespace is optional.
// When the namespace is empty for namespaced resources, all namespaces are watched.
type GroupKindNamespace struct {
Group string
Kind string
Namespace string
}
// String returns a serialized form suitable for logging.
func (gkn GroupKindNamespace) String() string {
return fmt.Sprintf("%s/%s/namespaces/%s",
gkn.Group, gkn.Kind, gkn.Namespace)
}
func (gkn GroupKindNamespace) GroupKind() schema.GroupKind {
return schema.GroupKind{Group: gkn.Group, Kind: gkn.Kind}
}
// ObjectStatusReporter reports on updates to objects (instances) using a
// network of informers to watch one or more resources (types).
//
// Unlike SharedIndexInformer, ObjectStatusReporter...
// - Reports object status.
// - Can watch multiple resource types simultaneously.
// - Specific objects can be ignored for efficiency by specifying an ObjectFilter.
// - Resolves GroupKinds into Resources at runtime, to pick up newly added
// resources.
// - Starts and Stops individual watches automaically to reduce errors when a
// CRD or Namespace is deleted.
// - Resources can be watched in root-scope mode or namespace-scope mode,
// allowing the caller to optimize for efficiency or least-privilege.
// - Gives unschedulable Pods (and objects that generate them) a 15s grace
// period before reporting them as Failed.
// - Resets the RESTMapper cache automatically when CRDs are modified.
//
// ObjectStatusReporter is NOT repeatable. It will panic if started more than
// once. If you need a repeatable factory, use DefaultStatusWatcher.
//
// TODO: support detection of added/removed api extensions at runtime
// TODO: Watch CRDs & Namespaces, even if not in the set of IDs.
// TODO: Retry with backoff if in namespace-scoped mode, to allow CRDs & namespaces to be created asynchronously
type ObjectStatusReporter struct {
// InformerFactory is used to build informers
InformerFactory *DynamicInformerFactory
// Mapper is used to map from GroupKind to GroupVersionKind.
Mapper meta.RESTMapper
// StatusReader specifies a custom implementation of the
// engine.StatusReader interface that will be used to compute reconcile
// status for resource objects.
StatusReader engine.StatusReader
// ClusterReader is used to look up generated objects on-demand.
// Generated objects (ex: Deployment > ReplicaSet > Pod) are sometimes
// required for computing parent object status, to compensate for
// controllers that aren't following status conventions.
ClusterReader engine.ClusterReader
// GroupKinds is the list of GroupKinds to watch.
Targets []GroupKindNamespace
// ObjectFilter is used to decide which objects to ingore.
ObjectFilter ObjectFilter
// TODO: handle automatic?
RESTScope meta.RESTScope
// lock guards modification of the subsequent stateful fields
lock sync.Mutex
// gk2gkn maps GKs to GKNs to make it easy/cheap to look up.
gk2gkn map[schema.GroupKind]map[GroupKindNamespace]struct{}
// ns2gkn maps Namespaces to GKNs to make it easy/cheap to look up.
ns2gkn map[string]map[GroupKindNamespace]struct{}
// informerRefs tracks which informers have been started and stopped
informerRefs map[GroupKindNamespace]*informerReference
// context will be cancelled when the reporter should stop.
context context.Context
// cancel function that stops the context.
// This should only be called after the terminal error event has been sent.
cancel context.CancelFunc
// funnel multiplexes multiple input channels into one output channel,
// allowing input channels to be added and removed at runtime.
funnel *eventFunnel
// taskManager makes it possible to cancel scheduled tasks.
taskManager *taskManager
started bool
stopped bool
}
func (w *ObjectStatusReporter) Start(ctx context.Context) <-chan event.Event {
w.lock.Lock()
defer w.lock.Unlock()
if w.started {
panic("ObjectStatusInformer cannot be restarted")
}
w.taskManager = &taskManager{}
// Map GroupKinds to sets of GroupKindNamespaces for fast lookups.
// This is the only time we modify the map.
// So it should be safe to read from multiple threads after this.
w.gk2gkn = make(map[schema.GroupKind]map[GroupKindNamespace]struct{})
for _, gkn := range w.Targets {
gk := gkn.GroupKind()
m, found := w.gk2gkn[gk]
if !found {
m = make(map[GroupKindNamespace]struct{})
w.gk2gkn[gk] = m
}
m[gkn] = struct{}{}
}
// Map namespaces to sets of GroupKindNamespaces for fast lookups.
// This is the only time we modify the map.
// So it should be safe to read from multiple threads after this.
w.ns2gkn = make(map[string]map[GroupKindNamespace]struct{})
for _, gkn := range w.Targets {
ns := gkn.Namespace
m, found := w.ns2gkn[ns]
if !found {
m = make(map[GroupKindNamespace]struct{})
w.ns2gkn[ns] = m
}
m[gkn] = struct{}{}
}
// Initialize the informer map with references to track their start/stop.
// This is the only time we modify the map.
// So it should be safe to read from multiple threads after this.
w.informerRefs = make(map[GroupKindNamespace]*informerReference, len(w.Targets))
for _, gkn := range w.Targets {
w.informerRefs[gkn] = &informerReference{}
}
ctx, cancel := context.WithCancel(ctx)
w.context = ctx
w.cancel = cancel
// Use an event funnel to multiplex events through multiple input channels
// into out output channel. We can't use the normal fan-in pattern, because
// we need to be able to add and remove new input channels at runtime, as
// new informers are created and destroyed.
w.funnel = newEventFunnel(ctx)
// Send start requests.
for _, gkn := range w.Targets {
w.startInformer(gkn)
}
w.started = true
// Block until the event funnel is closed.
// The event funnel will close after all the informer channels are closed.
// The informer channels will close after the informers have stopped.
// The informers will stop after their context is cancelled.
go func() {
<-w.funnel.Done()
w.lock.Lock()
defer w.lock.Unlock()
w.stopped = true
}()
// Wait until all informers are synced or stopped, then send a SyncEvent.
syncEventCh := make(chan event.Event)
err := w.funnel.AddInputChannel(syncEventCh)
if err != nil {
// Reporter already stopped.
return handleFatalError(fmt.Errorf("reporter failed to start: %v", err))
}
go func() {
defer close(syncEventCh)
// TODO: should we use something less aggressive, like wait.BackoffUntil?
if cache.WaitForCacheSync(ctx.Done(), w.HasSynced) {
syncEventCh <- event.Event{
Type: event.SyncEvent,
}
}
}()
return w.funnel.OutputChannel()
}
// HasSynced returns true if all the started informers have been synced.
//
// TODO: provide a callback function, channel, or event to avoid needing to block with a rety loop.
//
// Use the following to block waiting for synchronization:
// synced := cache.WaitForCacheSync(stopCh, informer.HasSynced)
func (w *ObjectStatusReporter) HasSynced() bool {
w.lock.Lock()
defer w.lock.Unlock()
if w.stopped || !w.started {
return false
}
pending := make([]GroupKindNamespace, 0, len(w.informerRefs))
for gke, informer := range w.informerRefs {
if informer.HasStarted() && !informer.HasSynced() {
pending = append(pending, gke)
}
}
if len(pending) > 0 {
klog.V(5).Infof("Informers pending synchronization: %v", pending)
return false
}
return true
}
// startInformer adds the specified GroupKindNamespace to the start channel to
// be started asynchronously.
func (w *ObjectStatusReporter) startInformer(gkn GroupKindNamespace) {
ctx, ok := w.informerRefs[gkn].Start(w.context)
if !ok {
klog.V(5).Infof("Watch start skipped (already started): %v", gkn)
// already started
return
}
go w.startInformerWithRetry(ctx, gkn)
}
// stopInformer stops the informer watching the specified GroupKindNamespace.
func (w *ObjectStatusReporter) stopInformer(gkn GroupKindNamespace) {
w.informerRefs[gkn].Stop()
}
func (w *ObjectStatusReporter) startInformerWithRetry(ctx context.Context, gkn GroupKindNamespace) {
realClock := &clock.RealClock{}
backoffManager := wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock)
retryCtx, retryCancel := context.WithCancel(ctx)
wait.BackoffUntil(func() {
err := w.startInformerNow(
ctx,
gkn,
)
if err != nil {
if meta.IsNoMatchError(err) {
// CRD (or api extension) not installed
// TODO: retry if CRDs are not being watched
klog.V(3).Infof("Watch start error (blocking until CRD is added): %v: %v", gkn, err)
// Cancel the parent context, which will stop the retries too.
w.stopInformer(gkn)
return
}
// Create a temporary input channel to send the error event.
eventCh := make(chan event.Event)
defer close(eventCh)
err := w.funnel.AddInputChannel(eventCh)
if err != nil {
// Reporter already stopped.
// This is fine. 🔥
klog.V(5).Infof("Informer failed to start: %v", err)
return
}
// Send error event and stop the reporter!
w.handleFatalError(eventCh, err)
return
}
// Success! - Stop retrying
retryCancel()
}, backoffManager, true, retryCtx.Done())
}
// startInformerNow starts an informer to watch for changes to a
// GroupKindNamespace. Changes are filtered and passed by event channel into the
// funnel. Each update event includes the computed status of the object.
// An error is returned if the informer could not be created.
func (w *ObjectStatusReporter) startInformerNow(
ctx context.Context,
gkn GroupKindNamespace,
) error {
// Look up the mapping for this GroupKind.
// If it doesn't exist, either delay watching or emit an error.
gk := gkn.GroupKind()
mapping, err := w.Mapper.RESTMapping(gk)
if err != nil {
// Might be a NoResourceMatchError/NoKindMatchError
return err
}
informer := w.InformerFactory.NewInformer(ctx, mapping, gkn.Namespace)
// Handler called when ListAndWatch errors.
// Custom handler stops the informer if the resource is NotFound (CRD deleted).
err = informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
w.watchErrorHandler(gkn, err)
})
if err != nil {
// Should never happen.
// Informer can't have started yet. We just created it.
return fmt.Errorf("failed to set error handler on new informer for %v: %v", mapping.Resource, err)
}
w.informerRefs[gkn].SetInformer(informer)
eventCh := make(chan event.Event)
// Add this event channel to the output multiplexer
err = w.funnel.AddInputChannel(eventCh)
if err != nil {
// Reporter already stopped.
return fmt.Errorf("informer failed to build event handler: %w", err)
}
informer.AddEventHandler(w.eventHandler(ctx, eventCh))
// Start the informer in the background.
// Informer will be stopped when the context is cancelled.
go func() {
klog.V(3).Infof("Watch starting: %v", gkn)
informer.Run(ctx.Done())
klog.V(3).Infof("Watch stopped: %v", gkn)
// Signal to the caller there will be no more events for this GroupKind.
close(eventCh)
}()
return nil
}
func (w *ObjectStatusReporter) forEachTargetWithGroupKind(gk schema.GroupKind, fn func(GroupKindNamespace)) {
for gkn := range w.gk2gkn[gk] {
fn(gkn)
}
}
func (w *ObjectStatusReporter) forEachTargetWithNamespace(ns string, fn func(GroupKindNamespace)) {
for gkn := range w.ns2gkn[ns] {
fn(gkn)
}
}
// readStatusFromObject is a convenience function to read object status with a
// StatusReader using a ClusterReader to retrieve generated objects.
func (w *ObjectStatusReporter) readStatusFromObject(
ctx context.Context,
obj *unstructured.Unstructured,
) (*event.ResourceStatus, error) {
return w.StatusReader.ReadStatusForObject(ctx, w.ClusterReader, obj)
}
// readStatusFromCluster is a convenience function to read object status with a
// StatusReader using a ClusterReader to retrieve the object and its generated
// objects.
func (w *ObjectStatusReporter) readStatusFromCluster(
ctx context.Context,
id object.ObjMetadata,
) (*event.ResourceStatus, error) {
return w.StatusReader.ReadStatus(ctx, w.ClusterReader, id)
}
// deletedStatus builds a ResourceStatus for a deleted object.
//
// StatusReader.ReadStatusForObject doesn't handle nil objects as input. So
// this builds the status manually.
// TODO: find a way to delegate this back to the status package.
func deletedStatus(id object.ObjMetadata) *event.ResourceStatus {
// Status is always NotFound after deltion.
// Passed obj represents the last known state, not the current state.
result := &event.ResourceStatus{
Identifier: id,
Status: status.NotFoundStatus,
Message: "Resource not found",
}
return &event.ResourceStatus{
Identifier: id,
Resource: nil, // deleted object has no
Status: result.Status,
Message: result.Message,
// If deleted with foreground deletion, a finalizer will have blocked
// deletion until all the generated resources are deleted.
// TODO: Handle lookup of generated resources when not using foreground deletion.
GeneratedResources: nil,
}
}
// eventHandler builds an event handler to compute object status.
// Returns an event channel on which these stats updates will be reported.
func (w *ObjectStatusReporter) eventHandler(
ctx context.Context,
eventCh chan<- event.Event,
) cache.ResourceEventHandler {
var handler cache.ResourceEventHandlerFuncs
handler.AddFunc = func(iobj interface{}) {
// Bail early if the context is cancelled, to avoid unnecessary work.
if ctx.Err() != nil {
return
}
obj, ok := iobj.(*unstructured.Unstructured)
if !ok {
panic(fmt.Sprintf("AddFunc received unexpected object type %T", iobj))
}
id := object.UnstructuredToObjMetadata(obj)
if w.ObjectFilter.Filter(obj) {
klog.V(7).Infof("Watch Event Skipped: AddFunc: %s", id)
return
}
klog.V(5).Infof("AddFunc: Computing status for object: %s", id)
// cancel any scheduled status update for this object
w.taskManager.Cancel(id)
rs, err := w.readStatusFromObject(ctx, obj)
if err != nil {
// Send error event and stop the reporter!
w.handleFatalError(eventCh, fmt.Errorf("failed to compute object status: %s: %w", id, err))
return
}
if object.IsNamespace(obj) {
klog.V(5).Infof("AddFunc: Namespace added: %v", id)
w.onNamespaceAdd(obj)
} else if object.IsCRD(obj) {
klog.V(5).Infof("AddFunc: CRD added: %v", id)
w.onCRDAdd(obj)
}
if isObjectUnschedulable(rs) {
klog.V(5).Infof("AddFunc: object unschedulable: %v", id)
// schedule delayed status update
w.taskManager.Schedule(ctx, id, status.ScheduleWindow,
w.newStatusCheckTaskFunc(ctx, eventCh, id))
}
klog.V(7).Infof("AddFunc: sending update event: %v", rs)
eventCh <- event.Event{
Type: event.ResourceUpdateEvent,
Resource: rs,
}
}
handler.UpdateFunc = func(_, iobj interface{}) {
// Bail early if the context is cancelled, to avoid unnecessary work.
if ctx.Err() != nil {
return
}
obj, ok := iobj.(*unstructured.Unstructured)
if !ok {
panic(fmt.Sprintf("UpdateFunc received unexpected object type %T", iobj))
}
id := object.UnstructuredToObjMetadata(obj)
if w.ObjectFilter.Filter(obj) {
klog.V(7).Infof("UpdateFunc: Watch Event Skipped: %s", id)
return
}
klog.V(5).Infof("UpdateFunc: Computing status for object: %s", id)
// cancel any scheduled status update for this object
w.taskManager.Cancel(id)
rs, err := w.readStatusFromObject(ctx, obj)
if err != nil {
// Send error event and stop the reporter!
w.handleFatalError(eventCh, fmt.Errorf("failed to compute object status: %s: %w", id, err))
return
}
if object.IsNamespace(obj) {
klog.V(5).Infof("UpdateFunc: Namespace updated: %v", id)
w.onNamespaceUpdate(obj)
} else if object.IsCRD(obj) {
klog.V(5).Infof("UpdateFunc: CRD updated: %v", id)
w.onCRDUpdate(obj)
}
if isObjectUnschedulable(rs) {
klog.V(5).Infof("UpdateFunc: object unschedulable: %v", id)
// schedule delayed status update
w.taskManager.Schedule(ctx, id, status.ScheduleWindow,
w.newStatusCheckTaskFunc(ctx, eventCh, id))
}
klog.V(7).Infof("UpdateFunc: sending update event: %v", rs)
eventCh <- event.Event{
Type: event.ResourceUpdateEvent,
Resource: rs,
}
}
handler.DeleteFunc = func(iobj interface{}) {
// Bail early if the context is cancelled, to avoid unnecessary work.
if ctx.Err() != nil {
return
}
if tombstone, ok := iobj.(cache.DeletedFinalStateUnknown); ok {
// Last state unknown. Possibly stale.
// TODO: Should we propegate this uncertainty to the caller?
iobj = tombstone.Obj
}
obj, ok := iobj.(*unstructured.Unstructured)
if !ok {
panic(fmt.Sprintf("DeleteFunc received unexpected object type %T", iobj))
}
id := object.UnstructuredToObjMetadata(obj)
if w.ObjectFilter.Filter(obj) {
klog.V(7).Infof("DeleteFunc: Watch Event Skipped: %s", id)
return
}
klog.V(5).Infof("DeleteFunc: Computing status for object: %s", id)
// cancel any scheduled status update for this object
w.taskManager.Cancel(id)
if object.IsNamespace(obj) {
klog.V(5).Infof("DeleteFunc: Namespace deleted: %v", id)
w.onNamespaceDelete(obj)
} else if object.IsCRD(obj) {
klog.V(5).Infof("DeleteFunc: CRD deleted: %v", id)
w.onCRDDelete(obj)
}
rs := deletedStatus(id)
klog.V(7).Infof("DeleteFunc: sending update event: %v", rs)
eventCh <- event.Event{
Type: event.ResourceUpdateEvent,
Resource: rs,
}
}
return handler
}
// onCRDAdd handles creating a new informer to watch the new resource type.
func (w *ObjectStatusReporter) onCRDAdd(obj *unstructured.Unstructured) {
gk, found := object.GetCRDGroupKind(obj)
if !found {
id := object.UnstructuredToObjMetadata(obj)
klog.Warningf("Invalid CRD added: missing group and/or kind: %v", id)
// Don't return an error, because this should not inturrupt the task queue.
// TODO: Allow non-fatal errors to be reported using a specific error type.
return
}
klog.V(3).Infof("CRD added for %s", gk)
klog.V(3).Info("Resetting RESTMapper")
// Reset mapper to invalidate cache.
meta.MaybeResetRESTMapper(w.Mapper)
w.forEachTargetWithGroupKind(gk, func(gkn GroupKindNamespace) {
w.startInformer(gkn)
})
}
// onCRDUpdate handles creating a new informer to watch the updated resource type.
func (w *ObjectStatusReporter) onCRDUpdate(newObj *unstructured.Unstructured) {
gk, found := object.GetCRDGroupKind(newObj)
if !found {
id := object.UnstructuredToObjMetadata(newObj)
klog.Warningf("Invalid CRD updated: missing group and/or kind: %v", id)
// Don't return an error, because this should not inturrupt the task queue.
// TODO: Allow non-fatal errors to be reported using a specific error type.
return
}
klog.V(3).Infof("CRD updated for %s", gk)
klog.V(3).Info("Resetting RESTMapper")
// Reset mapper to invalidate cache.
meta.MaybeResetRESTMapper(w.Mapper)
w.forEachTargetWithGroupKind(gk, func(gkn GroupKindNamespace) {
w.startInformer(gkn)
})
}
// onCRDDelete handles stopping the informer watching the deleted resource type.
func (w *ObjectStatusReporter) onCRDDelete(oldObj *unstructured.Unstructured) {
gk, found := object.GetCRDGroupKind(oldObj)
if !found {
id := object.UnstructuredToObjMetadata(oldObj)
klog.Warningf("Invalid CRD deleted: missing group and/or kind: %v", id)
// Don't return an error, because this should not inturrupt the task queue.
// TODO: Allow non-fatal errors to be reported using a specific error type.
return
}
klog.V(3).Infof("CRD deleted for %s", gk)
w.forEachTargetWithGroupKind(gk, func(gkn GroupKindNamespace) {
w.stopInformer(gkn)
})
klog.V(3).Info("Resetting RESTMapper")
// Reset mapper to invalidate cache.
meta.MaybeResetRESTMapper(w.Mapper)
}
// onNamespaceAdd handles creating new informers to watch this namespace.
func (w *ObjectStatusReporter) onNamespaceAdd(obj *unstructured.Unstructured) {
if w.RESTScope == meta.RESTScopeRoot {
// When watching resources across all namespaces,
// we don't need to start or stop any
// namespace-specific informers.
return
}
namespace := obj.GetName()
w.forEachTargetWithNamespace(namespace, func(gkn GroupKindNamespace) {
w.startInformer(gkn)
})
}
// onNamespaceUpdate handles creating new informers to watch this namespace.
func (w *ObjectStatusReporter) onNamespaceUpdate(obj *unstructured.Unstructured) {
if w.RESTScope == meta.RESTScopeRoot {
// When watching resources across all namespaces,
// we don't need to start or stop any
// namespace-specific informers.
return
}
namespace := obj.GetName()
w.forEachTargetWithNamespace(namespace, func(gkn GroupKindNamespace) {
w.startInformer(gkn)
})
}
// onNamespaceDelete handles stopping informers watching this namespace.
func (w *ObjectStatusReporter) onNamespaceDelete(obj *unstructured.Unstructured) {
if w.RESTScope == meta.RESTScopeRoot {
// When watching resources across all namespaces,
// we don't need to start or stop any
// namespace-specific informers.
return
}
namespace := obj.GetName()
w.forEachTargetWithNamespace(namespace, func(gkn GroupKindNamespace) {
w.stopInformer(gkn)
})
}
// newStatusCheckTaskFunc returns a taskFund that reads the status of an object
// from the cluster and sends it over the event channel.
//
// This method should only be used for generated resource objects, as it's much
// slower at scale than watching the resource for updates.
func (w *ObjectStatusReporter) newStatusCheckTaskFunc(
ctx context.Context,
eventCh chan<- event.Event,
id object.ObjMetadata,
) taskFunc {
return func() {
klog.V(5).Infof("Re-reading object status: %s", status.ScheduleWindow, id)
// check again
rs, err := w.readStatusFromCluster(ctx, id)
if err != nil {
// Send error event and stop the reporter!
// TODO: retry N times before terminating
w.handleFatalError(eventCh, err)
return
}
eventCh <- event.Event{
Type: event.ResourceUpdateEvent,
Resource: rs,
}
}
}
func (w *ObjectStatusReporter) handleFatalError(eventCh chan<- event.Event, err error) {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
klog.V(5).Infof("Watch closed: %v", err)
return
}
eventCh <- event.Event{
Type: event.ErrorEvent,
Error: err,
}
// Terminate the reporter.
w.cancel()
}
// watchErrorHandler logs errors and cancels the informer for this GroupKind
// if the NotFound error is received, which usually means the CRD was deleted.
// Based on DefaultWatchErrorHandler from k8s.io/client-go@v0.23.2/tools/cache/reflector.go
func (w *ObjectStatusReporter) watchErrorHandler(gkn GroupKindNamespace, err error) {
// Note: Informers use a stop channel, not a Context, so we don't expect
// Canceled or DeadlineExceeded here.
switch {
case err == io.EOF:
// Stop channel closed
klog.V(5).Infof("Watch closed: %v: %v", gkn, err)
case err == io.ErrUnexpectedEOF:
// Keep retrying
klog.V(1).Infof("Watch closed: %v: %v", gkn, err)
case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded):
// Context cancelled
klog.V(5).Infof("Watch closed: %v: %v", gkn, err)
case apierrors.IsResourceExpired(err): // resourceVersion too old
// Keep retrying
klog.V(5).Infof("Watch closed: %v: %v", gkn, err)
case apierrors.IsGone(err): // DEPRECATED
// Keep retrying
klog.V(5).Infof("Watch closed: %v: %v", gkn, err)
case apierrors.IsNotFound(err) || containsNotFoundMessage(err): // CRD deleted or not created
// Stop watching this resource
klog.V(3).Infof("Watch error: %v: stopping all watchers for this GroupKind: %v", gkn, err)
// Stop all informers for this GK
w.forEachTargetWithGroupKind(gkn.GroupKind(), func(gkn GroupKindNamespace) {
w.stopInformer(gkn)
})
default:
// Keep retrying
klog.Warningf("Watch error (will retry): %v: %v", gkn, err)
}
}
// resourceNotFoundMessage is the condition message for metav1.StatusReasonNotFound.
const resourceNotFoundMessage = "the server could not find the requested resource"
// containsNotFoundMessage checks if the error string contains the message for
// StatusReasonNotFound.
// See k8s.io/apimachinery@v0.23.2/pkg/api/errors/errors.go
// This is necessary because the Informer doesn't properly wrap errors.
func containsNotFoundMessage(err error) bool {
return strings.Contains(err.Error(), resourceNotFoundMessage)
}
// informerReference tracks informer lifecycle.
type informerReference struct {
// lock guards the subsequent stateful fields
lock sync.Mutex
informer cache.SharedIndexInformer
context context.Context
cancel context.CancelFunc
started bool
}
// Start returns a wrapped context that can be cancelled.
// Returns nil & false if already started.
func (ir *informerReference) Start(ctx context.Context) (context.Context, bool) {
ir.lock.Lock()
defer ir.lock.Unlock()
if ir.started {
return nil, false
}
ctx, cancel := context.WithCancel(ctx)
ir.context = ctx
ir.cancel = cancel
ir.started = true
return ctx, true
}
func (ir *informerReference) SetInformer(informer cache.SharedIndexInformer) {
ir.lock.Lock()
defer ir.lock.Unlock()
ir.informer = informer
}
func (ir *informerReference) HasSynced() bool {
ir.lock.Lock()
defer ir.lock.Unlock()
if !ir.started {
return false
}
if ir.informer == nil {
return false
}
return ir.informer.HasSynced()
}
func (ir *informerReference) HasStarted() bool {
ir.lock.Lock()
defer ir.lock.Unlock()
return ir.started
}
// Stop cancels the context, if it's been started.
func (ir *informerReference) Stop() {
ir.lock.Lock()
defer ir.lock.Unlock()
if !ir.started {
return
}
ir.cancel()
ir.started = false
ir.context = nil
}
type taskFunc func()
// taskManager manages a set of tasks with object identifiers.
// This makes starting and stopping the tasks thread-safe.
type taskManager struct {
lock sync.Mutex
cancelFuncs map[object.ObjMetadata]context.CancelFunc
}
func (tm *taskManager) Schedule(parentCtx context.Context, id object.ObjMetadata, delay time.Duration, task taskFunc) {
tm.lock.Lock()
defer tm.lock.Unlock()
if tm.cancelFuncs == nil {
tm.cancelFuncs = make(map[object.ObjMetadata]context.CancelFunc)
}
cancel, found := tm.cancelFuncs[id]
if found {
// Cancel the existing scheduled task and replace it.
cancel()
}
taskCtx, cancel := context.WithTimeout(context.Background(), delay)
tm.cancelFuncs[id] = cancel
go func() {
klog.V(5).Infof("Task scheduled (%v) for object (%s)", delay, id)
select {
case <-parentCtx.Done():
// stop waiting
cancel()
case <-taskCtx.Done():
if taskCtx.Err() == context.DeadlineExceeded {
klog.V(5).Infof("Task executing (after %v) for object (%v)", delay, id)
task()
}
// else stop waiting
}
}()
}
func (tm *taskManager) Cancel(id object.ObjMetadata) {
tm.lock.Lock()
defer tm.lock.Unlock()
cancelFunc, found := tm.cancelFuncs[id]
if !found {
// already cancelled or not added
return
}
delete(tm.cancelFuncs, id)
cancelFunc()
if len(tm.cancelFuncs) == 0 {
tm.cancelFuncs = nil
}
}

View File

@ -0,0 +1,25 @@
// Code generated by "stringer -type=RESTScopeStrategy -linecomment"; DO NOT EDIT.
package watcher
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[RESTScopeAutomatic-0]
_ = x[RESTScopeRoot-1]
_ = x[RESTScopeNamespace-2]
}
const _RESTScopeStrategy_name = "automaticrootnamespace"
var _RESTScopeStrategy_index = [...]uint8{0, 9, 13, 22}
func (i RESTScopeStrategy) String() string {
if i < 0 || i >= RESTScopeStrategy(len(_RESTScopeStrategy_index)-1) {
return "RESTScopeStrategy(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _RESTScopeStrategy_name[_RESTScopeStrategy_index[i]:_RESTScopeStrategy_index[i+1]]
}

View File

@ -0,0 +1,69 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)
// isObjectUnschedulable returns true if the object or any of its generated resources
// is an unschedulable pod.
//
// This status is computed recursively, so it can handle objects that generate
// objects that generate pods, as long as the input ResourceStatus has those
// GeneratedResources computed.
func isObjectUnschedulable(rs *event.ResourceStatus) bool {
if rs.Error != nil {
return false
}
if rs.Status != status.InProgressStatus {
return false
}
if isPodUnschedulable(rs.Resource) {
return true
}
// recurse through generated resources
for _, subRS := range rs.GeneratedResources {
if isObjectUnschedulable(subRS) {
return true
}
}
return false
}
// isPodUnschedulable returns true if the object is a pod and is unschedulable
// according to a False PodScheduled condition.
func isPodUnschedulable(obj *unstructured.Unstructured) bool {
if obj == nil {
return false
}
gk := obj.GroupVersionKind().GroupKind()
if gk != (schema.GroupKind{Kind: "Pod"}) {
return false
}
icnds, found, err := object.NestedField(obj.Object, "status", "conditions")
if err != nil || !found {
return false
}
cnds, ok := icnds.([]interface{})
if !ok {
return false
}
for _, icnd := range cnds {
cnd, ok := icnd.(map[string]interface{})
if !ok {
return false
}
if cnd["type"] == "PodScheduled" &&
cnd["status"] == "False" &&
cnd["reason"] == "Unschedulable" {
return true
}
}
return false
}

View File

@ -0,0 +1,38 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/object"
)
// StatusWatcher watches a set of objects for status updates.
type StatusWatcher interface {
// Watch a set of objects for status updates.
// Watching should stop if the context is cancelled.
// Events should only be sent for the specified objects.
// The event channel should be closed when the watching stops.
Watch(context.Context, object.ObjMetadataSet, Options) <-chan event.Event
}
// Options can be provided when creating a new StatusWatcher to customize the
// behavior.
type Options struct {
// RESTScopeStrategy specifies which strategy to use when listing and
// watching resources. By default, the strategy is selected automatically.
RESTScopeStrategy RESTScopeStrategy
}
//go:generate stringer -type=RESTScopeStrategy -linecomment
type RESTScopeStrategy int
const (
RESTScopeAutomatic RESTScopeStrategy = iota // automatic
RESTScopeRoot // root
RESTScopeNamespace // namespace
)

View File

@ -56,6 +56,17 @@ func IsNamespaced(u *unstructured.Unstructured) bool {
return u.GetNamespace() != ""
}
// IsNamespace returns true if the passed Unstructured object
// is Namespace in the core (empty string) group.
func IsNamespace(u *unstructured.Unstructured) bool {
if u == nil {
return false
}
gvk := u.GroupVersionKind()
// core group, any version
return gvk.Group == "" && gvk.Kind == "Namespace"
}
// IsCRD returns true if the passed Unstructured object has
// GroupKind == Extensions/CustomResourceDefinition; false otherwise.
func IsCRD(u *unstructured.Unstructured) bool {

View File

@ -146,9 +146,9 @@ func applyAndDestroyTest(ctx context.Context, c client.Client, invConfig invconf
},
},
}
received := testutil.EventsToExpEvents(applierEvents)
receivedEvents := testutil.EventsToExpEvents(applierEvents)
// handle required async NotFound StatusEvents
// handle optional async NotFound StatusEvents
expected := testutil.ExpEvent{
EventType: event.StatusType,
StatusEvent: &testutil.ExpStatusEvent{
@ -157,8 +157,7 @@ func applyAndDestroyTest(ctx context.Context, c client.Client, invConfig invconf
Error: nil,
},
}
received, matches := testutil.RemoveEqualEvents(received, expected)
Expect(matches).To(BeNumerically(">=", 1), "unexpected number of %q status events", status.NotFoundStatus)
receivedEvents, _ = testutil.RemoveEqualEvents(receivedEvents, expected)
// handle optional async InProgress StatusEvents
expected = testutil.ExpEvent{
@ -169,7 +168,7 @@ func applyAndDestroyTest(ctx context.Context, c client.Client, invConfig invconf
Error: nil,
},
}
received, _ = testutil.RemoveEqualEvents(received, expected)
receivedEvents, _ = testutil.RemoveEqualEvents(receivedEvents, expected)
// handle required async Current StatusEvents
expected = testutil.ExpEvent{
@ -180,10 +179,12 @@ func applyAndDestroyTest(ctx context.Context, c client.Client, invConfig invconf
Error: nil,
},
}
received, matches = testutil.RemoveEqualEvents(received, expected)
receivedEvents, matches := testutil.RemoveEqualEvents(receivedEvents, expected)
Expect(matches).To(BeNumerically(">=", 1), "unexpected number of %q status events", status.CurrentStatus)
Expect(received).To(testutil.Equal(expEvents))
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("Verify deployment created")
e2eutil.AssertUnstructuredExists(ctx, c, deployment1Obj)
@ -286,8 +287,11 @@ func applyAndDestroyTest(ctx context.Context, c client.Client, invConfig invconf
},
},
}
receivedEvents = testutil.EventsToExpEvents(destroyerEvents)
Expect(testutil.EventsToExpEvents(destroyerEvents)).To(testutil.Equal(expEvents))
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("Verify deployment deleted")
e2eutil.AssertUnstructuredDoesNotExist(ctx, c, deployment1Obj)

View File

@ -191,8 +191,12 @@ func continueOnErrorTest(ctx context.Context, c client.Client, invConfig invconf
},
}
receivedEvents := testutil.EventsToExpEvents(applierEvents)
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
// sort to allow comparison of multiple ApplyTasks in the same task group
testutil.SortExpEvents(receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("Verify pod1 created")

View File

@ -103,7 +103,7 @@ func crdTest(ctx context.Context, _ client.Client, invConfig invconfig.Inventory
},
},
{
// CRD reconcile Pending .
// CRD reconcile Pending
EventType: event.WaitType,
WaitEvent: &testutil.ExpWaitEvent{
GroupName: "wait-0",
@ -167,7 +167,7 @@ func crdTest(ctx context.Context, _ client.Client, invConfig invconfig.Inventory
},
},
{
// CR reconcile Pending .
// CR reconcile Pending
EventType: event.WaitType,
WaitEvent: &testutil.ExpWaitEvent{
GroupName: "wait-1",
@ -212,7 +212,11 @@ func crdTest(ctx context.Context, _ client.Client, invConfig invconfig.Inventory
},
},
}
Expect(testutil.EventsToExpEvents(applierEvents)).To(testutil.Equal(expEvents))
receivedEvents := testutil.EventsToExpEvents(applierEvents)
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("destroy the resources, including the crd")
destroyer := invConfig.DestroyerFactoryFunc()
@ -263,7 +267,7 @@ func crdTest(ctx context.Context, _ client.Client, invConfig invconfig.Inventory
},
},
{
// CR reconcile Pending.
// CR reconcile Pending
EventType: event.WaitType,
WaitEvent: &testutil.ExpWaitEvent{
GroupName: "wait-0",
@ -372,7 +376,11 @@ func crdTest(ctx context.Context, _ client.Client, invConfig invconfig.Inventory
},
},
}
Expect(testutil.EventsToExpEvents(destroyerEvents)).To(testutil.Equal(expEvents))
receivedEvents = testutil.EventsToExpEvents(destroyerEvents)
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
}
var crd = []byte(strings.TrimSpace(`

View File

@ -220,7 +220,11 @@ func dependencyFilterTest(ctx context.Context, c client.Client, invConfig invcon
},
},
}
Expect(testutil.EventsToExpEvents(applierEvents)).To(testutil.Equal(expEvents))
receivedEvents := testutil.EventsToExpEvents(applierEvents)
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("verify pod1 created and ready")
result := e2eutil.AssertUnstructuredExists(ctx, c, pod1Obj)
@ -411,7 +415,11 @@ func dependencyFilterTest(ctx context.Context, c client.Client, invConfig invcon
},
},
}
Expect(testutil.EventsToExpEvents(applierEvents)).To(testutil.Equal(expEvents))
receivedEvents = testutil.EventsToExpEvents(applierEvents)
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("verify pod1 not deleted")
result = e2eutil.AssertUnstructuredExists(ctx, c, pod1Obj)

View File

@ -383,8 +383,12 @@ func dependsOnTest(ctx context.Context, c client.Client, invConfig invconfig.Inv
},
}
receivedEvents := testutil.EventsToExpEvents(applierEvents)
// sort to handle objects reconciling in random order
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
// sort to compensate for wait task reconcile ordering variations
testutil.SortExpEvents(receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("verify namespace1 created")
@ -627,7 +631,7 @@ func dependsOnTest(ctx context.Context, c client.Client, invConfig invconfig.Inv
},
},
{
// Delete Namespace2 last
// Delete Namespace1 last
EventType: event.DeleteType,
DeleteEvent: &testutil.ExpDeleteEvent{
GroupName: "prune-3",
@ -636,7 +640,7 @@ func dependsOnTest(ctx context.Context, c client.Client, invConfig invconfig.Inv
},
},
{
// Delete Namespace1 last
// Delete Namespace2 last
EventType: event.DeleteType,
DeleteEvent: &testutil.ExpDeleteEvent{
GroupName: "prune-3",
@ -727,8 +731,12 @@ func dependsOnTest(ctx context.Context, c client.Client, invConfig invconfig.Inv
},
}
receivedEvents = testutil.EventsToExpEvents(destroyerEvents)
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
// sort to handle objects reconciling in random order
testutil.SortExpEvents(receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("verify pod1 deleted")

View File

@ -149,9 +149,9 @@ func dryRunTest(ctx context.Context, c client.Client, invConfig invconfig.Invent
},
},
}
received := testutil.EventsToExpEvents(applierEvents)
receivedEvents := testutil.EventsToExpEvents(applierEvents)
// handle required async NotFound StatusEvent for pod
// handle optional async NotFound StatusEvent for pod
expected := testutil.ExpEvent{
EventType: event.StatusType,
StatusEvent: &testutil.ExpStatusEvent{
@ -160,10 +160,9 @@ func dryRunTest(ctx context.Context, c client.Client, invConfig invconfig.Invent
Error: nil,
},
}
received, matches := testutil.RemoveEqualEvents(received, expected)
Expect(matches).To(BeNumerically(">=", 1), "unexpected number of %q status events for namespace", status.NotFoundStatus)
receivedEvents, _ = testutil.RemoveEqualEvents(receivedEvents, expected)
// handle required async NotFound StatusEvent for namespace
// handle optional async NotFound StatusEvent for namespace
expected = testutil.ExpEvent{
EventType: event.StatusType,
StatusEvent: &testutil.ExpStatusEvent{
@ -172,10 +171,9 @@ func dryRunTest(ctx context.Context, c client.Client, invConfig invconfig.Invent
Error: nil,
},
}
received, matches = testutil.RemoveEqualEvents(received, expected)
Expect(matches).To(BeNumerically(">=", 1), "unexpected number of %q status events for pod", status.NotFoundStatus)
receivedEvents, _ = testutil.RemoveEqualEvents(receivedEvents, expected)
Expect(received).To(testutil.Equal(expEvents))
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("Verify pod NotFound")
e2eutil.AssertUnstructuredDoesNotExist(ctx, c, podBObj)

View File

@ -10,6 +10,8 @@ import (
"text/template"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
@ -26,6 +28,7 @@ import (
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object/dependson"
"sigs.k8s.io/cli-utils/pkg/object/mutation"
"sigs.k8s.io/cli-utils/pkg/testutil"
"sigs.k8s.io/cli-utils/test/e2e/customprovider"
"sigs.k8s.io/controller-runtime/pkg/client"
)
@ -422,3 +425,31 @@ func IsFlowControlEnabled(config *rest.Config) bool {
return enabled
}
// FilterOptionalEvents looks for optional events in the expected list and
// removes them from both lists. This allows the output to be compared for
// equality.
//
// Optional events include:
// - WaitEvent with ReconcilePending
func FilterOptionalEvents(expected, received []testutil.ExpEvent) ([]testutil.ExpEvent, []testutil.ExpEvent) {
expectedCopy := make([]testutil.ExpEvent, 0, len(expected))
for _, ee := range expected {
if ee.EventType == event.WaitType &&
ee.WaitEvent != nil &&
ee.WaitEvent.Status == event.ReconcilePending {
// Pending WaitEvent is optional.
// Remove first event match, if exists.
for i, re := range received {
if cmp.Equal(re, ee, cmpopts.EquateErrors()) {
// remove event at index i
received = append(received[:i], received[i+1:]...)
break
}
}
} else {
expectedCopy = append(expectedCopy, ee)
}
}
return expectedCopy, received
}

View File

@ -50,7 +50,7 @@ func gitVersion() string {
cmd.Stderr = &errBuf
err = cmd.Run()
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "STDERR: %s", errBuf.String())
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "STDERR:\n%v\nSTDOUT:\n%v", errBuf, outBuf)
return strings.TrimSpace(outBuf.String())
}

View File

@ -323,7 +323,7 @@ func inventoryPolicyAdoptIfNoInventoryTest(ctx context.Context, c client.Client,
}
// handle optional async InProgress StatusEvents
received := testutil.EventsToExpEvents(applierEvents)
receivedEvents := testutil.EventsToExpEvents(applierEvents)
expected := testutil.ExpEvent{
EventType: event.StatusType,
StatusEvent: &testutil.ExpStatusEvent{
@ -332,7 +332,7 @@ func inventoryPolicyAdoptIfNoInventoryTest(ctx context.Context, c client.Client,
Error: nil,
},
}
received, _ = testutil.RemoveEqualEvents(received, expected)
receivedEvents, _ = testutil.RemoveEqualEvents(receivedEvents, expected)
// handle required async Current StatusEvents
expected = testutil.ExpEvent{
@ -343,10 +343,12 @@ func inventoryPolicyAdoptIfNoInventoryTest(ctx context.Context, c client.Client,
Error: nil,
},
}
received, matches := testutil.RemoveEqualEvents(received, expected)
receivedEvents, matches := testutil.RemoveEqualEvents(receivedEvents, expected)
Expect(matches).To(BeNumerically(">=", 1), "unexpected number of %q status events", status.CurrentStatus)
Expect(received).To(testutil.Equal(expEvents))
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("Verify resource was updated and added to inventory")
result := e2eutil.AssertUnstructuredExists(ctx, c, deployment1Obj)
@ -505,7 +507,7 @@ func inventoryPolicyAdoptAllTest(ctx context.Context, c client.Client, invConfig
}
// handle optional async InProgress StatusEvents
received := testutil.EventsToExpEvents(applierEvents)
receivedEvents := testutil.EventsToExpEvents(applierEvents)
expected := testutil.ExpEvent{
EventType: event.StatusType,
StatusEvent: &testutil.ExpStatusEvent{
@ -514,7 +516,7 @@ func inventoryPolicyAdoptAllTest(ctx context.Context, c client.Client, invConfig
Error: nil,
},
}
received, _ = testutil.RemoveEqualEvents(received, expected)
receivedEvents, _ = testutil.RemoveEqualEvents(receivedEvents, expected)
// handle required async Current StatusEvents
expected = testutil.ExpEvent{
@ -525,10 +527,12 @@ func inventoryPolicyAdoptAllTest(ctx context.Context, c client.Client, invConfig
Error: nil,
},
}
received, matches := testutil.RemoveEqualEvents(received, expected)
receivedEvents, matches := testutil.RemoveEqualEvents(receivedEvents, expected)
Expect(matches).To(BeNumerically(">=", 1), "unexpected number of %q status events", status.CurrentStatus)
Expect(received).To(testutil.Equal(expEvents))
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("Verify resource was updated and added to inventory")
result := e2eutil.AssertUnstructuredExists(ctx, c, deployment1Obj)

View File

@ -226,7 +226,11 @@ func mutationTest(ctx context.Context, c client.Client, invConfig invconfig.Inve
},
},
}
Expect(testutil.EventsToExpEvents(applierEvents)).To(testutil.Equal(expEvents))
receivedEvents := testutil.EventsToExpEvents(applierEvents)
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("verify podB is created and ready")
result := e2eutil.AssertUnstructuredExists(ctx, c, podBObj)
@ -414,8 +418,11 @@ func mutationTest(ctx context.Context, c client.Client, invConfig invconfig.Inve
},
},
}
receivedEvents = testutil.EventsToExpEvents(destroyerEvents)
Expect(testutil.EventsToExpEvents(destroyerEvents)).To(testutil.Equal(expEvents))
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("verify podB deleted")
e2eutil.AssertUnstructuredDoesNotExist(ctx, c, podBObj)

View File

@ -113,7 +113,7 @@ func namespaceFilterTest(ctx context.Context, c client.Client, invConfig invconf
},
},
{
// namespace1 reconcile Pending.
// namespace1 reconcile Pending
EventType: event.WaitType,
WaitEvent: &testutil.ExpWaitEvent{
GroupName: "wait-0",
@ -222,7 +222,11 @@ func namespaceFilterTest(ctx context.Context, c client.Client, invConfig invconf
},
},
}
Expect(testutil.EventsToExpEvents(applierEvents)).To(testutil.Equal(expEvents))
receivedEvents := testutil.EventsToExpEvents(applierEvents)
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("verify namespace1 created")
e2eutil.AssertUnstructuredExists(ctx, c, namespace1Obj)

View File

@ -144,7 +144,11 @@ func pruneRetrieveErrorTest(ctx context.Context, c client.Client, invConfig invc
},
},
}
Expect(testutil.EventsToExpEvents(applierEvents)).To(testutil.Equal(expEvents))
receivedEvents := testutil.EventsToExpEvents(applierEvents)
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("Verify pod1 created and ready")
result := e2eutil.AssertUnstructuredExists(ctx, c, pod1Obj)
@ -167,11 +171,11 @@ func pruneRetrieveErrorTest(ctx context.Context, c client.Client, invConfig invc
pod2Obj,
}
applierEvents2 := e2eutil.RunCollect(applier.Run(ctx, inv, resource2, apply.ApplierOptions{
applierEvents = e2eutil.RunCollect(applier.Run(ctx, inv, resource2, apply.ApplierOptions{
EmitStatusEvents: false,
}))
expEvents2 := []testutil.ExpEvent{
expEvents = []testutil.ExpEvent{
{
// InitTask
EventType: event.InitType,
@ -279,7 +283,11 @@ func pruneRetrieveErrorTest(ctx context.Context, c client.Client, invConfig invc
},
},
}
Expect(testutil.EventsToExpEvents(applierEvents2)).To(testutil.Equal(expEvents2))
receivedEvents = testutil.EventsToExpEvents(applierEvents)
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("Verify pod2 created and ready")
result = e2eutil.AssertUnstructuredExists(ctx, c, pod2Obj)
@ -301,7 +309,7 @@ func pruneRetrieveErrorTest(ctx context.Context, c client.Client, invConfig invc
options := apply.DestroyerOptions{InventoryPolicy: inventory.PolicyAdoptIfNoInventory}
destroyerEvents := e2eutil.RunCollect(destroyer.Run(ctx, inv, options))
expEvents3 := []testutil.ExpEvent{
expEvents = []testutil.ExpEvent{
{
// InitTask
EventType: event.InitType,
@ -390,7 +398,11 @@ func pruneRetrieveErrorTest(ctx context.Context, c client.Client, invConfig invc
},
},
}
Expect(testutil.EventsToExpEvents(destroyerEvents)).To(testutil.Equal(expEvents3))
receivedEvents = testutil.EventsToExpEvents(destroyerEvents)
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
By("Verify pod1 is deleted")
e2eutil.AssertUnstructuredDoesNotExist(ctx, c, pod1Obj)

View File

@ -60,9 +60,11 @@ func reconciliationTimeout(ctx context.Context, invConfig invconfig.InventoryCon
}))
expEvents := expectedPodEvents(podObj, event.ReconcileTimeout)
received := testutil.EventsToExpEvents(applierEvents)
receivedEvents := testutil.EventsToExpEvents(applierEvents)
Expect(received).To(testutil.Equal(expEvents))
expEvents, receivedEvents = e2eutil.FilterOptionalEvents(expEvents, receivedEvents)
Expect(receivedEvents).To(testutil.Equal(expEvents))
}
func expectedPodEvents(pod *unstructured.Unstructured, waitStatus event.WaitEventStatus) []testutil.ExpEvent {

View File

@ -65,3 +65,30 @@ metadata:
spec:
cronSpec: "* * * * */5"
`
var deploymentYaml = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: ""
namespace: ""
spec:
replicas: 1
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.19.6
ports:
- containerPort: 80
resources:
requests:
cpu: 10m
memory: 1Mi
`

View File

@ -0,0 +1,22 @@
# Copyright 2022 The Kubernetes Authors.
# SPDX-License-Identifier: Apache-2.0
# The ThousandDeployments stress test needs to spin up 1,000 pods, so this kind
# cluster config uses 10 nodes, which each default to allowing 110 pods.
#
# The API-server and other control plane components will be
# on the control-plane node to make sure the other nodes have enough capacity.
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
- role: worker
- role: worker
- role: worker
- role: worker
- role: worker
- role: worker
- role: worker

View File

@ -97,6 +97,10 @@ var _ = Describe("Stress", func() {
e2eutil.DeleteNamespace(ctx, c, namespace)
})
It("ThousandDeployments", func() {
thousandDeploymentsTest(ctx, c, invConfig, inventoryName, namespace.GetName())
})
It("ThousandNamespaces", func() {
thousandNamespacesTest(ctx, c, invConfig, inventoryName, namespace.GetName())
})

View File

@ -0,0 +1,130 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package stress
import (
"context"
"fmt"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog/v2"
"sigs.k8s.io/cli-utils/pkg/apply"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/test/e2e/e2eutil"
"sigs.k8s.io/cli-utils/test/e2e/invconfig"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// thousandDeploymentsTest tests one pre-existing namespace with 1,000
// Deployments in it.
//
// The Deployments themselves are easy to get status on, but with the retrieval
// of generated resource status (ReplicaSets & Pods), this becomes expensive.
func thousandDeploymentsTest(ctx context.Context, c client.Client, invConfig invconfig.InventoryConfig, inventoryName, namespaceName string) {
By("Apply LOTS of resources")
applier := invConfig.ApplierFactoryFunc()
inventoryID := fmt.Sprintf("%s-%s", inventoryName, namespaceName)
inventoryInfo := invconfig.CreateInventoryInfo(invConfig, inventoryName, namespaceName, inventoryID)
resources := []*unstructured.Unstructured{}
labelKey := "created-for"
labelValue := "stress-test"
deploymentObjTemplate := e2eutil.ManifestToUnstructured([]byte(deploymentYaml))
deploymentObjTemplate.SetLabels(map[string]string{labelKey: labelValue})
objectCount := 1000
for i := 1; i <= objectCount; i++ {
deploymentObj := deploymentObjTemplate.DeepCopy()
deploymentObj.SetNamespace(namespaceName)
// change name & selector labels to avoid overlap between deployments
name := fmt.Sprintf("nginx-%d", i)
deploymentObj.SetName(name)
err := unstructured.SetNestedField(deploymentObj.Object, name, "spec", "selector", "matchLabels", "app")
Expect(err).ToNot(HaveOccurred())
err = unstructured.SetNestedField(deploymentObj.Object, name, "spec", "template", "metadata", "labels", "app")
Expect(err).ToNot(HaveOccurred())
resources = append(resources, deploymentObj)
}
defer func() {
By("Cleanup Deployments")
e2eutil.DeleteAllUnstructuredIfExists(ctx, c, deploymentObjTemplate)
}()
start := time.Now()
applierEvents := e2eutil.RunCollect(applier.Run(ctx, inventoryInfo, resources, apply.ApplierOptions{
// SSA reduces GET+PATCH to just PATCH, which is faster
ServerSideOptions: common.ServerSideOptions{
ServerSideApply: true,
ForceConflicts: true,
FieldManager: "cli-utils.kubernetes.io",
},
ReconcileTimeout: 30 * time.Minute,
EmitStatusEvents: false,
}))
duration := time.Since(start)
klog.Infof("Applier.Run execution time: %v", duration)
for _, e := range applierEvents {
Expect(e.ErrorEvent.Err).To(BeNil())
}
for _, e := range applierEvents {
Expect(e.ApplyEvent.Error).To(BeNil(), "ApplyEvent: %v", e.ApplyEvent)
}
for _, e := range applierEvents {
if e.Type == event.WaitType {
Expect(e.WaitEvent.Status).To(BeElementOf(event.ReconcilePending, event.ReconcileSuccessful), "WaitEvent: %v", e.WaitEvent)
}
}
By("Verify inventory created")
invConfig.InvSizeVerifyFunc(ctx, c, inventoryName, namespaceName, inventoryID, len(resources), len(resources))
By(fmt.Sprintf("Verify %d Deployments created", objectCount))
e2eutil.AssertUnstructuredCount(ctx, c, deploymentObjTemplate, objectCount)
By("Destroy LOTS of resources")
destroyer := invConfig.DestroyerFactoryFunc()
start = time.Now()
destroyerEvents := e2eutil.RunCollect(destroyer.Run(ctx, inventoryInfo, apply.DestroyerOptions{
InventoryPolicy: inventory.PolicyAdoptIfNoInventory,
DeleteTimeout: 30 * time.Minute,
}))
duration = time.Since(start)
klog.Infof("Destroyer.Run execution time: %v", duration)
for _, e := range destroyerEvents {
Expect(e.ErrorEvent.Err).To(BeNil())
}
for _, e := range destroyerEvents {
Expect(e.PruneEvent.Error).To(BeNil(), "PruneEvent: %v", e.PruneEvent)
}
for _, e := range destroyerEvents {
if e.Type == event.WaitType {
Expect(e.WaitEvent.Status).To(BeElementOf(event.ReconcilePending, event.ReconcileSuccessful), "WaitEvent: %v", e.WaitEvent)
}
}
By("Verify inventory deleted")
invConfig.InvNotExistsFunc(ctx, c, inventoryName, namespaceName, inventoryID)
By(fmt.Sprintf("Verify %d Deployments deleted", objectCount))
e2eutil.AssertUnstructuredCount(ctx, c, deploymentObjTemplate, 0)
}

View File

@ -21,6 +21,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)
// thousandNamespacesTest tests a CRD and 1,000 new namespaces, each with
// 1 ConfigMap and 1 CronTab in it. This uses implicit dependencies and many
// namespaces with custom resources (json storage) as well and built-in
// resources (proto storage).
//
// With the StatusWatcher, this should only needs FOUR root-scoped informers
// (CRD, Namespace, ConfigMap, CronTab), For comparison, the StatusPoller used
// 2,002 LISTs for each attempt (two root-scoped and two namespace-scoped per
// namespace).
func thousandNamespacesTest(ctx context.Context, c client.Client, invConfig invconfig.InventoryConfig, inventoryName, namespaceName string) {
By("Apply LOTS of resources")
applier := invConfig.ApplierFactoryFunc()
@ -44,7 +53,9 @@ func thousandNamespacesTest(ctx context.Context, c client.Client, invConfig invc
cronTabObjTemplate := e2eutil.ManifestToUnstructured([]byte(cronTabYaml))
cronTabObjTemplate.SetLabels(map[string]string{labelKey: labelValue})
for i := 1; i <= 1000; i++ {
objectCount := 1000
for i := 1; i <= objectCount; i++ {
ns := fmt.Sprintf("%s-%d", namespaceName, i)
namespaceObj := namespaceObjTemplate.DeepCopy()
namespaceObj.SetName(ns)
@ -110,14 +121,14 @@ func thousandNamespacesTest(ctx context.Context, c client.Client, invConfig invc
By("Verify CRD created")
e2eutil.AssertUnstructuredExists(ctx, c, crdObj)
By("Verify 1000 Namespaces created")
e2eutil.AssertUnstructuredCount(ctx, c, namespaceObjTemplate, 1000)
By(fmt.Sprintf("Verify %d Namespaces created", objectCount))
e2eutil.AssertUnstructuredCount(ctx, c, namespaceObjTemplate, objectCount)
By("Verify 1000 ConfigMaps created")
e2eutil.AssertUnstructuredCount(ctx, c, configMapObjTemplate, 1000)
By(fmt.Sprintf("Verify %d ConfigMaps created", objectCount))
e2eutil.AssertUnstructuredCount(ctx, c, configMapObjTemplate, objectCount)
By("Verify 1000 CronTabs created")
e2eutil.AssertUnstructuredCount(ctx, c, cronTabObjTemplate, 1000)
By(fmt.Sprintf("Verify %d CronTabs created", objectCount))
e2eutil.AssertUnstructuredCount(ctx, c, cronTabObjTemplate, objectCount)
By("Destroy LOTS of resources")
destroyer := invConfig.DestroyerFactoryFunc()
@ -147,13 +158,13 @@ func thousandNamespacesTest(ctx context.Context, c client.Client, invConfig invc
By("Verify inventory deleted")
invConfig.InvNotExistsFunc(ctx, c, inventoryName, namespaceName, inventoryID)
By("Verify 1000 CronTabs deleted")
By(fmt.Sprintf("Verify %d CronTabs deleted", objectCount))
e2eutil.AssertUnstructuredCount(ctx, c, cronTabObjTemplate, 0)
By("Verify 1000 ConfigMaps deleted")
By(fmt.Sprintf("Verify %d ConfigMaps deleted", objectCount))
e2eutil.AssertUnstructuredCount(ctx, c, configMapObjTemplate, 0)
By("Verify 1000 Namespaces deleted")
By(fmt.Sprintf("Verify %d Namespaces deleted", objectCount))
e2eutil.AssertUnstructuredCount(ctx, c, namespaceObjTemplate, 0)
By("Verify CRD deleted")