Merge pull request #358 from seans3/inv-task-refactor

Move inventory operations into tasks
This commit is contained in:
Kubernetes Prow Robot 2021-05-26 22:36:23 -07:00 committed by GitHub
commit d6968048dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 532 additions and 187 deletions

2
go.sum
View File

@ -869,8 +869,6 @@ sigs.k8s.io/controller-runtime v0.6.0 h1:Fzna3DY7c4BIP6KwfSlrfnj20DJ+SeMBK8HSFvO
sigs.k8s.io/controller-runtime v0.6.0/go.mod h1:CpYf5pdNY/B352A1TFLAS2JVSlnGQ5O2cftPHndTroo=
sigs.k8s.io/kustomize v2.0.3+incompatible h1:JUufWFNlI44MdtnjUqVnvh29rR37PQFzPbLXqhyOyX0=
sigs.k8s.io/kustomize v2.0.3+incompatible/go.mod h1:MkjgH3RdOWrievjo6c9T245dYlB5QeXV4WCbnt/PEpU=
sigs.k8s.io/kustomize/kyaml v0.10.14 h1:8zftx3bT0r2355kE/cZfwCMq9SlTWMBvadwwtl+jcbU=
sigs.k8s.io/kustomize/kyaml v0.10.14/go.mod h1:mlQFagmkm1P+W4lZJbJ/yaxMd8PqMRSC4cPcfUVt5Hg=
sigs.k8s.io/kustomize/kyaml v0.10.16 h1:4rn0PTEK4STOA5kbpz72oGskjpKYlmwru4YRgVQFv+c=
sigs.k8s.io/kustomize/kyaml v0.10.16/go.mod h1:mlQFagmkm1P+W4lZJbJ/yaxMd8PqMRSC4cPcfUVt5Hg=
sigs.k8s.io/structured-merge-diff/v3 v3.0.0-20200116222232-67a7b8c61874/go.mod h1:PlARxl6Hbt/+BC80dRLi1qAmnMqwqDg62YvvVkZjemw=

View File

@ -10,7 +10,6 @@ import (
"time"
"github.com/go-errors/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog"
@ -84,11 +83,7 @@ func (a *Applier) Initialize() error {
return nil
}
// prepareObjects merges the currently applied objects into the
// set of stored objects in the cluster inventory. In the process, it
// calculates the set of objects to be pruned (pruneIds), and orders the
// resources for the subsequent apply. Returns the sorted resources to
// apply as well as the objects for the prune, or an error if one occurred.
// prepareObjects returns ResourceObjects or an error if one occurred.
func (a *Applier) prepareObjects(localInv inventory.InventoryInfo, localObjs []*unstructured.Unstructured) (*ResourceObjects, error) {
klog.V(4).Infof("applier preparing %d objects", len(localObjs))
if localInv == nil {
@ -120,30 +115,15 @@ func (a *Applier) prepareObjects(localInv inventory.InventoryInfo, localObjs []*
}
}
// Ensures the namespace exists before applying the inventory object into it.
if invNamespace := inventoryNamespaceInSet(localInv, localObjs); invNamespace != nil {
klog.V(4).Infof("applier prepareObjects applying namespace %s", invNamespace.GetName())
if err := a.invClient.ApplyInventoryNamespace(invNamespace); err != nil {
return nil, err
}
}
// Retrieve previous inventory objects. Must happen before inventory client merge.
// Retrieve previous inventory objects to calculate prune ids.
prevInv, err := a.invClient.GetClusterObjs(localInv)
if err != nil {
return nil, err
}
klog.V(4).Infof("%d previous inventory objects in cluster", len(prevInv))
locals := object.UnstructuredsToObjMetas(localObjs)
pruneIds := object.SetDiff(prevInv, locals)
klog.V(4).Infof("applier calculated %d prune objects", len(pruneIds))
klog.V(4).Infof("applier merging %d objects into inventory", len(localObjs))
currentObjs := object.UnstructuredsToObjMetas(localObjs)
// returns the objects (pruneIds) to prune after apply. The prune
// algorithm requires stopping if the merge is not successful. Otherwise,
// the stored objects in inventory could become inconsistent.
pruneIds, err := a.invClient.Merge(localInv, currentObjs)
if err != nil {
return nil, err
}
klog.V(4).Infof("after inventory merge; %d objects to prune", len(pruneIds))
// Sort order for applied resources.
sort.Sort(ordering.SortableUnstructureds(localObjs))
@ -244,6 +224,7 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
Factory: a.provider.Factory(),
InfoHelper: a.infoHelper,
Mapper: mapper,
InvClient: a.invClient,
}).BuildTaskQueue(resourceObjects, solver.Options{
ServerSideOptions: options.ServerSideOptions,
ReconcileTimeout: options.ReconcileTimeout,
@ -337,23 +318,3 @@ func handleError(eventChannel chan event.Event, err error) {
},
}
}
// inventoryNamespaceInSet returns the the namespace the passed inventory
// object will be applied to, or nil if this namespace object does not exist
// in the passed slice "infos" or the inventory object is cluster-scoped.
func inventoryNamespaceInSet(inv inventory.InventoryInfo, objs []*unstructured.Unstructured) *unstructured.Unstructured {
if inv == nil {
return nil
}
invNamespace := inv.Namespace()
for _, obj := range objs {
acc, _ := meta.Accessor(obj)
gvk := obj.GetObjectKind().GroupVersionKind()
if gvk == object.CoreV1Namespace && acc.GetName() == invNamespace {
inventory.AddInventoryIDAnnotation(obj, inv)
return obj
}
}
return nil
}

