From 185aa3d04863f5147e20dd01f96d3021bf50fe8d Mon Sep 17 00:00:00 2001
From: Sean Sullivan
Date: Tue, 2 Feb 2021 19:59:22 -0800
Subject: [PATCH] Better final inventory calculation during errors

---
 pkg/apply/applier.go              |  15 ++
 pkg/apply/prune/prune.go          |  26 +---
 pkg/apply/prune/prune_test.go     |   5 +
 pkg/apply/solver/solver.go        |   9 ++
 pkg/apply/solver/solver_test.go   |  13 +-
 pkg/apply/task/apply_task.go      |  80 +++++++----
 pkg/apply/task/apply_task_test.go | 225 ++++++++++++++++++++++++++++++
 pkg/apply/taskrunner/context.go   |  17 ++-
 8 files changed, 333 insertions(+), 57 deletions(-)

diff --git a/pkg/apply/applier.go b/pkg/apply/applier.go
index ba0c124..69a2829 100644
--- a/pkg/apply/applier.go
+++ b/pkg/apply/applier.go
@@ -104,6 +104,12 @@ func (a *Applier) prepareObjects(localInv inventory.InventoryInfo, localObjs []*
             return nil, err
         }
     }
+    // Retrieve previous inventory objects. Must happen before inventory client merge.
+    prevInv, err := a.invClient.GetClusterObjs(localInv)
+    if err != nil {
+        return nil, err
+    }
+    klog.V(4).Infof("%d previous inventory objects in cluster", len(prevInv))
 
     klog.V(4).Infof("applier merging %d objects into inventory", len(localObjs))
     currentObjs := object.UnstructuredsToObjMetas(localObjs)
@@ -122,6 +128,7 @@ func (a *Applier) prepareObjects(localInv inventory.InventoryInfo, localObjs []*
         LocalInv:  localInv,
         Resources: localObjs,
         PruneIds:  pruneIds,
+        PrevInv:   prevInv,
     }, nil
 }
 
@@ -132,6 +139,7 @@ type ResourceObjects struct {
     LocalInv  inventory.InventoryInfo
     Resources []*unstructured.Unstructured
     PruneIds  []object.ObjMetadata
+    PrevInv   []object.ObjMetadata
 }
 
 // ObjsForApply returns the unstructured representation for all the resources
@@ -162,6 +170,13 @@ func (r *ResourceObjects) IdsForPrune() []object.ObjMetadata {
     return r.PruneIds
 }
 
+// IdsForPrevInv returns the Ids for the previous inventory. These
+// Ids reference the objects managed by the inventory object which
+// are already in the cluster.
+func (r *ResourceObjects) IdsForPrevInv() []object.ObjMetadata {
+    return r.PrevInv
+}
+
 // AllIds returns the Ids for all resources that are relevant. This
 // includes resources that will be applied or pruned.
 func (r *ResourceObjects) AllIds() []object.ObjMetadata {
diff --git a/pkg/apply/prune/prune.go b/pkg/apply/prune/prune.go
index 15c5850..1d7832b 100644
--- a/pkg/apply/prune/prune.go
+++ b/pkg/apply/prune/prune.go
@@ -194,29 +194,9 @@ func (po *PruneOptions) Prune(localInv inventory.InventoryInfo,
         }
         taskContext.EventChannel() <- createPruneEvent(pruneObj, obj, event.Pruned)
     }
-    // Calculate final inventory items, ensuring only successfully applied
-    // objects are in the inventory along with prune failures.
-    finalInventory := []object.ObjMetadata{}
-    for _, localObj := range localIds {
-        obj, err := po.getObject(localObj)
-        if err != nil {
-            if klog.V(4) {
-                klog.Errorf("error retrieving object for inventory determination: %s", err)
-            }
-            continue
-        }
-        uid := string(obj.GetUID())
-        if currentUIDs.Has(uid) {
-            klog.V(5).Infof("adding final inventory object %s/%s", localObj.Namespace, localObj.Name)
-            finalInventory = append(finalInventory, localObj)
-        } else {
-            klog.V(5).Infof("uid not found (%s); not adding final inventory obj %s/%s",
-                uid, localObj.Namespace, localObj.Name)
-        }
-    }
-    klog.V(4).Infof("final inventory %d successfully applied objects", len(finalInventory))
-    finalInventory = append(finalInventory, pruneFailures...)
-    klog.V(4).Infof("final inventory %d objects after appending prune failures", len(finalInventory))
+    // Final inventory equals applied objects and prune failures.
+    appliedResources := taskContext.AppliedResources()
+    finalInventory := append(appliedResources, pruneFailures...)
     return po.InvClient.Replace(localInv, finalInventory)
 }
 
diff --git a/pkg/apply/prune/prune_test.go b/pkg/apply/prune/prune_test.go
index 103e418..a038950 100644
--- a/pkg/apply/prune/prune_test.go
+++ b/pkg/apply/prune/prune_test.go
@@ -258,6 +258,11 @@ func TestPrune(t *testing.T) {
             // the events that can be put on it.
             eventChannel := make(chan event.Event, len(tc.pastObjs)+1) // Add one for inventory object
             taskContext := taskrunner.NewTaskContext(eventChannel)
+            for _, u := range tc.currentObjs {
+                o := object.UnstructuredToObjMeta(u)
+                uid := u.GetUID()
+                taskContext.ResourceApplied(o, uid, 0)
+            }
             err := func() error {
                 defer close(eventChannel)
                 // Run the prune and validate.
diff --git a/pkg/apply/solver/solver.go b/pkg/apply/solver/solver.go
index 08d5965..996f457 100644
--- a/pkg/apply/solver/solver.go
+++ b/pkg/apply/solver/solver.go
@@ -56,6 +56,7 @@ type resourceObjects interface {
     Inventory() inventory.InventoryInfo
     IdsForApply() []object.ObjMetadata
     IdsForPrune() []object.ObjMetadata
+    IdsForPrevInv() []object.ObjMetadata
 }
 
 // BuildTaskQueue takes a set of resources in the form of info objects
@@ -65,12 +66,19 @@ func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects,
     o Options) chan taskrunner.Task {
     var tasks []taskrunner.Task
     remainingInfos := ro.ObjsForApply()
+    // Convert slice of previous inventory objects into a map.
+    prevInvSlice := ro.IdsForPrevInv()
+    prevInventory := make(map[object.ObjMetadata]bool, len(prevInvSlice))
+    for _, prevInvObj := range prevInvSlice {
+        prevInventory[prevInvObj] = true
+    }
 
     crdSplitRes, hasCRDs := splitAfterCRDs(remainingInfos)
     if hasCRDs {
         tasks = append(tasks, &task.ApplyTask{
             Objects:           append(crdSplitRes.before, crdSplitRes.crds...),
             CRDs:              crdSplitRes.crds,
+            PrevInventory:     prevInventory,
             ServerSideOptions: o.ServerSideOptions,
             DryRunStrategy:    o.DryRunStrategy,
             InfoHelper:        t.InfoHelper,
@@ -96,6 +104,7 @@ func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects,
         &task.ApplyTask{
             Objects:           remainingInfos,
             CRDs:              crdSplitRes.crds,
+            PrevInventory:     prevInventory,
             ServerSideOptions: o.ServerSideOptions,
             DryRunStrategy:    o.DryRunStrategy,
             InfoHelper:        t.InfoHelper,
diff --git a/pkg/apply/solver/solver_test.go b/pkg/apply/solver/solver_test.go
index 899de6f..82b7ea1 100644
--- a/pkg/apply/solver/solver_test.go
+++ b/pkg/apply/solver/solver_test.go
@@ -309,10 +309,11 @@ func getType(task taskrunner.Task) reflect.Type {
 }
 
 type fakeResourceObjects struct {
-    objsForApply []*unstructured.Unstructured
-    inventory    inventory.InventoryInfo
-    idsForApply  []object.ObjMetadata
-    idsForPrune  []object.ObjMetadata
+    objsForApply  []*unstructured.Unstructured
+    inventory     inventory.InventoryInfo
+    idsForApply   []object.ObjMetadata
+    idsForPrune   []object.ObjMetadata
+    idsForPrevInv []object.ObjMetadata
 }
 
 func (f *fakeResourceObjects) ObjsForApply() []*unstructured.Unstructured {
@@ -331,6 +332,10 @@ func (f *fakeResourceObjects) IdsForPrune() []object.ObjMetadata {
     return f.idsForPrune
 }
 
+func (f *fakeResourceObjects) IdsForPrevInv() []object.ObjMetadata {
+    return f.idsForPrevInv
+}
+
 func ignoreErrInfoToObjMeta(info *unstructured.Unstructured) object.ObjMetadata {
     objMeta := object.UnstructuredToObjMeta(info)
     return objMeta
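Reviewer note: the solver change above threads the previous inventory into each ApplyTask as a map so membership checks during error handling are constant time. The following stand-alone Go sketch (illustrative only, not part of the patch; objID is a stand-in for cli-utils' object.ObjMetadata) shows the slice-to-map conversion and the lookup an ApplyTask would perform:

    package main

    import "fmt"

    // objID is a stand-in for cli-utils' object.ObjMetadata (namespace, name, group/kind).
    type objID struct {
        Namespace, Name, GroupKind string
    }

    func main() {
        // Previous inventory objects as returned by the inventory client (a slice).
        prevInvSlice := []objID{
            {Namespace: "default", Name: "foo", GroupKind: "Deployment.apps"},
        }
        // Convert the slice into a map so later membership checks are O(1),
        // mirroring what BuildTaskQueue does before constructing each ApplyTask.
        prevInventory := make(map[objID]bool, len(prevInvSlice))
        for _, obj := range prevInvSlice {
            prevInventory[obj] = true
        }
        // An ApplyTask consults this map when retrieval or apply fails.
        candidate := objID{Namespace: "default", Name: "foo", GroupKind: "Deployment.apps"}
        fmt.Println("in previous inventory:", prevInventory[candidate])
    }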
diff --git a/pkg/apply/task/apply_task.go b/pkg/apply/task/apply_task.go
index 10ebb1e..142df2c 100644
--- a/pkg/apply/task/apply_task.go
+++ b/pkg/apply/task/apply_task.go
@@ -17,7 +17,7 @@ import (
     "k8s.io/client-go/dynamic"
     "k8s.io/klog"
     "k8s.io/kubectl/pkg/cmd/apply"
-    "k8s.io/kubectl/pkg/cmd/delete"
+    cmddelete "k8s.io/kubectl/pkg/cmd/delete"
     "k8s.io/kubectl/pkg/cmd/util"
     "k8s.io/kubectl/pkg/util/slice"
     applyerror "sigs.k8s.io/cli-utils/pkg/apply/error"
@@ -45,11 +45,13 @@ type applyOptions interface {
 // ApplyTask applies the given Objects to the cluster
 // by using the ApplyOptions.
 type ApplyTask struct {
-    Factory           util.Factory
-    InfoHelper        info.InfoHelper
-    Mapper            meta.RESTMapper
-    Objects           []*unstructured.Unstructured
-    CRDs              []*unstructured.Unstructured
+    Factory    util.Factory
+    InfoHelper info.InfoHelper
+    Mapper     meta.RESTMapper
+    Objects    []*unstructured.Unstructured
+    CRDs       []*unstructured.Unstructured
+    // Used for determining inventory during errors
+    PrevInventory map[object.ObjMetadata]bool
     DryRunStrategy    common.DryRunStrategy
     ServerSideOptions common.ServerSideOptions
     InventoryPolicy   inventory.InventoryPolicy
@@ -80,6 +82,7 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
         // we have a CRD and a CR in the same resource set, but the CRD
         // will not actually have been applied when we reach the CR.
         if a.DryRunStrategy.ClientOrServerDryRun() {
+            klog.V(4).Infof("dry-run filtering custom resources...")
             // Find all resources in the set that doesn't exist in the
             // RESTMapper, but where we do have the CRD for the type in
             // the resource set.
@@ -97,6 +100,7 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
                taskContext.EventChannel() <- createApplyEvent(object.UnstructuredToObjMeta(obj), event.Created, nil)
             }
             // Update the resource set to no longer include the CRs.
+            klog.V(4).Infof("after dry-run filtering custom resources, %d objects left", len(objs))
             objects = objs
         }
 
@@ -123,7 +127,8 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
         }
 
         klog.V(4).Infof("attempting to apply %d remaining objects", len(objects))
-        var infos []*resource.Info
+        // invInfos stores the objects that belong in the final inventory.
+        invInfos := make(map[object.ObjMetadata]*resource.Info, len(objects))
         for _, obj := range objects {
             // Set the client and mapping fields on the provided
             // info so they can be applied to the cluster.
@@ -144,18 +149,25 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
             if err != nil {
                 if !apierrors.IsNotFound(err) {
                     if klog.V(4) {
-                        klog.Errorf("error retrieving %s/%s from cluster--continue",
-                            info.Namespace, info.Name)
+                        klog.Errorf("error (%s) retrieving %s/%s from cluster--continue",
+                            err, info.Namespace, info.Name)
                     }
-                    taskContext.EventChannel() <- createApplyEvent(
-                        id,
-                        event.Unchanged,
-                        err)
+                    op := event.Failed
+                    if a.objInCluster(id) {
+                        // Object in cluster stays in the inventory.
+                        klog.V(4).Infof("%s/%s apply retrieval failure, but in cluster--keep in inventory",
+                            info.Namespace, info.Name)
+                        invInfos[id] = info
+                        op = event.Unchanged
+                    }
+                    taskContext.EventChannel() <- createApplyEvent(id, op, err)
                     taskContext.CaptureResourceFailure(id)
                     continue
                 }
             }
-            infos = append(infos, info)
+            // At this point the object was either 1) successfully retrieved from the cluster, or
+            // 2) returned a "Not Found" error (meaning first-time creation). Add it to the final inventory.
+            invInfos[id] = info
             canApply, err := inventory.CanApply(a.InvInfo, clusterObj, a.InventoryPolicy)
             if !canApply {
                 klog.V(5).Infof("can not apply %s/%s--continue",
@@ -176,30 +188,30 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
                 if klog.V(4) {
                     klog.Errorf("error applying (%s/%s) %s", info.Namespace, info.Name, err)
                 }
+                // If apply failed and the object is not in the cluster, remove
+                // it from the final inventory.
+                if !a.objInCluster(id) {
+                    klog.V(5).Infof("not in cluster; removing apply fail object %s/%s from inventory",
+                        info.Namespace, info.Name)
+                    delete(invInfos, id)
+                }
                 taskContext.EventChannel() <- createApplyEvent(
                     id,
                     event.Failed,
                     applyerror.NewApplyRunError(err))
                 taskContext.CaptureResourceFailure(id)
             }
         }
-        // Fetch the Generation from all Infos after they have been
-        // applied.
-        for _, inf := range infos {
-            id, err := object.InfoToObjMeta(inf)
-            if err != nil {
-                continue
-            }
-            if inf.Object != nil {
-                acc, err := meta.Accessor(inf.Object)
+        // Store objects (and some obj metadata) in the task context
+        // for the final inventory.
+        for id, info := range invInfos {
+            if info.Object != nil {
+                acc, err := meta.Accessor(info.Object)
                 if err != nil {
                     continue
                 }
-                // Only add a resource if it successfully applied.
                 uid := acc.GetUID()
-                if string(uid) != "" {
-                    gen := acc.GetGeneration()
-                    taskContext.ResourceApplied(id, uid, gen)
-                }
+                gen := acc.GetGeneration()
+                taskContext.ResourceApplied(id, uid, gen)
             }
         }
         a.sendTaskResult(taskContext)
@@ -232,7 +244,7 @@ func newApplyOptions(eventChannel chan event.Event, serverSideOptions common.Ser
         },
         // FilenameOptions are not needed since we don't use the ApplyOptions
         // to read manifests.
-        DeleteOptions: &delete.DeleteOptions{},
+        DeleteOptions: &cmddelete.DeleteOptions{},
         PrintFlags: &genericclioptions.PrintFlags{
             OutputFormat: &emptyString,
         },
@@ -292,6 +304,16 @@ func (a *ApplyTask) filterCRsWithCRDInSet(objects []*unstructured.Unstructured)
     return objs, objsWithCRD, nil
 }
 
+// objInCluster returns true if the passed object is in the previous
+// inventory, since an object in the previous inventory is known to
+// already exist in the cluster.
+func (a *ApplyTask) objInCluster(obj object.ObjMetadata) bool {
+    if _, found := a.PrevInventory[obj]; found {
+        return true
+    }
+    return false
+}
+
 type crdsInfo struct {
     crds []crdInfo
 }
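Reviewer note: the apply_task.go error paths above reduce to one rule: an object whose retrieval or apply fails stays in the final inventory only if the previous inventory says it already exists in the cluster. A minimal Go sketch of that rule (illustrative only, not part of the patch; id and keepInInventory are hypothetical stand-ins, with id replacing object.ObjMetadata):

    package main

    import "fmt"

    // id is a stand-in for cli-utils' object.ObjMetadata.
    type id struct{ Namespace, Name string }

    // keepInInventory sketches the decision made in ApplyTask.Start: an object
    // whose retrieval or apply failed stays in the final inventory only if the
    // previous inventory says it already exists in the cluster.
    func keepInInventory(obj id, prevInventory map[id]bool, failed bool) bool {
        if !failed {
            // A successful apply (including first-time creation) always stays.
            return true
        }
        return prevInventory[obj]
    }

    func main() {
        prev := map[id]bool{{Namespace: "default", Name: "existing"}: true}
        fmt.Println(keepInInventory(id{"default", "existing"}, prev, true))  // true: keep
        fmt.Println(keepInInventory(id{"default", "brand-new"}, prev, true)) // false: drop
    }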
diff --git a/pkg/apply/task/apply_task_test.go b/pkg/apply/task/apply_task_test.go
index 251b45e..1640da7 100644
--- a/pkg/apply/task/apply_task_test.go
+++ b/pkg/apply/task/apply_task_test.go
@@ -10,6 +10,7 @@ import (
     "testing"
 
     "gotest.tools/assert"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
     "k8s.io/apimachinery/pkg/runtime/schema"
     "k8s.io/apimachinery/pkg/types"
@@ -34,6 +35,230 @@ type resourceInfo struct {
     generation int64
 }
 
+// Tests that the correct "applied" objects are sent
+// to the TaskContext, since these are the applied
+// objects added to the final inventory.
+func TestApplyTask_BasicAppliedObjects(t *testing.T) {
+    testCases := map[string]struct {
+        applied []resourceInfo
+    }{
+        "apply single namespaced resource": {
+            applied: []resourceInfo{
+                {
+                    group:      "apps",
+                    apiVersion: "apps/v1",
+                    kind:       "Deployment",
+                    name:       "foo",
+                    namespace:  "default",
+                    uid:        types.UID("my-uid"),
+                    generation: int64(42),
+                },
+            },
+        },
+        "apply multiple clusterscoped resources": {
+            applied: []resourceInfo{
+                {
+                    group:      "custom.io",
+                    apiVersion: "custom.io/v1beta1",
+                    kind:       "Custom",
+                    name:       "bar",
+                    uid:        types.UID("uid-1"),
+                    generation: int64(32),
+                },
+                {
+                    group:      "custom2.io",
+                    apiVersion: "custom2.io/v1",
+                    kind:       "Custom2",
+                    name:       "foo",
+                    uid:        types.UID("uid-2"),
+                    generation: int64(1),
+                },
+            },
+        },
+    }
+
+    for tn, tc := range testCases {
+        t.Run(tn, func(t *testing.T) {
+            eventChannel := make(chan event.Event)
+            defer close(eventChannel)
+            taskContext := taskrunner.NewTaskContext(eventChannel)
+
+            objs := toUnstructureds(tc.applied)
+
+            oldAO := applyOptionsFactoryFunc
+            applyOptionsFactoryFunc = func(chan event.Event, common.ServerSideOptions, common.DryRunStrategy, util.Factory) (applyOptions, dynamic.Interface, error) {
+                return &fakeApplyOptions{}, nil, nil
+            }
+            defer func() { applyOptionsFactoryFunc = oldAO }()
+
+            restMapper := testutil.NewFakeRESTMapper(schema.GroupVersionKind{
+                Group:   "apps",
+                Version: "v1",
+                Kind:    "Deployment",
+            }, schema.GroupVersionKind{
+                Group:   "anothercustom.io",
+                Version: "v2",
+                Kind:    "AnotherCustom",
+            })
+
+            applyTask := &ApplyTask{
+                Objects:    objs,
+                Mapper:     restMapper,
+                InfoHelper: &fakeInfoHelper{},
+                InvInfo:    &fakeInventoryInfo{},
+            }
+
+            getClusterObj = func(d dynamic.Interface, info *resource.Info) (*unstructured.Unstructured, error) {
+                return objs[0], nil
+            }
+            applyTask.Start(taskContext)
+            <-taskContext.TaskChannel()
+
+            // The applied resources should be stored in the TaskContext
+            // for the final inventory.
+            expected := object.UnstructuredsToObjMetas(objs)
+            actual := taskContext.AppliedResources()
+            if !object.SetEquals(expected, actual) {
+                t.Errorf("expected (%s) inventory resources, got (%s)", expected, actual)
+            }
+        })
+    }
+}
+
+// Checks that the applied resources stored in the task context
+// (which become the final inventory) are correct, given a
+// retrieval error and a specific previous inventory. An apply
+// failure for an object in the previous inventory should remain
+// in the final inventory, while an apply failure for an object
+// that is not in the previous inventory (a creation) should not
+// be in the final inventory.
+func TestApplyTask_ApplyFailuresAndInventory(t *testing.T) {
+    resInfo := resourceInfo{
+        group:      "apps",
+        apiVersion: "apps/v1",
+        kind:       "Deployment",
+        name:       "foo",
+        namespace:  "default",
+        uid:        types.UID("my-uid"),
+        generation: int64(42),
+    }
+    resID, _ := object.CreateObjMetadata("default", "foo",
+        schema.GroupKind{Group: "apps", Kind: "Deployment"})
+    applyFailInfo := resourceInfo{
+        group:      "apps",
+        apiVersion: "apps/v1",
+        kind:       "Deployment",
+        name:       "failure",
+        namespace:  "default",
+        uid:        types.UID("my-uid"),
+        generation: int64(42),
+    }
+    applyFailID, _ := object.CreateObjMetadata("default", "failure",
+        schema.GroupKind{Group: "apps", Kind: "Deployment"})
+
+    testCases := map[string]struct {
+        applied       []resourceInfo
+        prevInventory []object.ObjMetadata
+        expected      []object.ObjMetadata
+        err           error
+    }{
+        "not found error with successful apply is in final inventory": {
+            applied:       []resourceInfo{resInfo},
+            prevInventory: []object.ObjMetadata{},
+            expected:      []object.ObjMetadata{resID},
+            err:           apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "pod"}, "fake"),
+        },
+        "unknown error, but in previous inventory: object is in final inventory": {
+            applied:       []resourceInfo{resInfo},
+            prevInventory: []object.ObjMetadata{resID},
+            expected:      []object.ObjMetadata{resID},
+            err:           apierrors.NewUnauthorized("not authorized"),
+        },
+        "unknown error, not in previous inventory: object is NOT in final inventory": {
+            applied:       []resourceInfo{resInfo},
+            prevInventory: []object.ObjMetadata{},
+            expected:      []object.ObjMetadata{},
+            err:           apierrors.NewUnauthorized("not authorized"),
+        },
+        "apply failure, in previous inventory: object is in final inventory": {
+            applied:       []resourceInfo{applyFailInfo},
+            prevInventory: []object.ObjMetadata{applyFailID},
+            expected:      []object.ObjMetadata{applyFailID},
+            err:           nil,
+        },
+        "apply failure, not in previous inventory: object is NOT in final inventory": {
+            applied:       []resourceInfo{applyFailInfo},
+            prevInventory: []object.ObjMetadata{},
+            expected:      []object.ObjMetadata{},
+            err:           nil,
+        },
+    }
+
+    for tn, tc := range testCases {
+        t.Run(tn, func(t *testing.T) {
+            eventChannel := make(chan event.Event)
+            taskContext := taskrunner.NewTaskContext(eventChannel)
+
+            objs := toUnstructureds(tc.applied)
+
+            oldAO := applyOptionsFactoryFunc
+            applyOptionsFactoryFunc = func(chan event.Event, common.ServerSideOptions, common.DryRunStrategy, util.Factory) (applyOptions, dynamic.Interface, error) {
+                return &fakeApplyOptions{}, nil, nil
+            }
+            defer func() { applyOptionsFactoryFunc = oldAO }()
+
+            restMapper := testutil.NewFakeRESTMapper(schema.GroupVersionKind{
+                Group:   "apps",
+                Version: "v1",
+                Kind:    "Deployment",
+            })
+
+            prevInv := map[object.ObjMetadata]bool{}
+            for _, id := range tc.prevInventory {
+                prevInv[id] = true
+            }
+            applyTask := &ApplyTask{
+                Objects:       objs,
+                PrevInventory: prevInv,
+                Mapper:        restMapper,
+                InfoHelper:    &fakeInfoHelper{},
+                InvInfo:       &fakeInventoryInfo{},
+            }
+
+            getClusterObj = func(d dynamic.Interface, info *resource.Info) (*unstructured.Unstructured, error) {
+                return objs[0], nil
+            }
+            if tc.err != nil {
+                getClusterObj = func(d dynamic.Interface, info *resource.Info) (*unstructured.Unstructured, error) {
+                    return nil, tc.err
+                }
+            }
+
+            var events []event.Event
+            var wg sync.WaitGroup
+            wg.Add(1)
+            go func() {
+                defer wg.Done()
+                for msg := range eventChannel {
+                    events = append(events, msg)
+                }
+            }()
+
+            applyTask.Start(taskContext)
+            <-taskContext.TaskChannel()
+            close(eventChannel)
+            wg.Wait()
+
+            // The applied resources should be stored in the TaskContext
+            // for the final inventory.
+            actual := taskContext.AppliedResources()
+            if !object.SetEquals(tc.expected, actual) {
+                t.Errorf("expected (%s) inventory resources, got (%s)", tc.expected, actual)
+            }
+        })
+    }
+}
+
 func TestApplyTask_FetchGeneration(t *testing.T) {
     testCases := map[string]struct {
         rss []resourceInfo
diff --git a/pkg/apply/taskrunner/context.go b/pkg/apply/taskrunner/context.go
index c18e843..fc582df 100644
--- a/pkg/apply/taskrunner/context.go
+++ b/pkg/apply/taskrunner/context.go
@@ -4,6 +4,8 @@
 package taskrunner
 
 import (
+    "strings"
+
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/sets"
     "sigs.k8s.io/cli-utils/pkg/apply/event"
@@ -60,12 +62,25 @@ func (tc *TaskContext) ResourceUID(id object.ObjMetadata) (types.UID, bool) {
     return ai.uid, true
 }
 
+// AppliedResources returns all the objects (as ObjMetadata) that
+// were added as applied resources to the TaskContext.
+func (tc *TaskContext) AppliedResources() []object.ObjMetadata {
+    all := make([]object.ObjMetadata, 0, len(tc.appliedResources))
+    for r := range tc.appliedResources {
+        all = append(all, r)
+    }
+    return all
+}
+
 // AllResourceUIDs returns a set with the UIDs of all the resources in the
 // context.
 func (tc *TaskContext) AllResourceUIDs() sets.String {
     uids := sets.NewString()
     for _, ai := range tc.appliedResources {
-        uids.Insert(string(ai.uid))
+        uid := strings.TrimSpace(string(ai.uid))
+        if uid != "" {
+            uids.Insert(uid)
+        }
     }
     return uids
 }
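Reviewer note: taken together, the prune.go, apply_task.go, and context.go changes make the final inventory the union of what the TaskContext recorded as applied and any prune failures. A stand-alone Go sketch of that composition (illustrative only, not part of the patch; ref is a stand-in for object.ObjMetadata and the example names are made up):

    package main

    import "fmt"

    // ref is a stand-in for cli-utils' object.ObjMetadata.
    type ref struct{ Namespace, Name string }

    func main() {
        // Objects the TaskContext recorded as applied (see TaskContext.AppliedResources above).
        applied := []ref{
            {Namespace: "default", Name: "deploy-a"},
            {Namespace: "default", Name: "deploy-b"},
        }
        // Objects that failed to prune and therefore must remain tracked.
        pruneFailures := []ref{
            {Namespace: "default", Name: "old-config"},
        }
        // PruneOptions.Prune now builds the final inventory as the union of the
        // two sets and hands it to the inventory client's Replace call.
        finalInventory := append(applied, pruneFailures...)
        fmt.Println("final inventory:", finalInventory)
    }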