Merge pull request #322 from seans3/final-inventory-update

Better final inventory calculation during errors
Kubernetes Prow Robot 2021-02-05 10:54:52 -08:00 committed by GitHub
commit 7992542715
8 changed files with 333 additions and 57 deletions
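In brief, this change stops recomputing the final inventory from object UIDs during pruning: the final inventory is now the applied objects recorded in the TaskContext plus any prune failures, and an object whose apply fails is kept only when it already appears in the previous inventory (i.e. it already exists in the cluster). A minimal Go sketch of that rule, assuming the object.ObjMetadata type from sigs.k8s.io/cli-utils; the helper names keepInInventory and finalInventory are illustrative and not part of the change:

package sketch

import "sigs.k8s.io/cli-utils/pkg/object"

// keepInInventory illustrates the apply-side rule: on a retrieval or apply
// error, an object stays in the final inventory only if it already existed
// in the cluster, i.e. it appears in the previous inventory fetched before
// the inventory client merge.
func keepInInventory(id object.ObjMetadata, prevInventory map[object.ObjMetadata]bool, applyErr error) bool {
	if applyErr == nil {
		return true // successfully applied objects are always kept
	}
	return prevInventory[id] // failed objects are kept only if they pre-existed
}

// finalInventory mirrors the new prune-side calculation: the applied objects
// recorded in the TaskContext plus any prune failures.
func finalInventory(applied, pruneFailures []object.ObjMetadata) []object.ObjMetadata {
	return append(applied, pruneFailures...)
}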

View File

@@ -104,6 +104,12 @@ func (a *Applier) prepareObjects(localInv inventory.InventoryInfo, localObjs []*
return nil, err
}
}
// Retrieve previous inventory objects. Must happen before inventory client merge.
prevInv, err := a.invClient.GetClusterObjs(localInv)
if err != nil {
return nil, err
}
klog.V(4).Infof("%d previous inventory objects in cluster", len(prevInv))
klog.V(4).Infof("applier merging %d objects into inventory", len(localObjs))
currentObjs := object.UnstructuredsToObjMetas(localObjs)
@@ -122,6 +128,7 @@ func (a *Applier) prepareObjects(localInv inventory.InventoryInfo, localObjs []*
LocalInv: localInv,
Resources: localObjs,
PruneIds: pruneIds,
PrevInv: prevInv,
}, nil
}
@@ -132,6 +139,7 @@ type ResourceObjects struct {
LocalInv inventory.InventoryInfo
Resources []*unstructured.Unstructured
PruneIds []object.ObjMetadata
PrevInv []object.ObjMetadata
}
// ObjsForApply returns the unstructured representation for all the resources
@@ -162,6 +170,13 @@ func (r *ResourceObjects) IdsForPrune() []object.ObjMetadata {
return r.PruneIds
}
// IdsForPrevInv returns the Ids for the previous inventory. These
// Ids reference the objects managed by the inventory object which
// are already in the cluster.
func (r *ResourceObjects) IdsForPrevInv() []object.ObjMetadata {
return r.PrevInv
}
// AllIds returns the Ids for all resources that are relevant. This
// includes resources that will be applied or pruned.
func (r *ResourceObjects) AllIds() []object.ObjMetadata {

View File

@@ -194,29 +194,9 @@ func (po *PruneOptions) Prune(localInv inventory.InventoryInfo,
}
taskContext.EventChannel() <- createPruneEvent(pruneObj, obj, event.Pruned)
}
// Calculate final inventory items, ensuring only successfully applied
// objects are in the inventory along with prune failures.
finalInventory := []object.ObjMetadata{}
for _, localObj := range localIds {
obj, err := po.getObject(localObj)
if err != nil {
if klog.V(4) {
klog.Errorf("error retrieving object for inventory determination: %s", err)
}
continue
}
uid := string(obj.GetUID())
if currentUIDs.Has(uid) {
klog.V(5).Infof("adding final inventory object %s/%s", localObj.Namespace, localObj.Name)
finalInventory = append(finalInventory, localObj)
} else {
klog.V(5).Infof("uid not found (%s); not adding final inventory obj %s/%s",
uid, localObj.Namespace, localObj.Name)
}
}
klog.V(4).Infof("final inventory %d successfully applied objects", len(finalInventory))
finalInventory = append(finalInventory, pruneFailures...)
klog.V(4).Infof("final inventory %d objects after appending prune failures", len(finalInventory))
// Final inventory equals applied objects and prune failures.
appliedResources := taskContext.AppliedResources()
finalInventory := append(appliedResources, pruneFailures...)
return po.InvClient.Replace(localInv, finalInventory)
}

View File

@@ -258,6 +258,11 @@ func TestPrune(t *testing.T) {
// the events that can be put on it.
eventChannel := make(chan event.Event, len(tc.pastObjs)+1) // Add one for inventory object
taskContext := taskrunner.NewTaskContext(eventChannel)
for _, u := range tc.currentObjs {
o := object.UnstructuredToObjMeta(u)
uid := u.GetUID()
taskContext.ResourceApplied(o, uid, 0)
}
err := func() error {
defer close(eventChannel)
// Run the prune and validate.

View File

@@ -56,6 +56,7 @@ type resourceObjects interface {
Inventory() inventory.InventoryInfo
IdsForApply() []object.ObjMetadata
IdsForPrune() []object.ObjMetadata
IdsForPrevInv() []object.ObjMetadata
}
// BuildTaskQueue takes a set of resources in the form of info objects
@@ -65,12 +66,19 @@ func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects,
o Options) chan taskrunner.Task {
var tasks []taskrunner.Task
remainingInfos := ro.ObjsForApply()
// Convert slice of previous inventory objects into a map.
prevInvSlice := ro.IdsForPrevInv()
prevInventory := make(map[object.ObjMetadata]bool, len(prevInvSlice))
for _, prevInvObj := range prevInvSlice {
prevInventory[prevInvObj] = true
}
crdSplitRes, hasCRDs := splitAfterCRDs(remainingInfos)
if hasCRDs {
tasks = append(tasks, &task.ApplyTask{
Objects: append(crdSplitRes.before, crdSplitRes.crds...),
CRDs: crdSplitRes.crds,
PrevInventory: prevInventory,
ServerSideOptions: o.ServerSideOptions,
DryRunStrategy: o.DryRunStrategy,
InfoHelper: t.InfoHelper,
@@ -96,6 +104,7 @@ func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects,
&task.ApplyTask{
Objects: remainingInfos,
CRDs: crdSplitRes.crds,
PrevInventory: prevInventory,
ServerSideOptions: o.ServerSideOptions,
DryRunStrategy: o.DryRunStrategy,
InfoHelper: t.InfoHelper,

View File

@@ -309,10 +309,11 @@ func getType(task taskrunner.Task) reflect.Type {
}
type fakeResourceObjects struct {
objsForApply []*unstructured.Unstructured
inventory inventory.InventoryInfo
idsForApply []object.ObjMetadata
idsForPrune []object.ObjMetadata
objsForApply []*unstructured.Unstructured
inventory inventory.InventoryInfo
idsForApply []object.ObjMetadata
idsForPrune []object.ObjMetadata
idsForPrevInv []object.ObjMetadata
}
func (f *fakeResourceObjects) ObjsForApply() []*unstructured.Unstructured {
@@ -331,6 +332,10 @@ func (f *fakeResourceObjects) IdsForPrune() []object.ObjMetadata {
return f.idsForPrune
}
func (f *fakeResourceObjects) IdsForPrevInv() []object.ObjMetadata {
return f.idsForPrevInv
}
func ignoreErrInfoToObjMeta(info *unstructured.Unstructured) object.ObjMetadata {
objMeta := object.UnstructuredToObjMeta(info)
return objMeta

View File

@@ -17,7 +17,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/klog"
"k8s.io/kubectl/pkg/cmd/apply"
"k8s.io/kubectl/pkg/cmd/delete"
cmddelete "k8s.io/kubectl/pkg/cmd/delete"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/util/slice"
applyerror "sigs.k8s.io/cli-utils/pkg/apply/error"
@@ -45,11 +45,13 @@ type applyOptions interface {
// ApplyTask applies the given Objects to the cluster
// by using the ApplyOptions.
type ApplyTask struct {
Factory util.Factory
InfoHelper info.InfoHelper
Mapper meta.RESTMapper
Objects []*unstructured.Unstructured
CRDs []*unstructured.Unstructured
Factory util.Factory
InfoHelper info.InfoHelper
Mapper meta.RESTMapper
Objects []*unstructured.Unstructured
CRDs []*unstructured.Unstructured
// Used for determining inventory during errors
PrevInventory map[object.ObjMetadata]bool
DryRunStrategy common.DryRunStrategy
ServerSideOptions common.ServerSideOptions
InventoryPolicy inventory.InventoryPolicy
@@ -80,6 +82,7 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
// we have a CRD and a CR in the same resource set, but the CRD
// will not actually have been applied when we reach the CR.
if a.DryRunStrategy.ClientOrServerDryRun() {
klog.V(4).Infof("dry-run filtering custom resources...")
// Find all resources in the set that don't exist in the
// RESTMapper, but where we do have the CRD for the type in
// the resource set.
@@ -97,6 +100,7 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
taskContext.EventChannel() <- createApplyEvent(object.UnstructuredToObjMeta(obj), event.Created, nil)
}
// Update the resource set to no longer include the CRs.
klog.V(4).Infof("after dry-run filtering custom resources, %d objects left", len(objs))
objects = objs
}
@@ -123,7 +127,8 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
}
klog.V(4).Infof("attempting to apply %d remaining objects", len(objects))
var infos []*resource.Info
// invInfos stores the objects which belong in the final inventory.
invInfos := make(map[object.ObjMetadata]*resource.Info, len(objects))
for _, obj := range objects {
// Set the client and mapping fields on the provided
// info so they can be applied to the cluster.
@@ -144,18 +149,25 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
if err != nil {
if !apierrors.IsNotFound(err) {
if klog.V(4) {
klog.Errorf("error retrieving %s/%s from cluster--continue",
info.Namespace, info.Name)
klog.Errorf("error (%s) retrieving %s/%s from cluster--continue",
err, info.Namespace, info.Name)
}
taskContext.EventChannel() <- createApplyEvent(
id,
event.Unchanged,
err)
op := event.Failed
if a.objInCluster(id) {
// Object in cluster stays in the inventory.
klog.V(4).Infof("%s/%s apply retrieval failure, but in cluster--keep in inventory",
info.Namespace, info.Name)
invInfos[id] = info
op = event.Unchanged
}
taskContext.EventChannel() <- createApplyEvent(id, op, err)
taskContext.CaptureResourceFailure(id)
continue
}
}
infos = append(infos, info)
// At this point the object was either 1) successfully retrieved from the cluster, or 2) the
// retrieval returned a "Not Found" error (meaning first-time creation). Either way, add it to the final inventory.
invInfos[id] = info
canApply, err := inventory.CanApply(a.InvInfo, clusterObj, a.InventoryPolicy)
if !canApply {
klog.V(5).Infof("can not apply %s/%s--continue",
@@ -176,30 +188,30 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
if klog.V(4) {
klog.Errorf("error applying (%s/%s) %s", info.Namespace, info.Name, err)
}
// If apply failed and the object is not in the cluster, remove
// it from the final inventory.
if !a.objInCluster(id) {
klog.V(5).Infof("not in cluster; removing apply fail object %s/%s from inventory",
info.Namespace, info.Name)
delete(invInfos, id)
}
taskContext.EventChannel() <- createApplyEvent(
id, event.Failed, applyerror.NewApplyRunError(err))
taskContext.CaptureResourceFailure(id)
}
}
// Fetch the Generation from all Infos after they have been
// applied.
for _, inf := range infos {
id, err := object.InfoToObjMeta(inf)
if err != nil {
continue
}
if inf.Object != nil {
acc, err := meta.Accessor(inf.Object)
// Store objects (and some obj metadata) in the task context
// for the final inventory.
for id, info := range invInfos {
if info.Object != nil {
acc, err := meta.Accessor(info.Object)
if err != nil {
continue
}
// Only add a resource if it successfully applied.
uid := acc.GetUID()
if string(uid) != "" {
gen := acc.GetGeneration()
taskContext.ResourceApplied(id, uid, gen)
}
gen := acc.GetGeneration()
taskContext.ResourceApplied(id, uid, gen)
}
}
a.sendTaskResult(taskContext)
@@ -232,7 +244,7 @@ func newApplyOptions(eventChannel chan event.Event, serverSideOptions common.Ser
},
// FilenameOptions are not needed since we don't use the ApplyOptions
// to read manifests.
DeleteOptions: &delete.DeleteOptions{},
DeleteOptions: &cmddelete.DeleteOptions{},
PrintFlags: &genericclioptions.PrintFlags{
OutputFormat: &emptyString,
},
@@ -292,6 +304,16 @@ func (a *ApplyTask) filterCRsWithCRDInSet(objects []*unstructured.Unstructured)
return objs, objsWithCRD, nil
}
// objInCluster returns true if the passed object is in the
// previous inventory, since an object in the previous inventory
// already exists in the cluster.
func (a *ApplyTask) objInCluster(obj object.ObjMetadata) bool {
if _, found := a.PrevInventory[obj]; found {
return true
}
return false
}
type crdsInfo struct {
crds []crdInfo
}

View File

@@ -10,6 +10,7 @@ import (
"testing"
"gotest.tools/assert"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@@ -34,6 +35,230 @@ type resourceInfo struct {
generation int64
}
// Tests that the correct "applied" objects are sent to the
// TaskContext, since these are the applied objects added to
// the final inventory.
func TestApplyTask_BasicAppliedObjects(t *testing.T) {
testCases := map[string]struct {
applied []resourceInfo
}{
"apply single namespaced resource": {
applied: []resourceInfo{
{
group: "apps",
apiVersion: "apps/v1",
kind: "Deployment",
name: "foo",
namespace: "default",
uid: types.UID("my-uid"),
generation: int64(42),
},
},
},
"apply multiple clusterscoped resources": {
applied: []resourceInfo{
{
group: "custom.io",
apiVersion: "custom.io/v1beta1",
kind: "Custom",
name: "bar",
uid: types.UID("uid-1"),
generation: int64(32),
},
{
group: "custom2.io",
apiVersion: "custom2.io/v1",
kind: "Custom2",
name: "foo",
uid: types.UID("uid-2"),
generation: int64(1),
},
},
},
}
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
eventChannel := make(chan event.Event)
defer close(eventChannel)
taskContext := taskrunner.NewTaskContext(eventChannel)
objs := toUnstructureds(tc.applied)
oldAO := applyOptionsFactoryFunc
applyOptionsFactoryFunc = func(chan event.Event, common.ServerSideOptions, common.DryRunStrategy, util.Factory) (applyOptions, dynamic.Interface, error) {
return &fakeApplyOptions{}, nil, nil
}
defer func() { applyOptionsFactoryFunc = oldAO }()
restMapper := testutil.NewFakeRESTMapper(schema.GroupVersionKind{
Group: "apps",
Version: "v1",
Kind: "Deployment",
}, schema.GroupVersionKind{
Group: "anothercustom.io",
Version: "v2",
Kind: "AnotherCustom",
})
applyTask := &ApplyTask{
Objects: objs,
Mapper: restMapper,
InfoHelper: &fakeInfoHelper{},
InvInfo: &fakeInventoryInfo{},
}
getClusterObj = func(d dynamic.Interface, info *resource.Info) (*unstructured.Unstructured, error) {
return objs[0], nil
}
applyTask.Start(taskContext)
<-taskContext.TaskChannel()
// The applied resources should be stored in the TaskContext
// for the final inventory.
expected := object.UnstructuredsToObjMetas(objs)
actual := taskContext.AppliedResources()
if !object.SetEquals(expected, actual) {
t.Errorf("expected (%s) inventory resources, got (%s)", expected, actual)
}
})
}
}
// Checks that the applied resources stored in the task context
// (the final inventory) are correct, given a retrieval error and
// a specific previous inventory. An object whose apply fails but
// which is in the previous inventory should remain in the final
// inventory, while an object whose apply fails and which is not
// in the previous inventory (a creation) should not.
func TestApplyTask_ApplyFailuresAndInventory(t *testing.T) {
resInfo := resourceInfo{
group: "apps",
apiVersion: "apps/v1",
kind: "Deployment",
name: "foo",
namespace: "default",
uid: types.UID("my-uid"),
generation: int64(42),
}
resID, _ := object.CreateObjMetadata("default", "foo",
schema.GroupKind{Group: "apps", Kind: "Deployment"})
applyFailInfo := resourceInfo{
group: "apps",
apiVersion: "apps/v1",
kind: "Deployment",
name: "failure",
namespace: "default",
uid: types.UID("my-uid"),
generation: int64(42),
}
applyFailID, _ := object.CreateObjMetadata("default", "failure",
schema.GroupKind{Group: "apps", Kind: "Deployment"})
testCases := map[string]struct {
applied []resourceInfo
prevInventory []object.ObjMetadata
expected []object.ObjMetadata
err error
}{
"not found error with successful apply is in final inventory": {
applied: []resourceInfo{resInfo},
prevInventory: []object.ObjMetadata{},
expected: []object.ObjMetadata{resID},
err: apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "pod"}, "fake"),
},
"unknown error, but in previous inventory: object is in final inventory": {
applied: []resourceInfo{resInfo},
prevInventory: []object.ObjMetadata{resID},
expected: []object.ObjMetadata{resID},
err: apierrors.NewUnauthorized("not authorized"),
},
"unknown error, not in previous inventory: object is NOT in final inventory": {
applied: []resourceInfo{resInfo},
prevInventory: []object.ObjMetadata{},
expected: []object.ObjMetadata{},
err: apierrors.NewUnauthorized("not authorized"),
},
"apply failure, in previous inventory: object is in final inventory": {
applied: []resourceInfo{applyFailInfo},
prevInventory: []object.ObjMetadata{applyFailID},
expected: []object.ObjMetadata{applyFailID},
err: nil,
},
"apply failure, not in previous inventory: object is NOT in final inventory": {
applied: []resourceInfo{applyFailInfo},
prevInventory: []object.ObjMetadata{},
expected: []object.ObjMetadata{},
err: nil,
},
}
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
eventChannel := make(chan event.Event)
taskContext := taskrunner.NewTaskContext(eventChannel)
objs := toUnstructureds(tc.applied)
oldAO := applyOptionsFactoryFunc
applyOptionsFactoryFunc = func(chan event.Event, common.ServerSideOptions, common.DryRunStrategy, util.Factory) (applyOptions, dynamic.Interface, error) {
return &fakeApplyOptions{}, nil, nil
}
defer func() { applyOptionsFactoryFunc = oldAO }()
restMapper := testutil.NewFakeRESTMapper(schema.GroupVersionKind{
Group: "apps",
Version: "v1",
Kind: "Deployment",
})
prevInv := map[object.ObjMetadata]bool{}
for _, id := range tc.prevInventory {
prevInv[id] = true
}
applyTask := &ApplyTask{
Objects: objs,
PrevInventory: prevInv,
Mapper: restMapper,
InfoHelper: &fakeInfoHelper{},
InvInfo: &fakeInventoryInfo{},
}
getClusterObj = func(d dynamic.Interface, info *resource.Info) (*unstructured.Unstructured, error) {
return objs[0], nil
}
if tc.err != nil {
getClusterObj = func(d dynamic.Interface, info *resource.Info) (*unstructured.Unstructured, error) {
return nil, tc.err
}
}
var events []event.Event
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for msg := range eventChannel {
events = append(events, msg)
}
}()
applyTask.Start(taskContext)
<-taskContext.TaskChannel()
close(eventChannel)
wg.Wait()
// The applied resources should be stored in the TaskContext
// for the final inventory.
actual := taskContext.AppliedResources()
if !object.SetEquals(tc.expected, actual) {
t.Errorf("expected (%s) inventory resources, got (%s)", tc.expected, actual)
}
})
}
}
func TestApplyTask_FetchGeneration(t *testing.T) {
testCases := map[string]struct {
rss []resourceInfo

View File

@@ -4,6 +4,8 @@
package taskrunner
import (
"strings"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/cli-utils/pkg/apply/event"
@@ -60,12 +62,25 @@ func (tc *TaskContext) ResourceUID(id object.ObjMetadata) (types.UID, bool) {
return ai.uid, true
}
// AppliedResources returns all the objects (as ObjMetadata) that
// were added as applied resources to the TaskContext.
func (tc *TaskContext) AppliedResources() []object.ObjMetadata {
all := make([]object.ObjMetadata, 0, len(tc.appliedResources))
for r := range tc.appliedResources {
all = append(all, r)
}
return all
}
// AllResourceUIDs returns a set with the UIDs of all the resources in the
// context.
func (tc *TaskContext) AllResourceUIDs() sets.String {
uids := sets.NewString()
for _, ai := range tc.appliedResources {
uids.Insert(string(ai.uid))
uid := strings.TrimSpace(string(ai.uid))
if uid != "" {
uids.Insert(uid)
}
}
return uids
}
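
As a usage note, callers that record applies through the TaskContext can now read the applied set back directly when building the final inventory. A small sketch, assuming the taskrunner, event, and object packages shown in the diffs above; the appliedSet helper is illustrative only, not part of the library:

package sketch

import (
	"k8s.io/apimachinery/pkg/types"

	"sigs.k8s.io/cli-utils/pkg/apply/event"
	"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
	"sigs.k8s.io/cli-utils/pkg/object"
)

// appliedSet records a successful apply for each id and returns the set that
// would feed the final inventory, mirroring how the prune code and tests use
// TaskContext.AppliedResources.
func appliedSet(ids []object.ObjMetadata) []object.ObjMetadata {
	eventChannel := make(chan event.Event, len(ids))
	taskContext := taskrunner.NewTaskContext(eventChannel)
	for _, id := range ids {
		// The UID and generation here are placeholder values.
		taskContext.ResourceApplied(id, types.UID("uid-"+id.Name), 1)
	}
	return taskContext.AppliedResources()
}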