View File

@ -689,17 +689,6 @@ var obj2 = &unstructured.Unstructured{
},
}
var obj3 = &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": map[string]interface{}{
"name": "obj3",
"namespace": "different-namespace",
},
},
}
var clusterScopedObj = &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "rbac.authorization.k8s.io/v1",
@ -710,63 +699,6 @@ var clusterScopedObj = &unstructured.Unstructured{
},
}
func createNamespace(ns string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "Namespace",
"metadata": map[string]interface{}{
"name": ns,
},
},
}
}
func TestInventoryNamespaceInSet(t *testing.T) {
inventoryNamespace := createNamespace(namespace)
tests := map[string]struct {
inv inventory.InventoryInfo
objects []*unstructured.Unstructured
namespace *unstructured.Unstructured
}{
"Nil inventory object, no resources returns nil namespace": {
inv: nil,
objects: []*unstructured.Unstructured{},
namespace: nil,
},
"Inventory object, but no resources returns nil namespace": {
inv: localInv,
objects: []*unstructured.Unstructured{},
namespace: nil,
},
"Inventory object, resources with no namespace returns nil namespace": {
inv: localInv,
objects: []*unstructured.Unstructured{obj1, obj2},
namespace: nil,
},
"Inventory object, different namespace returns nil namespace": {
inv: localInv,
objects: []*unstructured.Unstructured{createNamespace("foo")},
namespace: nil,
},
"Inventory object, inventory namespace returns inventory namespace": {
inv: localInv,
objects: []*unstructured.Unstructured{obj1, inventoryNamespace, obj3},
namespace: inventoryNamespace,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
actualNamespace := inventoryNamespaceInSet(tc.inv, tc.objects)
if tc.namespace != actualNamespace {
t.Fatalf("expected namespace (%v), got (%v)", tc.namespace, actualNamespace)
}
})
}
}
func TestReadAndPrepareObjects(t *testing.T) {
testCases := map[string]struct {
// local inventory input into applier.prepareObjects

View File

@ -1,3 +1,6 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=ActionGroupEventType"; DO NOT EDIT.
package event

View File

@ -1,3 +1,6 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=ApplyEventOperation"; DO NOT EDIT.
package event

View File

@ -1,3 +1,6 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=DeleteEventOperation"; DO NOT EDIT.
package event

View File

@ -72,6 +72,7 @@ const (
PruneAction
DeleteAction
WaitAction
InventoryAction
)
type ActionGroup struct {

View File

@ -1,3 +1,6 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=PruneEventOperation"; DO NOT EDIT.
package event

View File

@ -1,3 +1,6 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=ResourceAction"; DO NOT EDIT.
package event
@ -12,11 +15,12 @@ func _() {
_ = x[PruneAction-1]
_ = x[DeleteAction-2]
_ = x[WaitAction-3]
_ = x[InventoryAction-4]
}
const _ResourceAction_name = "ApplyActionPruneActionDeleteActionWaitAction"
const _ResourceAction_name = "ApplyActionPruneActionDeleteActionWaitActionInventoryAction"
var _ResourceAction_index = [...]uint8{0, 11, 22, 34, 44}
var _ResourceAction_index = [...]uint8{0, 11, 22, 34, 44, 59}
func (i ResourceAction) String() string {
if i < 0 || i >= ResourceAction(len(_ResourceAction_index)-1) {

View File

@ -1,3 +1,6 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=Type"; DO NOT EDIT.
package event

View File

@ -123,8 +123,6 @@ func (po *PruneOptions) Prune(localInv inventory.InventoryInfo,
// Sort the resources in reverse order using the same rules as is
// used for apply.
sort.Sort(sort.Reverse(ordering.SortableMetas(pruneObjs)))
// Store prune failures to ensure they remain in the inventory.
pruneFailures := []object.ObjMetadata{}
for _, pruneObj := range pruneObjs {
klog.V(5).Infof("attempting prune: %s", pruneObj)
obj, err := po.getObject(pruneObj)
@ -139,9 +137,8 @@ func (po *PruneOptions) Prune(localInv inventory.InventoryInfo,
klog.Errorf("prune obj (%s/%s) UID retrival error: %s",
pruneObj.Namespace, pruneObj.Name, err)
}
pruneFailures = append(pruneFailures, pruneObj)
taskContext.EventChannel() <- createPruneFailedEvent(pruneObj, err)
taskContext.CaptureResourceFailure(pruneObj)
taskContext.CapturePruneFailure(pruneObj)
continue
}
// Do not prune objects that are in set of currently applied objects.
@ -154,7 +151,7 @@ func (po *PruneOptions) Prune(localInv inventory.InventoryInfo,
if !canPrune(localInv, obj, o.InventoryPolicy, uid) {
klog.V(4).Infof("skip prune for lifecycle directive %s/%s", pruneObj.Namespace, pruneObj.Name)
taskContext.EventChannel() <- createPruneEvent(pruneObj, obj, event.PruneSkipped)
pruneFailures = append(pruneFailures, pruneObj)
taskContext.CapturePruneFailure(pruneObj)
continue
}
// If regular pruning (not destroying), skip deleting namespace containing
@ -164,8 +161,7 @@ func (po *PruneOptions) Prune(localInv inventory.InventoryInfo,
localNamespaces.Has(pruneObj.Name) {
klog.V(4).Infof("skip pruning namespace: %s", pruneObj.Name)
taskContext.EventChannel() <- createPruneEvent(pruneObj, obj, event.PruneSkipped)
pruneFailures = append(pruneFailures, pruneObj)
taskContext.CaptureResourceFailure(pruneObj)
taskContext.CapturePruneFailure(pruneObj)
continue
}
}
@ -177,8 +173,7 @@ func (po *PruneOptions) Prune(localInv inventory.InventoryInfo,
klog.Errorf("prune failed for %s/%s (%s)", pruneObj.Namespace, pruneObj.Name, err)
}
taskContext.EventChannel() <- createPruneFailedEvent(pruneObj, err)
pruneFailures = append(pruneFailures, pruneObj)
taskContext.CaptureResourceFailure(pruneObj)
taskContext.CapturePruneFailure(pruneObj)
continue
}
err = namespacedClient.Delete(context.TODO(), pruneObj.Name, metav1.DeleteOptions{})
@ -187,17 +182,13 @@ func (po *PruneOptions) Prune(localInv inventory.InventoryInfo,
klog.Errorf("prune failed for %s/%s (%s)", pruneObj.Namespace, pruneObj.Name, err)
}
taskContext.EventChannel() <- createPruneFailedEvent(pruneObj, err)
pruneFailures = append(pruneFailures, pruneObj)
taskContext.CaptureResourceFailure(pruneObj)
taskContext.CapturePruneFailure(pruneObj)
continue
}
}
taskContext.EventChannel() <- createPruneEvent(pruneObj, obj, event.Pruned)
}
// Final inventory equals applied objects and prune failures.
appliedResources := taskContext.AppliedResources()
finalInventory := append(appliedResources, pruneFailures...)
return po.InvClient.Replace(localInv, finalInventory)
return nil
}
func (po *PruneOptions) namespacedClient(obj object.ObjMetadata) (dynamic.ResourceInterface, error) {

View File

@ -188,69 +188,58 @@ func TestPrune(t *testing.T) {
tests := map[string]struct {
// pastObjs/currentObjs do NOT contain the inventory object.
// Inventory object is generated from these past/current objects.
pastObjs []*unstructured.Unstructured
currentObjs []*unstructured.Unstructured
prunedObjs []*unstructured.Unstructured
// finalClusterObjs are the objects in cluster at the end of prune,
// and the objects which should be stored in the inventory object.
finalClusterObjs []*unstructured.Unstructured
pruneEventObjs []*unstructured.Unstructured
pastObjs []*unstructured.Unstructured
currentObjs []*unstructured.Unstructured
prunedObjs []*unstructured.Unstructured
pruneEventObjs []*unstructured.Unstructured
}{
"Past and current objects are empty; no pruned objects": {
pastObjs: []*unstructured.Unstructured{},
currentObjs: []*unstructured.Unstructured{},
prunedObjs: []*unstructured.Unstructured{},
finalClusterObjs: []*unstructured.Unstructured{},
pruneEventObjs: []*unstructured.Unstructured{},
pastObjs: []*unstructured.Unstructured{},
currentObjs: []*unstructured.Unstructured{},
prunedObjs: []*unstructured.Unstructured{},
pruneEventObjs: []*unstructured.Unstructured{},
},
"Past and current objects are the same; no pruned objects": {
pastObjs: []*unstructured.Unstructured{namespace, pdb},
currentObjs: []*unstructured.Unstructured{pdb, namespace},
prunedObjs: []*unstructured.Unstructured{},
finalClusterObjs: []*unstructured.Unstructured{namespace, pdb},
pruneEventObjs: []*unstructured.Unstructured{},
pastObjs: []*unstructured.Unstructured{namespace, pdb},
currentObjs: []*unstructured.Unstructured{pdb, namespace},
prunedObjs: []*unstructured.Unstructured{},
pruneEventObjs: []*unstructured.Unstructured{},
},
"No past objects; no pruned objects": {
pastObjs: []*unstructured.Unstructured{},
currentObjs: []*unstructured.Unstructured{pdb, namespace},
pruneEventObjs: []*unstructured.Unstructured{},
finalClusterObjs: []*unstructured.Unstructured{pdb, namespace},
prunedObjs: []*unstructured.Unstructured{},
pastObjs: []*unstructured.Unstructured{},
currentObjs: []*unstructured.Unstructured{pdb, namespace},
pruneEventObjs: []*unstructured.Unstructured{},
prunedObjs: []*unstructured.Unstructured{},
},
"No current objects; all previous objects pruned in correct order": {
pastObjs: []*unstructured.Unstructured{pdb, role, pod},
currentObjs: []*unstructured.Unstructured{},
prunedObjs: []*unstructured.Unstructured{pod, pdb, role},
finalClusterObjs: []*unstructured.Unstructured{},
pruneEventObjs: []*unstructured.Unstructured{pod, pdb, role},
pastObjs: []*unstructured.Unstructured{pdb, role, pod},
currentObjs: []*unstructured.Unstructured{},
prunedObjs: []*unstructured.Unstructured{pod, pdb, role},
pruneEventObjs: []*unstructured.Unstructured{pod, pdb, role},
},
"Omitted object is pruned": {
pastObjs: []*unstructured.Unstructured{pdb, role},
currentObjs: []*unstructured.Unstructured{pdb},
prunedObjs: []*unstructured.Unstructured{role},
finalClusterObjs: []*unstructured.Unstructured{pdb},
pruneEventObjs: []*unstructured.Unstructured{role},
pastObjs: []*unstructured.Unstructured{pdb, role},
currentObjs: []*unstructured.Unstructured{pdb},
prunedObjs: []*unstructured.Unstructured{role},
pruneEventObjs: []*unstructured.Unstructured{role},
},
"Prevent delete lifecycle annotation stops pruning": {
pastObjs: []*unstructured.Unstructured{preventDelete, pdb},
currentObjs: []*unstructured.Unstructured{pdb, role},
prunedObjs: []*unstructured.Unstructured{},
finalClusterObjs: []*unstructured.Unstructured{preventDelete, pdb, role},
pruneEventObjs: []*unstructured.Unstructured{preventDelete},
pastObjs: []*unstructured.Unstructured{preventDelete, pdb},
currentObjs: []*unstructured.Unstructured{pdb, role},
prunedObjs: []*unstructured.Unstructured{},
pruneEventObjs: []*unstructured.Unstructured{preventDelete},
},
"Namespace not pruned if objects are still in it": {
pastObjs: []*unstructured.Unstructured{namespace, pdb, pod},
currentObjs: []*unstructured.Unstructured{pod},
prunedObjs: []*unstructured.Unstructured{pdb},
finalClusterObjs: []*unstructured.Unstructured{namespace, pod},
pruneEventObjs: []*unstructured.Unstructured{pdb, namespace},
pastObjs: []*unstructured.Unstructured{namespace, pdb, pod},
currentObjs: []*unstructured.Unstructured{pod},
prunedObjs: []*unstructured.Unstructured{pdb},
pruneEventObjs: []*unstructured.Unstructured{pdb, namespace},
},
"unknown type doesn't emit prune failed event": {
pastObjs: []*unstructured.Unstructured{unknownCR},
currentObjs: []*unstructured.Unstructured{},
prunedObjs: []*unstructured.Unstructured{unknownCR},
finalClusterObjs: []*unstructured.Unstructured{},
pruneEventObjs: []*unstructured.Unstructured{},
pastObjs: []*unstructured.Unstructured{unknownCR},
currentObjs: []*unstructured.Unstructured{},
prunedObjs: []*unstructured.Unstructured{unknownCR},
pruneEventObjs: []*unstructured.Unstructured{},
},
}
for name, tc := range tests {
@ -296,13 +285,6 @@ func TestPrune(t *testing.T) {
t.Fatalf("Unexpected error during Prune(): %#v", err)
}
// Test that the correct inventory objects are stored at the end of the prune.
actualObjs := fakeInvClient.Objs
expectedObjs := object.UnstructuredsToObjMetas(tc.finalClusterObjs)
if !object.SetEquals(expectedObjs, actualObjs) {
t.Errorf("expected inventory objs (%s), got (%s)", expectedObjs, actualObjs)
}
var actualPruneEvents []event.Event
for e := range eventChannel {
actualPruneEvents = append(actualPruneEvents, e)

View File

@ -40,6 +40,7 @@ type TaskQueueSolver struct {
InfoHelper info.InfoHelper
Factory util.Factory
Mapper meta.RESTMapper
InvClient inventory.InventoryClient
}
type TaskQueue struct {
@ -90,6 +91,8 @@ type resourceObjects interface {
// customization of how the task queue are built.
func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects,
o Options) *TaskQueue {
var invAddCounter int
var invReplaceCounter int
var applyCounter int
var pruneCounter int
var waitCounter int
@ -103,6 +106,14 @@ func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects,
prevInventory[prevInvObj] = true
}
// First task merge local applied objects to inventory.
tasks = append(tasks, &task.InvAddTask{
TaskName: fmt.Sprintf("inventory-add-%d", invAddCounter),
InvClient: t.InvClient,
InvInfo: ro.Inventory(),
Objects: ro.ObjsForApply(),
})
crdSplitRes, hasCRDs := splitAfterCRDs(remainingInfos)
if hasCRDs {
tasks = append(tasks, &task.ApplyTask{
@ -186,6 +197,13 @@ func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects,
}
}
// Final task is to set inventory with objects in cluster.
tasks = append(tasks, &task.InvSetTask{
TaskName: fmt.Sprintf("inventory-replace-%d", invReplaceCounter),
InvClient: t.InvClient,
InvInfo: ro.Inventory(),
})
return &TaskQueue{
tasks: tasks,
}

View File

@ -38,10 +38,12 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) {
objs: []*unstructured.Unstructured{},
options: Options{},
expectedTasks: []taskrunner.Task{
&task.InvAddTask{TaskName: "inventory-add-0"},
&task.ApplyTask{
TaskName: "apply-0",
Objects: []*unstructured.Unstructured{},
},
&task.InvSetTask{TaskName: "inventory-replace-0"},
},
},
"single resource": {
@ -50,12 +52,14 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) {
},
options: Options{},
expectedTasks: []taskrunner.Task{
&task.InvAddTask{TaskName: "inventory-add-0"},
&task.ApplyTask{
TaskName: "apply-0",
Objects: []*unstructured.Unstructured{
depInfo,
},
},
&task.InvSetTask{TaskName: "inventory-replace-0"},
},
},
"multiple resources with wait": {
@ -67,6 +71,7 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) {
ReconcileTimeout: time.Minute,
},
expectedTasks: []taskrunner.Task{
&task.InvAddTask{TaskName: "inventory-add-0"},
&task.ApplyTask{
TaskName: "apply-0",
Objects: []*unstructured.Unstructured{
@ -82,6 +87,7 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) {
},
taskrunner.AllCurrent, 1*time.Second,
testutil.NewFakeRESTMapper()),
&task.InvSetTask{TaskName: "inventory-replace-0"},
},
},
"multiple resources with wait and prune": {
@ -94,6 +100,7 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) {
Prune: true,
},
expectedTasks: []taskrunner.Task{
&task.InvAddTask{TaskName: "inventory-add-0"},
&task.ApplyTask{
TaskName: "apply-0",
Objects: []*unstructured.Unstructured{
@ -112,6 +119,7 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) {
&task.PruneTask{
TaskName: "prune-0",
},
&task.InvSetTask{TaskName: "inventory-replace-0"},
},
},
"multiple resources with wait, prune and dryrun": {
@ -125,6 +133,7 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) {
DryRunStrategy: common.DryRunClient,
},
expectedTasks: []taskrunner.Task{
&task.InvAddTask{TaskName: "inventory-add-0"},
&task.ApplyTask{
TaskName: "apply-0",
Objects: []*unstructured.Unstructured{
@ -135,6 +144,7 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) {
&task.PruneTask{
TaskName: "prune-0",
},
&task.InvSetTask{TaskName: "inventory-replace-0"},
},
},
"multiple resources with wait, prune and server-dryrun": {
@ -148,6 +158,7 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) {
DryRunStrategy: common.DryRunServer,
},
expectedTasks: []taskrunner.Task{
&task.InvAddTask{TaskName: "inventory-add-0"},
&task.ApplyTask{
TaskName: "apply-0",
Objects: []*unstructured.Unstructured{
@ -158,6 +169,7 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) {
&task.PruneTask{
TaskName: "prune-0",
},
&task.InvSetTask{TaskName: "inventory-replace-0"},
},
},
"multiple resources including CRD": {
@ -169,6 +181,7 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) {
ReconcileTimeout: time.Minute,
},
expectedTasks: []taskrunner.Task{
&task.InvAddTask{TaskName: "inventory-add-0"},
&task.ApplyTask{
TaskName: "apply-0",
Objects: []*unstructured.Unstructured{
@ -195,6 +208,7 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) {
},
taskrunner.AllCurrent, 1*time.Second,
testutil.NewFakeRESTMapper()),
&task.InvSetTask{TaskName: "inventory-replace-0"},
},
},
"no wait with CRDs if it is a dryrun": {
@ -207,6 +221,7 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) {
DryRunStrategy: common.DryRunClient,
},
expectedTasks: []taskrunner.Task{
&task.InvAddTask{TaskName: "inventory-add-0"},
&task.ApplyTask{
TaskName: "apply-0",
Objects: []*unstructured.Unstructured{
@ -219,6 +234,7 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) {
depInfo,
},
},
&task.InvSetTask{TaskName: "inventory-replace-0"},
},
},
}

View File

@ -0,0 +1,82 @@
// Copyright 2021 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package task
import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/object"
)
// InvAddTask encapsulates structures necessary to add/merge inventory
// into the cluster. The InvAddTask should add/merge inventory references
// before the actual object is applied.
type InvAddTask struct {
TaskName string
InvClient inventory.InventoryClient
InvInfo inventory.InventoryInfo
Objects []*unstructured.Unstructured
}
func (i *InvAddTask) Name() string {
return i.TaskName
}
func (i *InvAddTask) Action() event.ResourceAction {
return event.InventoryAction
}
func (i *InvAddTask) Identifiers() []object.ObjMetadata {
return object.UnstructuredsToObjMetas(i.Objects)
}
// Start updates the inventory by merging the locally applied objects
// into the current inventory.
func (i *InvAddTask) Start(taskContext *taskrunner.TaskContext) {
go func() {
if err := inventory.ValidateNoInventory(i.Objects); err != nil {
taskContext.TaskChannel() <- taskrunner.TaskResult{Err: err}
return
}
// Ensures the namespace exists before applying the inventory object into it.
if invNamespace := inventoryNamespaceInSet(i.InvInfo, i.Objects); invNamespace != nil {
klog.V(4).Infof("applying inventory namespace %s", invNamespace.GetName())
if err := i.InvClient.ApplyInventoryNamespace(invNamespace); err != nil {
taskContext.TaskChannel() <- taskrunner.TaskResult{Err: err}
return
}
}
klog.V(4).Infof("merging %d local objects into inventory", len(i.Objects))
currentObjs := object.UnstructuredsToObjMetas(i.Objects)
_, err := i.InvClient.Merge(i.InvInfo, currentObjs)
taskContext.TaskChannel() <- taskrunner.TaskResult{Err: err}
}()
}
// ClearTimeout is not supported by the InvAddTask.
func (i *InvAddTask) ClearTimeout() {}
// inventoryNamespaceInSet returns the the namespace the passed inventory
// object will be applied to, or nil if this namespace object does not exist
// in the passed slice "infos" or the inventory object is cluster-scoped.
func inventoryNamespaceInSet(inv inventory.InventoryInfo, objs []*unstructured.Unstructured) *unstructured.Unstructured {
if inv == nil {
return nil
}
invNamespace := inv.Namespace()
for _, obj := range objs {
acc, _ := meta.Accessor(obj)
gvk := obj.GetObjectKind().GroupVersionKind()
if gvk == object.CoreV1Namespace && acc.GetName() == invNamespace {
inventory.AddInventoryIDAnnotation(obj, inv)
return obj
}
}
return nil
}

View File

@ -0,0 +1,193 @@
// Copyright 2021 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package task
import (
"testing"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/object"
)
var namespace = "test-namespace"
var inventoryObj = &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": map[string]interface{}{
"name": "test-inventory-obj",
"namespace": namespace,
"labels": map[string]interface{}{
common.InventoryLabel: "test-app-label",
},
},
},
}
var localInv = inventory.WrapInventoryInfoObj(inventoryObj)
var obj1 = &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "Pod",
"metadata": map[string]interface{}{
"name": "obj1",
"namespace": namespace,
},
},
}
var obj2 = &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": map[string]interface{}{
"name": "obj2",
"namespace": namespace,
},
},
}
var obj3 = &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": map[string]interface{}{
"name": "obj3",
"namespace": "different-namespace",
},
},
}
const taskName = "test-inventory-task"
func TestInvAddTask(t *testing.T) {
id1 := object.UnstructuredToObjMeta(obj1)
id2 := object.UnstructuredToObjMeta(obj2)
id3 := object.UnstructuredToObjMeta(obj3)
tests := map[string]struct {
initialObjs []object.ObjMetadata
applyObjs []*unstructured.Unstructured
expectedObjs []object.ObjMetadata
}{
"no initial inventory and no apply objects; no merged inventory": {
initialObjs: []object.ObjMetadata{},
applyObjs: []*unstructured.Unstructured{},
expectedObjs: []object.ObjMetadata{},
},
"no initial inventory, one apply object; one merged inventory": {
initialObjs: []object.ObjMetadata{},
applyObjs: []*unstructured.Unstructured{obj1},
expectedObjs: []object.ObjMetadata{id1},
},
"one initial inventory, no apply object; one merged inventory": {
initialObjs: []object.ObjMetadata{id2},
applyObjs: []*unstructured.Unstructured{},
expectedObjs: []object.ObjMetadata{id2},
},
"one initial inventory, one apply object; one merged inventory": {
initialObjs: []object.ObjMetadata{id3},
applyObjs: []*unstructured.Unstructured{obj3},
expectedObjs: []object.ObjMetadata{id3},
},
"three initial inventory, two same objects; three merged inventory": {
initialObjs: []object.ObjMetadata{id1, id2, id3},
applyObjs: []*unstructured.Unstructured{obj2, obj3},
expectedObjs: []object.ObjMetadata{id1, id2, id3},
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
client := inventory.NewFakeInventoryClient(tc.initialObjs)
eventChannel := make(chan event.Event)
context := taskrunner.NewTaskContext(eventChannel)
task := InvAddTask{
TaskName: taskName,
InvClient: client,
InvInfo: nil,
Objects: tc.applyObjs,
}
if taskName != task.Name() {
t.Errorf("expected task name (%s), got (%s)", taskName, task.Name())
}
applyIds := object.UnstructuredsToObjMetas(tc.applyObjs)
if !object.SetEquals(applyIds, task.Identifiers()) {
t.Errorf("expected task ids (%s), got (%s)", applyIds, task.Identifiers())
}
task.Start(context)
result := <-context.TaskChannel()
if result.Err != nil {
t.Errorf("unexpected error running InvAddTask: %s", result.Err)
}
actual, _ := client.GetClusterObjs(nil)
if !object.SetEquals(tc.expectedObjs, actual) {
t.Errorf("expected merged inventory (%s), got (%s)", tc.expectedObjs, actual)
}
})
}
}
func TestInventoryNamespaceInSet(t *testing.T) {
inventoryNamespace := createNamespace(namespace)
tests := map[string]struct {
inv inventory.InventoryInfo
objects []*unstructured.Unstructured
namespace *unstructured.Unstructured
}{
"Nil inventory object, no resources returns nil namespace": {
inv: nil,
objects: []*unstructured.Unstructured{},
namespace: nil,
},
"Inventory object, but no resources returns nil namespace": {
inv: localInv,
objects: []*unstructured.Unstructured{},
namespace: nil,
},
"Inventory object, resources with no namespace returns nil namespace": {
inv: localInv,
objects: []*unstructured.Unstructured{obj1, obj2},
namespace: nil,
},
"Inventory object, different namespace returns nil namespace": {
inv: localInv,
objects: []*unstructured.Unstructured{createNamespace("foo")},
namespace: nil,
},
"Inventory object, inventory namespace returns inventory namespace": {
inv: localInv,
objects: []*unstructured.Unstructured{obj1, inventoryNamespace, obj3},
namespace: inventoryNamespace,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
actualNamespace := inventoryNamespaceInSet(tc.inv, tc.objects)
if tc.namespace != actualNamespace {
t.Fatalf("expected namespace (%v), got (%v)", tc.namespace, actualNamespace)
}
})
}
}
func createNamespace(ns string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "Namespace",
"metadata": map[string]interface{}{
"name": ns,
},
},
}
}

View File

@ -0,0 +1,51 @@
// Copyright 2021 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package task
import (
"k8s.io/klog"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/object"
)
// InvSetTask encapsulates structures necessary to set the
// inventory references at the end of the apply/prune.
type InvSetTask struct {
TaskName string
InvClient inventory.InventoryClient
InvInfo inventory.InventoryInfo
}
func (i *InvSetTask) Name() string {
return i.TaskName
}
func (i *InvSetTask) Action() event.ResourceAction {
return event.InventoryAction
}
func (i *InvSetTask) Identifiers() []object.ObjMetadata {
return []object.ObjMetadata{}
}
// Start sets the inventory using the resources applied and the
// prunes that failed. This task must run after all the apply
// and prune tasks have completed.
func (i *InvSetTask) Start(taskContext *taskrunner.TaskContext) {
go func() {
appliedObjs := taskContext.AppliedResources()
klog.V(4).Infof("set inventory %d applied objects", len(appliedObjs))
pruneFailures := taskContext.PruneFailures()
klog.V(4).Infof("set inventory %d prune failures", len(pruneFailures))
invObjs := object.Union(appliedObjs, pruneFailures)
klog.V(4).Infof("set inventory %d total objects", len(invObjs))
err := i.InvClient.Replace(i.InvInfo, invObjs)
taskContext.TaskChannel() <- taskrunner.TaskResult{Err: err}
}()
}
// ClearTimeout is not supported by the InvSetTask.
func (i *InvSetTask) ClearTimeout() {}

View File

@ -0,0 +1,82 @@
// Copyright 2021 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package task
import (
"testing"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/object"
)
func TestInvSetTask(t *testing.T) {
id1 := object.UnstructuredToObjMeta(obj1)
id2 := object.UnstructuredToObjMeta(obj2)
id3 := object.UnstructuredToObjMeta(obj3)
tests := map[string]struct {
appliedObjs []object.ObjMetadata
pruneFailures []object.ObjMetadata
expectedObjs []object.ObjMetadata
}{
"no apply objs, no prune failures; no inventory": {
appliedObjs: []object.ObjMetadata{},
pruneFailures: []object.ObjMetadata{},
expectedObjs: []object.ObjMetadata{},
},
"one apply objs, no prune failures; one inventory": {
appliedObjs: []object.ObjMetadata{id1},
pruneFailures: []object.ObjMetadata{},
expectedObjs: []object.ObjMetadata{id1},
},
"no apply objs, one prune failures; one inventory": {
appliedObjs: []object.ObjMetadata{},
pruneFailures: []object.ObjMetadata{id1},
expectedObjs: []object.ObjMetadata{id1},
},
"one apply objs, one prune failures; one inventory": {
appliedObjs: []object.ObjMetadata{id3},
pruneFailures: []object.ObjMetadata{id3},
expectedObjs: []object.ObjMetadata{id3},
},
"two apply objs, two prune failures; three inventory": {
appliedObjs: []object.ObjMetadata{id1, id2},
pruneFailures: []object.ObjMetadata{id2, id3},
expectedObjs: []object.ObjMetadata{id1, id2, id3},
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
client := inventory.NewFakeInventoryClient([]object.ObjMetadata{})
eventChannel := make(chan event.Event)
context := taskrunner.NewTaskContext(eventChannel)
task := InvSetTask{
TaskName: taskName,
InvClient: client,
InvInfo: nil,
}
for _, applyObj := range tc.appliedObjs {
context.ResourceApplied(applyObj, "unusued-uid", int64(0))
}
for _, pruneObj := range tc.pruneFailures {
context.CapturePruneFailure(pruneObj)
}
if taskName != task.Name() {
t.Errorf("expected task name (%s), got (%s)", taskName, task.Name())
}
task.Start(context)
result := <-context.TaskChannel()
if result.Err != nil {
t.Errorf("unexpected error running InvAddTask: %s", result.Err)
}
actual, _ := client.GetClusterObjs(nil)
if !object.SetEquals(tc.expectedObjs, actual) {
t.Errorf("expected merged inventory (%s), got (%s)", tc.expectedObjs, actual)
}
})
}
}

View File

@ -19,6 +19,7 @@ func NewTaskContext(eventChannel chan event.Event) *TaskContext {
eventChannel: eventChannel,
appliedResources: make(map[object.ObjMetadata]applyInfo),
failedResources: make(map[object.ObjMetadata]struct{}),
pruneFailures: make(map[object.ObjMetadata]struct{}),
}
}
@ -31,8 +32,11 @@ type TaskContext struct {
appliedResources map[object.ObjMetadata]applyInfo
// failedResources records the IDs of resources that are failed during applying and pruning.
// failedResources records the IDs of resources that are failed during applying.
failedResources map[object.ObjMetadata]struct{}
// pruneFailures records the IDs of resources that are failed during pruning.
pruneFailures map[object.ObjMetadata]struct{}
}
func (tc *TaskContext) TaskChannel() chan TaskResult {
@ -104,6 +108,18 @@ func (tc *TaskContext) CaptureResourceFailure(id object.ObjMetadata) {
tc.failedResources[id] = struct{}{}
}
func (tc *TaskContext) CapturePruneFailure(id object.ObjMetadata) {
tc.pruneFailures[id] = struct{}{}
}
func (tc *TaskContext) PruneFailures() []object.ObjMetadata {
failures := make([]object.ObjMetadata, 0, len(tc.pruneFailures))
for f := range tc.pruneFailures {
failures = append(failures, f)
}
return failures
}
// applyInfo captures information about resources that have been
// applied. This is captured in the TaskContext so other tasks
// running later might use this information.

View File

@ -1,3 +1,6 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=EventType"; DO NOT EDIT.
package event