Merge pull request #531 from karlkfi/karl-taskcontext-status

chore: Add Inventory to TaskContext
This commit is contained in:
Kubernetes Prow Robot 2022-02-03 12:10:58 -08:00 committed by GitHub
commit 3f2bc6abd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1019 additions and 245 deletions

View File

@ -99,6 +99,16 @@ func (p *Pruner) Prune(
for _, obj := range objs {
id := object.UnstructuredToObjMetadata(obj)
klog.V(5).Infof("evaluating prune filters (object: %q)", id)
// UID will change if the object is deleted and re-created.
uid := obj.GetUID()
if uid == "" {
err := object.NotFound([]interface{}{"metadata", "uid"}, "")
taskContext.SendEvent(eventFactory.CreateFailedEvent(id, err))
taskContext.InventoryManager().AddFailedDelete(id)
continue
}
// Check filters to see if we're prevented from pruning/deleting object.
var filtered bool
var reason string
@ -111,7 +121,7 @@ func (p *Pruner) Prune(
klog.Errorf("error during %s, (%s): %v", pruneFilter.Name(), id, err)
}
taskContext.SendEvent(eventFactory.CreateFailedEvent(id, err))
taskContext.AddFailedDelete(id)
taskContext.InventoryManager().AddFailedDelete(id)
break
}
if filtered {
@ -125,7 +135,7 @@ func (p *Pruner) Prune(
klog.Errorf("error removing annotation (object: %q, annotation: %q): %v", id, inventory.OwningInventoryKey, err)
}
taskContext.SendEvent(eventFactory.CreateFailedEvent(id, err))
taskContext.AddFailedDelete(id)
taskContext.InventoryManager().AddFailedDelete(id)
break
} else {
// Inventory annotation was successfully removed from the object.
@ -135,7 +145,7 @@ func (p *Pruner) Prune(
}
}
taskContext.SendEvent(eventFactory.CreateSkippedEvent(obj, reason))
taskContext.AddSkippedDelete(id)
taskContext.InventoryManager().AddSkippedDelete(id)
break
}
}
@ -146,6 +156,11 @@ func (p *Pruner) Prune(
if !opts.DryRunStrategy.ClientOrServerDryRun() {
klog.V(4).Infof("deleting object (object: %q)", id)
err := p.deleteObject(id, metav1.DeleteOptions{
// Only delete the resource if it hasn't already been deleted
// and recreated since the last GET. Otherwise error.
Preconditions: &metav1.Preconditions{
UID: &uid,
},
PropagationPolicy: &opts.PropagationPolicy,
})
if err != nil {
@ -153,10 +168,11 @@ func (p *Pruner) Prune(
klog.Errorf("error deleting object (object: %q): %v", id, err)
}
taskContext.SendEvent(eventFactory.CreateFailedEvent(id, err))
taskContext.AddFailedDelete(id)
taskContext.InventoryManager().AddFailedDelete(id)
continue
}
}
taskContext.InventoryManager().AddSuccessfulDelete(id, obj.GetUID())
taskContext.SendEvent(eventFactory.CreateSuccessEvent(obj))
}
return nil

View File

@ -482,19 +482,21 @@ func TestPrune(t *testing.T) {
err = testutil.VerifyEvents(tc.expectedEvents, actualEvents)
assert.NoError(t, err)
im := taskContext.InventoryManager()
// validate record of failed prunes
for _, id := range tc.expectedFailed {
assert.Truef(t, taskContext.IsFailedDelete(id), "Prune() should mark object as failed: %s", id)
assert.Truef(t, im.IsFailedDelete(id), "Prune() should mark object as failed: %s", id)
}
for _, id := range pruneIds.Diff(tc.expectedFailed) {
assert.Falsef(t, taskContext.IsFailedDelete(id), "Prune() should NOT mark object as failed: %s", id)
assert.Falsef(t, im.IsFailedDelete(id), "Prune() should NOT mark object as failed: %s", id)
}
// validate record of skipped prunes
for _, id := range tc.expectedSkipped {
assert.Truef(t, taskContext.IsSkippedDelete(id), "Prune() should mark object as skipped: %s", id)
assert.Truef(t, im.IsSkippedDelete(id), "Prune() should mark object as skipped: %s", id)
}
for _, id := range pruneIds.Diff(tc.expectedSkipped) {
assert.Falsef(t, taskContext.IsSkippedDelete(id), "Prune() should NOT mark object as skipped: %s", id)
assert.Falsef(t, im.IsSkippedDelete(id), "Prune() should NOT mark object as skipped: %s", id)
}
// validate record of abandoned objects
for _, id := range tc.expectedAbandoned {
@ -561,9 +563,11 @@ func TestPruneDeletionPrevention(t *testing.T) {
}
}
im := taskContext.InventoryManager()
assert.Truef(t, taskContext.IsAbandonedObject(pruneID), "Prune() should mark object as abandoned")
assert.Truef(t, taskContext.IsSkippedDelete(pruneID), "Prune() should mark object as skipped")
assert.Falsef(t, taskContext.IsFailedDelete(pruneID), "Prune() should NOT mark object as failed")
assert.Truef(t, im.IsSkippedDelete(pruneID), "Prune() should mark object as skipped")
assert.Falsef(t, im.IsFailedDelete(pruneID), "Prune() should NOT mark object as failed")
})
}
}

View File

@ -106,7 +106,7 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
id,
applyerror.NewUnknownTypeError(err),
))
taskContext.AddFailedApply(id)
taskContext.InventoryManager().AddFailedApply(id)
continue
}
@ -122,13 +122,13 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
klog.Errorf("error during %s, (%s): %s", filter.Name(), id, filterErr)
}
taskContext.SendEvent(a.createApplyFailedEvent(id, filterErr))
taskContext.AddFailedApply(id)
taskContext.InventoryManager().AddFailedApply(id)
break
}
if filtered {
klog.V(4).Infof("apply filtered (filter: %q, resource: %q, reason: %q)", filter.Name(), id, reason)
taskContext.SendEvent(a.createApplyEvent(id, event.Unchanged, obj))
taskContext.AddSkippedApply(id)
taskContext.InventoryManager().AddSkippedApply(id)
break
}
}
@ -143,7 +143,7 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
klog.Errorf("error mutating: %w", err)
}
taskContext.SendEvent(a.createApplyFailedEvent(id, err))
taskContext.AddFailedApply(id)
taskContext.InventoryManager().AddFailedApply(id)
continue
}
@ -168,13 +168,13 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
id,
applyerror.NewApplyRunError(err),
))
taskContext.AddFailedApply(id)
taskContext.InventoryManager().AddFailedApply(id)
} else if info.Object != nil {
acc, err := meta.Accessor(info.Object)
if err == nil {
uid := acc.GetUID()
gen := acc.GetGeneration()
taskContext.AddSuccessfulApply(id, uid, gen)
taskContext.InventoryManager().AddSuccessfulApply(id, uid, gen)
}
}
}

View File

@ -114,14 +114,16 @@ func TestApplyTask_BasicAppliedObjects(t *testing.T) {
// The applied resources should be stored in the TaskContext
// for the final inventory.
expectedIDs := object.UnstructuredSetToObjMetadataSet(objs)
actual := taskContext.SuccessfulApplies()
actual := taskContext.InventoryManager().SuccessfulApplies()
if !actual.Equal(expectedIDs) {
t.Errorf("expected (%s) inventory resources, got (%s)", expectedIDs, actual)
}
im := taskContext.InventoryManager()
for _, id := range expectedIDs {
assert.Falsef(t, taskContext.IsFailedApply(id), "ApplyTask should NOT mark object as failed: %s", id)
assert.Falsef(t, taskContext.IsSkippedApply(id), "ApplyTask should NOT mark object as skipped: %s", id)
assert.Falsef(t, im.IsFailedApply(id), "ApplyTask should NOT mark object as failed: %s", id)
assert.Falsef(t, im.IsSkippedApply(id), "ApplyTask should NOT mark object as skipped: %s", id)
}
})
}
@ -198,10 +200,10 @@ func TestApplyTask_FetchGeneration(t *testing.T) {
Name: info.name,
Namespace: info.namespace,
}
uid, _ := taskContext.AppliedResourceUID(id)
uid, _ := taskContext.InventoryManager().AppliedResourceUID(id)
assert.Equal(t, info.uid, uid)
gen, _ := taskContext.AppliedGeneration(id)
gen, _ := taskContext.InventoryManager().AppliedGeneration(id)
assert.Equal(t, info.generation, gen)
}
})
@ -491,19 +493,21 @@ func TestApplyTaskWithError(t *testing.T) {
applyIds := object.UnstructuredSetToObjMetadataSet(tc.objs)
im := taskContext.InventoryManager()
// validate record of failed prunes
for _, id := range tc.expectedFailed {
assert.Truef(t, taskContext.IsFailedApply(id), "ApplyTask should mark object as failed: %s", id)
assert.Truef(t, im.IsFailedApply(id), "ApplyTask should mark object as failed: %s", id)
}
for _, id := range applyIds.Diff(tc.expectedFailed) {
assert.Falsef(t, taskContext.IsFailedApply(id), "ApplyTask should NOT mark object as failed: %s", id)
assert.Falsef(t, im.IsFailedApply(id), "ApplyTask should NOT mark object as failed: %s", id)
}
// validate record of skipped prunes
for _, id := range tc.expectedSkipped {
assert.Truef(t, taskContext.IsSkippedApply(id), "ApplyTask should mark object as skipped: %s", id)
assert.Truef(t, im.IsSkippedApply(id), "ApplyTask should mark object as skipped: %s", id)
}
for _, id := range applyIds.Diff(tc.expectedSkipped) {
assert.Falsef(t, taskContext.IsSkippedApply(id), "ApplyTask should NOT mark object as skipped: %s", id)
assert.Falsef(t, im.IsSkippedApply(id), "ApplyTask should NOT mark object as skipped: %s", id)
}
})
}

View File

@ -59,8 +59,11 @@ func (i *InvSetTask) Start(taskContext *taskrunner.TaskContext) {
klog.V(2).Infof("inventory set task starting (name: %q)", i.Name())
invObjs := object.ObjMetadataSet{}
// TODO: Just use InventoryManager.Store()
im := taskContext.InventoryManager()
// If an object applied successfully, keep or add it to the inventory.
appliedObjs := taskContext.SuccessfulApplies()
appliedObjs := im.SuccessfulApplies()
klog.V(4).Infof("set inventory %d successful applies", len(appliedObjs))
invObjs = invObjs.Union(appliedObjs)
@ -69,7 +72,7 @@ func (i *InvSetTask) Start(taskContext *taskrunner.TaskContext) {
// This will remove new resources that failed to apply from the inventory,
// because even tho they were added by InvAddTask, the PrevInventory
// represents the inventory before the pipeline has run.
applyFailures := i.PrevInventory.Intersection(taskContext.FailedApplies())
applyFailures := i.PrevInventory.Intersection(im.FailedApplies())
klog.V(4).Infof("keep in inventory %d failed applies", len(applyFailures))
invObjs = invObjs.Union(applyFailures)
@ -78,7 +81,7 @@ func (i *InvSetTask) Start(taskContext *taskrunner.TaskContext) {
// It's likely that all the skipped applies are already in the inventory,
// because the apply filters all currently depend on cluster state,
// but we're doing the intersection anyway just to be sure.
applySkips := i.PrevInventory.Intersection(taskContext.SkippedApplies())
applySkips := i.PrevInventory.Intersection(im.SkippedApplies())
klog.V(4).Infof("keep in inventory %d skipped applies", len(applySkips))
invObjs = invObjs.Union(applySkips)
@ -87,7 +90,7 @@ func (i *InvSetTask) Start(taskContext *taskrunner.TaskContext) {
// It's likely that all the delete failures are already in the inventory,
// because the set of resources to prune comes from the inventory,
// but we're doing the intersection anyway just to be sure.
pruneFailures := i.PrevInventory.Intersection(taskContext.FailedDeletes())
pruneFailures := i.PrevInventory.Intersection(im.FailedDeletes())
klog.V(4).Infof("set inventory %d failed prunes", len(pruneFailures))
invObjs = invObjs.Union(pruneFailures)
@ -96,7 +99,7 @@ func (i *InvSetTask) Start(taskContext *taskrunner.TaskContext) {
// It's likely that all the skipped deletes are already in the inventory,
// because the set of resources to prune comes from the inventory,
// but we're doing the intersection anyway just to be sure.
pruneSkips := i.PrevInventory.Intersection(taskContext.SkippedDeletes())
pruneSkips := i.PrevInventory.Intersection(im.SkippedDeletes())
klog.V(4).Infof("keep in inventory %d skipped prunes", len(pruneSkips))
invObjs = invObjs.Union(pruneSkips)

View File

@ -174,20 +174,21 @@ func TestInvSetTask(t *testing.T) {
InvInfo: nil,
PrevInventory: tc.prevInventory,
}
im := context.InventoryManager()
for _, applyObj := range tc.appliedObjs {
context.AddSuccessfulApply(applyObj, "unusued-uid", int64(0))
im.AddSuccessfulApply(applyObj, "unusued-uid", int64(0))
}
for _, applyFailure := range tc.failedApplies {
context.AddFailedApply(applyFailure)
im.AddFailedApply(applyFailure)
}
for _, pruneObj := range tc.failedDeletes {
context.AddFailedDelete(pruneObj)
im.AddFailedDelete(pruneObj)
}
for _, skippedApply := range tc.skippedApplies {
context.AddSkippedApply(skippedApply)
im.AddSkippedApply(skippedApply)
}
for _, skippedDelete := range tc.skippedDeletes {
context.AddSkippedDelete(skippedDelete)
im.AddSkippedDelete(skippedDelete)
}
for _, abandonedObj := range tc.abandonedObjs {
context.AddAbandonedObject(abandonedObj)

View File

@ -57,7 +57,7 @@ func (p *PruneTask) Start(taskContext *taskrunner.TaskContext) {
// Create filter to prevent deletion of currently applied
// objects. Must be done here to wait for applied UIDs.
uidFilter := filter.CurrentUIDFilter{
CurrentUIDs: taskContext.AppliedResourceUIDs(),
CurrentUIDs: taskContext.InventoryManager().AppliedResourceUIDs(),
}
p.Filters = append(p.Filters, uidFilter)
err := p.Pruner.Prune(

View File

@ -59,7 +59,7 @@ func allMatchStatus(taskContext *TaskContext, ids object.ObjMetadataSet, s statu
return false
}
applyGen, _ := taskContext.AppliedGeneration(id) // generation at apply time
applyGen, _ := taskContext.InventoryManager().AppliedGeneration(id) // generation at apply time
cachedGen := int64(0)
if cached.Resource != nil {
cachedGen = cached.Resource.GetGeneration()
@ -81,7 +81,7 @@ func noneMatchStatus(taskContext *TaskContext, ids object.ObjMetadataSet, s stat
return false
}
applyGen, _ := taskContext.AppliedGeneration(id) // generation at apply time
applyGen, _ := taskContext.InventoryManager().AppliedGeneration(id) // generation at apply time
cachedGen := int64(0)
if cached.Resource != nil {
cachedGen = cached.Resource.GetGeneration()

View File

@ -157,7 +157,7 @@ func TestCollector_ConditionMet(t *testing.T) {
if tc.appliedGen != nil {
for id, gen := range tc.appliedGen {
taskContext.AddSuccessfulApply(id, types.UID("unused"), gen)
taskContext.InventoryManager().AddSuccessfulApply(id, types.UID("unused"), gen)
}
}

View File

@ -4,43 +4,34 @@
package taskrunner
import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/object"
)
// NewTaskContext returns a new TaskContext
func NewTaskContext(eventChannel chan event.Event, resourceCache cache.ResourceCache) *TaskContext {
return &TaskContext{
taskChannel: make(chan TaskResult),
eventChannel: eventChannel,
resourceCache: resourceCache,
successfulApplies: make(map[object.ObjMetadata]applyInfo),
failedApplies: make(map[object.ObjMetadata]struct{}),
failedDeletes: make(map[object.ObjMetadata]struct{}),
skippedApplies: make(map[object.ObjMetadata]struct{}),
skippedDeletes: make(map[object.ObjMetadata]struct{}),
abandonedObjects: make(map[object.ObjMetadata]struct{}),
invalidObjects: make(map[object.ObjMetadata]struct{}),
taskChannel: make(chan TaskResult),
eventChannel: eventChannel,
resourceCache: resourceCache,
inventoryManager: inventory.NewManager(),
abandonedObjects: make(map[object.ObjMetadata]struct{}),
invalidObjects: make(map[object.ObjMetadata]struct{}),
}
}
// TaskContext defines a context that is passed between all
// the tasks that is in a taskqueue.
type TaskContext struct {
taskChannel chan TaskResult
eventChannel chan event.Event
resourceCache cache.ResourceCache
successfulApplies map[object.ObjMetadata]applyInfo
failedApplies map[object.ObjMetadata]struct{}
failedDeletes map[object.ObjMetadata]struct{}
skippedApplies map[object.ObjMetadata]struct{}
skippedDeletes map[object.ObjMetadata]struct{}
abandonedObjects map[object.ObjMetadata]struct{}
invalidObjects map[object.ObjMetadata]struct{}
taskChannel chan TaskResult
eventChannel chan event.Event
resourceCache cache.ResourceCache
inventoryManager *inventory.Manager
abandonedObjects map[object.ObjMetadata]struct{}
invalidObjects map[object.ObjMetadata]struct{}
}
func (tc *TaskContext) TaskChannel() chan TaskResult {
@ -55,148 +46,16 @@ func (tc *TaskContext) ResourceCache() cache.ResourceCache {
return tc.resourceCache
}
func (tc *TaskContext) InventoryManager() *inventory.Manager {
return tc.inventoryManager
}
// SendEvent sends an event on the event channel
func (tc *TaskContext) SendEvent(e event.Event) {
klog.V(5).Infof("sending event: %s", e)
tc.eventChannel <- e
}
// IsSuccessfulApply returns true if the object apply was successful
func (tc *TaskContext) IsSuccessfulApply(id object.ObjMetadata) bool {
_, found := tc.successfulApplies[id]
return found
}
// AddSuccessfulApply updates the context with information about the
// resource identified by the provided id. Currently, we keep information
// about the generation of the resource after the apply operation completed.
func (tc *TaskContext) AddSuccessfulApply(id object.ObjMetadata, uid types.UID, gen int64) {
tc.successfulApplies[id] = applyInfo{
generation: gen,
uid: uid,
}
}
// SuccessfulApplies returns all the objects (as ObjMetadata) that
// were added as applied resources to the TaskContext.
func (tc *TaskContext) SuccessfulApplies() object.ObjMetadataSet {
all := make(object.ObjMetadataSet, 0, len(tc.successfulApplies))
for r := range tc.successfulApplies {
all = append(all, r)
}
return all
}
// AppliedResourceUID looks up the UID of the given resource
func (tc *TaskContext) AppliedResourceUID(id object.ObjMetadata) (types.UID, bool) {
ai, found := tc.successfulApplies[id]
if klog.V(4).Enabled() {
if found {
klog.Infof("resource applied UID cache hit (%s): %d", id, ai.uid)
} else {
klog.Infof("resource applied UID cache miss: (%s): %d", id, ai.uid)
}
}
if !found {
return "", false
}
return ai.uid, true
}
// AppliedResourceUIDs returns a set with the UIDs of all the
// successfully applied resources.
func (tc *TaskContext) AppliedResourceUIDs() sets.String {
uids := sets.NewString()
for _, ai := range tc.successfulApplies {
uid := string(ai.uid)
if uid != "" {
uids.Insert(uid)
}
}
return uids
}
// AppliedGeneration looks up the generation of the given resource
// after it was applied.
func (tc *TaskContext) AppliedGeneration(id object.ObjMetadata) (int64, bool) {
ai, found := tc.successfulApplies[id]
if klog.V(4).Enabled() {
if found {
klog.Infof("resource applied generation cache hit (%s): %d", id, ai.generation)
} else {
klog.Infof("resource applied generation cache miss: (%s): %d", id, ai.generation)
}
}
if !found {
return 0, false
}
return ai.generation, true
}
// IsFailedApply returns true if the object failed to apply
func (tc *TaskContext) IsFailedApply(id object.ObjMetadata) bool {
_, found := tc.failedApplies[id]
return found
}
// AddFailedApply registers that the object failed to apply
func (tc *TaskContext) AddFailedApply(id object.ObjMetadata) {
tc.failedApplies[id] = struct{}{}
}
// FailedApplies returns all the objects that failed to apply
func (tc *TaskContext) FailedApplies() object.ObjMetadataSet {
return object.ObjMetadataSetFromMap(tc.failedApplies)
}
// IsFailedDelete returns true if the object failed to delete
func (tc *TaskContext) IsFailedDelete(id object.ObjMetadata) bool {
_, found := tc.failedDeletes[id]
return found
}
// AddFailedDelete registers that the object failed to delete
func (tc *TaskContext) AddFailedDelete(id object.ObjMetadata) {
tc.failedDeletes[id] = struct{}{}
}
// FailedDeletes returns all the objects that failed to delete
func (tc *TaskContext) FailedDeletes() object.ObjMetadataSet {
return object.ObjMetadataSetFromMap(tc.failedDeletes)
}
// IsSkippedApply returns true if the object apply was skipped
func (tc *TaskContext) IsSkippedApply(id object.ObjMetadata) bool {
_, found := tc.skippedApplies[id]
return found
}
// AddSkippedApply registers that the object apply was skipped
func (tc *TaskContext) AddSkippedApply(id object.ObjMetadata) {
tc.skippedApplies[id] = struct{}{}
}
// SkippedApplies returns all the objects where apply was skipped
func (tc *TaskContext) SkippedApplies() object.ObjMetadataSet {
return object.ObjMetadataSetFromMap(tc.skippedApplies)
}
// IsSkippedDelete returns true if the object delete was skipped
func (tc *TaskContext) IsSkippedDelete(id object.ObjMetadata) bool {
_, found := tc.skippedDeletes[id]
return found
}
// AddSkippedDelete registers that the object delete was skipped
func (tc *TaskContext) AddSkippedDelete(id object.ObjMetadata) {
tc.skippedDeletes[id] = struct{}{}
}
// SkippedDeletes returns all the objects where deletion was skipped
func (tc *TaskContext) SkippedDeletes() object.ObjMetadataSet {
return object.ObjMetadataSetFromMap(tc.skippedDeletes)
}
// IsAbandonedObject returns true if the object is abandoned
func (tc *TaskContext) IsAbandonedObject(id object.ObjMetadata) bool {
_, found := tc.abandonedObjects[id]
@ -228,17 +87,3 @@ func (tc *TaskContext) AddInvalidObject(id object.ObjMetadata) {
func (tc *TaskContext) InvalidObjects() object.ObjMetadataSet {
return object.ObjMetadataSetFromMap(tc.invalidObjects)
}
// applyInfo captures information about resources that have been
// applied. This is captured in the TaskContext so other tasks
// running later might use this information.
type applyInfo struct {
// generation captures the "version" of the resource after it
// has been applied. Generation is a monotonically increasing number
// that the APIServer increases every time the desired state of a
// resource changes.
generation int64
// uid captures the uid of the resource that has been applied.
uid types.UID
}

View File

@ -151,10 +151,25 @@ func (w *WaitTask) startInner(taskContext *TaskContext) {
for _, id := range w.Ids {
switch {
case w.skipped(taskContext, id):
err := taskContext.InventoryManager().SetSkippedReconcile(id)
if err != nil {
// Object never applied or deleted!
klog.Errorf("Failed to mark object as skipped reconcile: %v", err)
}
w.sendEvent(taskContext, id, event.ReconcileSkipped)
case w.reconciledByID(taskContext, id):
err := taskContext.InventoryManager().SetSuccessfulReconcile(id)
if err != nil {
// Object never applied or deleted!
klog.Errorf("Failed to mark object as successful reconcile: %v", err)
}
w.sendEvent(taskContext, id, event.Reconciled)
default:
err := taskContext.InventoryManager().SetPendingReconcile(id)
if err != nil {
// Object never applied or deleted!
klog.Errorf("Failed to mark object as pending reconcile: %v", err)
}
pending = append(pending, id)
w.sendEvent(taskContext, id, event.ReconcilePending)
}
@ -175,6 +190,11 @@ func (w *WaitTask) sendTimeoutEvents(taskContext *TaskContext) {
defer w.mu.RUnlock()
for _, id := range w.pending {
err := taskContext.InventoryManager().SetTimeoutReconcile(id)
if err != nil {
// Object never applied or deleted!
klog.Errorf("Failed to mark object as pending reconcile: %v", err)
}
w.sendEvent(taskContext, id, event.ReconcileTimeout)
}
}
@ -188,12 +208,13 @@ func (w *WaitTask) reconciledByID(taskContext *TaskContext, id object.ObjMetadat
// skipped returns true if the object failed or was skipped by a preceding
// apply/delete/prune task.
func (w *WaitTask) skipped(taskContext *TaskContext, id object.ObjMetadata) bool {
im := taskContext.InventoryManager()
if w.Condition == AllCurrent &&
taskContext.IsFailedApply(id) || taskContext.IsSkippedApply(id) {
im.IsFailedApply(id) || im.IsSkippedApply(id) {
return true
}
if w.Condition == AllNotFound &&
taskContext.IsFailedDelete(id) || taskContext.IsSkippedDelete(id) {
im.IsFailedDelete(id) || im.IsSkippedDelete(id) {
return true
}
return false
@ -219,7 +240,7 @@ func (w *WaitTask) StatusUpdate(taskContext *TaskContext, id object.ObjMetadata)
if klog.V(5).Enabled() {
status := taskContext.ResourceCache().Get(id).Status
klog.Errorf("status update (object: %q, status: %q)", id, status)
klog.Infof("status update (object: %q, status: %q)", id, status)
}
switch {
@ -228,10 +249,20 @@ func (w *WaitTask) StatusUpdate(taskContext *TaskContext, id object.ObjMetadata)
// pending - check if reconciled
case w.reconciledByID(taskContext, id):
// reconciled - remove from pending & send event
err := taskContext.InventoryManager().SetSuccessfulReconcile(id)
if err != nil {
// Object never applied or deleted!
klog.Errorf("Failed to mark object as successful reconcile: %v", err)
}
w.pending = w.pending.Remove(id)
w.sendEvent(taskContext, id, event.Reconciled)
case w.failedByID(taskContext, id):
// failed - remove from pending & send event
err := taskContext.InventoryManager().SetFailedReconcile(id)
if err != nil {
// Object never applied or deleted!
klog.Errorf("Failed to mark object as failed reconcile: %v", err)
}
w.pending = w.pending.Remove(id)
w.failed = append(w.failed, id)
w.sendEvent(taskContext, id, event.ReconcileFailed)
@ -250,11 +281,22 @@ func (w *WaitTask) StatusUpdate(taskContext *TaskContext, id object.ObjMetadata)
// resources have completed/timed out, we consider it
// current.
if w.reconciledByID(taskContext, id) {
// reconciled - remove from pending & send event
err := taskContext.InventoryManager().SetSuccessfulReconcile(id)
if err != nil {
// Object never applied or deleted!
klog.Errorf("Failed to mark object as successful reconcile: %v", err)
}
w.failed = w.failed.Remove(id)
w.sendEvent(taskContext, id, event.Reconciled)
} else if !w.failedByID(taskContext, id) {
// If a resource is no longer reported as Failed and is not Reconciled,
// they should just go back to InProgress.
err := taskContext.InventoryManager().SetPendingReconcile(id)
if err != nil {
// Object never applied or deleted!
klog.Errorf("Failed to mark object as pending reconcile: %v", err)
}
w.failed = w.failed.Remove(id)
w.pending = append(w.pending, id)
w.sendEvent(taskContext, id, event.ReconcilePending)
@ -267,6 +309,11 @@ func (w *WaitTask) StatusUpdate(taskContext *TaskContext, id object.ObjMetadata)
// reconciled - check if unreconciled
if !w.reconciledByID(taskContext, id) {
// unreconciled - add to pending & send event
err := taskContext.InventoryManager().SetPendingReconcile(id)
if err != nil {
// Object never applied or deleted!
klog.Errorf("Failed to mark object as pending reconcile: %v", err)
}
w.pending = append(w.pending, id)
w.sendEvent(taskContext, id, event.ReconcilePending)
// can't be all reconciled now, so don't bother checking

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/testutil"
@ -87,14 +88,14 @@ func TestWaitTask_CompleteEventually(t *testing.T) {
defer close(eventChannel)
// mark deployment 1 & 2 as applied
taskContext.AddSuccessfulApply(testDeployment1ID, "unused", 1)
taskContext.AddSuccessfulApply(testDeployment2ID, "unused", 1)
taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID, "unused", 1)
taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID, "unused", 1)
// mark deployment 3 as failed
taskContext.AddFailedApply(testDeployment3ID)
taskContext.InventoryManager().AddFailedApply(testDeployment3ID)
// mark deployment 4 as skipped
taskContext.AddSkippedApply(testDeployment4ID)
taskContext.InventoryManager().AddSkippedApply(testDeployment4ID)
// run task async, to let the test collect events
go func() {
@ -205,6 +206,42 @@ loop:
testutil.AssertEqual(t, expectedEvents, receivedEvents,
"Actual events (%d) do not match expected events (%d)",
len(receivedEvents), len(expectedEvents))
expectedInventory := inventory.Inventory{
Status: inventory.InventoryStatus{
Objects: []inventory.ObjectStatus{
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcileSucceeded,
UID: "unused",
Generation: 1,
},
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcileSucceeded,
UID: "unused",
Generation: 1,
},
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment3ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationFailed,
Reconcile: inventory.ReconcileSkipped,
},
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment4ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSkipped,
Reconcile: inventory.ReconcileSkipped,
},
},
},
}
testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
}
func TestWaitTask_Timeout(t *testing.T) {
@ -230,14 +267,14 @@ func TestWaitTask_Timeout(t *testing.T) {
defer close(eventChannel)
// mark deployment 1 & 2 as applied
taskContext.AddSuccessfulApply(testDeployment1ID, "unused", 1)
taskContext.AddSuccessfulApply(testDeployment2ID, "unused", 1)
taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID, "unused", 1)
taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID, "unused", 1)
// mark deployment 3 as failed
taskContext.AddFailedApply(testDeployment3ID)
taskContext.InventoryManager().AddFailedApply(testDeployment3ID)
// mark deployment 4 as skipped
taskContext.AddSkippedApply(testDeployment4ID)
taskContext.InventoryManager().AddSkippedApply(testDeployment4ID)
// run task async, to let the test collect events
go func() {
@ -332,6 +369,42 @@ loop:
testutil.AssertEqual(t, expectedEvents, receivedEvents,
"Actual events (%d) do not match expected events (%d)",
len(receivedEvents), len(expectedEvents))
expectedInventory := inventory.Inventory{
Status: inventory.InventoryStatus{
Objects: []inventory.ObjectStatus{
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcileSucceeded,
UID: "unused",
Generation: 1,
},
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcileTimeout,
UID: "unused",
Generation: 1,
},
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment3ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationFailed,
Reconcile: inventory.ReconcileSkipped,
},
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment4ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSkipped,
Reconcile: inventory.ReconcileSkipped,
},
},
},
}
testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
}
func TestWaitTask_StartAndComplete(t *testing.T) {
@ -351,8 +424,7 @@ func TestWaitTask_StartAndComplete(t *testing.T) {
defer close(eventChannel)
// mark the deployment as applied
appliedGeneration := int64(1)
taskContext.AddSuccessfulApply(testDeploymentID, "unused", appliedGeneration)
taskContext.InventoryManager().AddSuccessfulApply(testDeploymentID, "unused", 1)
// mark the deployment as Current before starting
resourceCache.Put(testDeploymentID, cache.ResourceStatus{
@ -397,6 +469,22 @@ loop:
testutil.AssertEqual(t, expectedEvents, receivedEvents,
"Actual events (%d) do not match expected events (%d)",
len(receivedEvents), len(expectedEvents))
expectedInventory := inventory.Inventory{
Status: inventory.InventoryStatus{
Objects: []inventory.ObjectStatus{
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeploymentID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcileSucceeded,
UID: "unused",
Generation: 1,
},
},
},
}
testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
}
func TestWaitTask_Cancel(t *testing.T) {
@ -414,6 +502,9 @@ func TestWaitTask_Cancel(t *testing.T) {
taskContext := NewTaskContext(eventChannel, resourceCache)
defer close(eventChannel)
// mark the deployment as applied
taskContext.InventoryManager().AddSuccessfulApply(testDeploymentID, "unused", 1)
// run task async, to let the test collect events
go func() {
// start the task
@ -459,6 +550,22 @@ loop:
testutil.AssertEqual(t, expectedEvents, receivedEvents,
"Actual events (%d) do not match expected events (%d)",
len(receivedEvents), len(expectedEvents))
expectedInventory := inventory.Inventory{
Status: inventory.InventoryStatus{
Objects: []inventory.ObjectStatus{
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeploymentID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcilePending,
UID: "unused",
Generation: 1,
},
},
},
}
testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
}
func TestWaitTask_SingleTaskResult(t *testing.T) {
@ -479,8 +586,7 @@ func TestWaitTask_SingleTaskResult(t *testing.T) {
defer close(eventChannel)
// mark the deployment as applied
appliedGeneration := int64(1)
taskContext.AddSuccessfulApply(testDeploymentID, "unused", appliedGeneration)
taskContext.InventoryManager().AddSuccessfulApply(testDeploymentID, "unused", 1)
// run task async, to let the test collect events
go func() {
@ -492,7 +598,7 @@ func TestWaitTask_SingleTaskResult(t *testing.T) {
// mark the deployment as Current
resourceCache.Put(testDeploymentID, cache.ResourceStatus{
Resource: withGeneration(testDeployment, appliedGeneration),
Resource: withGeneration(testDeployment, 1),
Status: status.CurrentStatus,
})
@ -547,6 +653,22 @@ loop:
{}, // Empty result means success
}
assert.Equal(t, expectedResults, receivedResults)
expectedInventory := inventory.Inventory{
Status: inventory.InventoryStatus{
Objects: []inventory.ObjectStatus{
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeploymentID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcileSucceeded,
UID: "unused",
Generation: 1,
},
},
},
}
testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
}
func TestWaitTask_Failed(t *testing.T) {
@ -561,11 +683,12 @@ func TestWaitTask_Failed(t *testing.T) {
eventsFunc func(*cache.ResourceCacheMap, *WaitTask, *TaskContext)
waitTimeout time.Duration
expectedEvents []event.Event
expectedInventory *inventory.Inventory
}{
"continue on failed if others InProgress": {
configureTaskContextFunc: func(taskContext *TaskContext) {
taskContext.AddSuccessfulApply(testDeployment1ID, "unused", 1)
taskContext.AddSuccessfulApply(testDeployment2ID, "unused", 1)
taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID, "unused", 1)
taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID, "unused", 1)
},
eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
@ -625,11 +748,33 @@ func TestWaitTask_Failed(t *testing.T) {
},
},
},
expectedInventory: &inventory.Inventory{
Status: inventory.InventoryStatus{
Objects: []inventory.ObjectStatus{
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcileFailed,
UID: "unused",
Generation: 1,
},
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcileSucceeded,
UID: "unused",
Generation: 1,
},
},
},
},
},
"complete wait task is last resource becomes failed": {
configureTaskContextFunc: func(taskContext *TaskContext) {
taskContext.AddSuccessfulApply(testDeployment1ID, "unused", 1)
taskContext.AddSuccessfulApply(testDeployment2ID, "unused", 1)
taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID, "unused", 1)
taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID, "unused", 1)
},
eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
@ -683,11 +828,33 @@ func TestWaitTask_Failed(t *testing.T) {
},
},
},
expectedInventory: &inventory.Inventory{
Status: inventory.InventoryStatus{
Objects: []inventory.ObjectStatus{
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcileFailed,
UID: "unused",
Generation: 1,
},
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcileSucceeded,
UID: "unused",
Generation: 1,
},
},
},
},
},
"failed resource can become current": {
configureTaskContextFunc: func(taskContext *TaskContext) {
taskContext.AddSuccessfulApply(testDeployment1ID, "unused", 1)
taskContext.AddSuccessfulApply(testDeployment2ID, "unused", 1)
taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID, "unused", 1)
taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID, "unused", 1)
},
eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
@ -756,11 +923,33 @@ func TestWaitTask_Failed(t *testing.T) {
},
},
},
expectedInventory: &inventory.Inventory{
Status: inventory.InventoryStatus{
Objects: []inventory.ObjectStatus{
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcileSucceeded,
UID: "unused",
Generation: 1,
},
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcileSucceeded,
UID: "unused",
Generation: 1,
},
},
},
},
},
"failed resource can become InProgress": {
configureTaskContextFunc: func(taskContext *TaskContext) {
taskContext.AddSuccessfulApply(testDeployment1ID, "unused", 1)
taskContext.AddSuccessfulApply(testDeployment2ID, "unused", 1)
taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID, "unused", 1)
taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID, "unused", 1)
},
eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
@ -838,6 +1027,28 @@ func TestWaitTask_Failed(t *testing.T) {
},
},
},
expectedInventory: &inventory.Inventory{
Status: inventory.InventoryStatus{
Objects: []inventory.ObjectStatus{
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcileTimeout,
UID: "unused",
Generation: 1,
},
{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
Strategy: inventory.ActuationStrategyApply,
Actuation: inventory.ActuationSucceeded,
Reconcile: inventory.ReconcileSucceeded,
UID: "unused",
Generation: 1,
},
},
},
},
},
}
@ -885,6 +1096,8 @@ func TestWaitTask_Failed(t *testing.T) {
testutil.AssertEqual(t, tc.expectedEvents, receivedEvents,
"Actual events (%d) do not match expected events (%d)",
len(receivedEvents), len(tc.expectedEvents))
testutil.AssertEqual(t, tc.expectedInventory, taskContext.InventoryManager().Inventory())
})
}
}

View File

@ -0,0 +1,26 @@
// Code generated by "stringer -type=ActuationStatus -linecomment"; DO NOT EDIT.
package inventory
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[ActuationPending-0]
_ = x[ActuationSucceeded-1]
_ = x[ActuationSkipped-2]
_ = x[ActuationFailed-3]
}
const _ActuationStatus_name = "PendingSucceededSkippedFailed"
var _ActuationStatus_index = [...]uint8{0, 7, 16, 23, 29}
func (i ActuationStatus) String() string {
if i < 0 || i >= ActuationStatus(len(_ActuationStatus_index)-1) {
return "ActuationStatus(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _ActuationStatus_name[_ActuationStatus_index[i]:_ActuationStatus_index[i+1]]
}

View File

@ -0,0 +1,24 @@
// Code generated by "stringer -type=ActuationStrategy -linecomment"; DO NOT EDIT.
package inventory
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[ActuationStrategyApply-0]
_ = x[ActuationStrategyDelete-1]
}
const _ActuationStrategy_name = "ApplyDelete"
var _ActuationStrategy_index = [...]uint8{0, 5, 11}
func (i ActuationStrategy) String() string {
if i < 0 || i >= ActuationStrategy(len(_ActuationStrategy_index)-1) {
return "ActuationStrategy(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _ActuationStrategy_name[_ActuationStrategy_index[i]:_ActuationStrategy_index[i+1]]
}

View File

@ -49,7 +49,7 @@ type InventoryClient interface {
type ClusterInventoryClient struct {
dc dynamic.Interface
mapper meta.RESTMapper
InventoryFactoryFunc InventoryFactoryFunc
InventoryFactoryFunc StorageFactoryFunc
invToUnstructuredFunc InventoryToUnstructuredFunc
}
@ -58,7 +58,7 @@ var _ InventoryClient = &ClusterInventoryClient{}
// NewInventoryClient returns a concrete implementation of the
// InventoryClient interface or an error.
func NewInventoryClient(factory cmdutil.Factory,
invFunc InventoryFactoryFunc,
invFunc StorageFactoryFunc,
invToUnstructuredFunc InventoryToUnstructuredFunc) (*ClusterInventoryClient, error) {
dc, err := factory.DynamicClient()
if err != nil {

View File

@ -24,10 +24,10 @@ import (
// The default inventory name stored in the inventory template.
const legacyInvName = "inventory"
// Inventory describes methods necessary for an object which
// Storage describes methods necessary for an object which
// can persist the object metadata for pruning and other group
// operations.
type Inventory interface {
type Storage interface {
// Load retrieves the set of object metadata from the inventory object
Load() (object.ObjMetadataSet, error)
// Store the set of object metadata in the inventory object
@ -36,9 +36,9 @@ type Inventory interface {
GetObject() (*unstructured.Unstructured, error)
}
// InventoryFactoryFunc creates the object which implements the Inventory
// StorageFactoryFunc creates the object which implements the Inventory
// interface from the passed info object.
type InventoryFactoryFunc func(*unstructured.Unstructured) Inventory
type StorageFactoryFunc func(*unstructured.Unstructured) Storage
// InventoryToUnstructuredFunc returns the unstructured object for the
// given InventoryInfo.

View File

@ -19,7 +19,7 @@ import (
// WrapInventoryObj takes a passed ConfigMap (as a resource.Info),
// wraps it with the InventoryConfigMap and upcasts the wrapper as
// an the Inventory interface.
func WrapInventoryObj(inv *unstructured.Unstructured) Inventory {
func WrapInventoryObj(inv *unstructured.Unstructured) Storage {
return &InventoryConfigMap{inv: inv}
}
@ -47,7 +47,7 @@ type InventoryConfigMap struct {
}
var _ InventoryInfo = &InventoryConfigMap{}
var _ Inventory = &InventoryConfigMap{}
var _ Storage = &InventoryConfigMap{}
func (icm *InventoryConfigMap) Name() string {
return icm.inv.GetName()

391
pkg/inventory/manager.go Normal file
View File

@ -0,0 +1,391 @@
// Copyright 2021 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package inventory
import (
"fmt"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/cli-utils/pkg/object"
)
// Manager wraps an Inventory with convenience methods that use ObjMetadata.
type Manager struct {
inventory *Inventory
}
// NewManager returns a new manager instance.
func NewManager() *Manager {
return &Manager{
inventory: &Inventory{},
}
}
// Inventory returns the in-memory version of the managed inventory.
func (tc *Manager) Inventory() *Inventory {
return tc.inventory
}
// ObjectStatus retrieves the status of an object with the specified ID.
// The returned status is a pointer and can be updated in-place for efficiency.
func (tc *Manager) ObjectStatus(id object.ObjMetadata) (*ObjectStatus, bool) {
for i, objStatus := range tc.inventory.Status.Objects {
if ObjMetadataEqualObjectReference(id, objStatus.ObjectReference) {
return &(tc.inventory.Status.Objects[i]), true
}
}
return nil, false
}
// ObjectsWithActuationStatus retrieves the set of objects with the
// specified actuation strategy and status.
func (tc *Manager) ObjectsWithActuationStatus(strategy ActuationStrategy, status ActuationStatus) object.ObjMetadataSet {
var ids object.ObjMetadataSet
for _, objStatus := range tc.inventory.Status.Objects {
if objStatus.Strategy == strategy && objStatus.Actuation == status {
ids = append(ids, ObjMetadataFromObjectReference(objStatus.ObjectReference))
}
}
return ids
}
// ObjectsWithActuationStatus retrieves the set of objects with the
// specified reconcile status, regardless of actuation strategy.
func (tc *Manager) ObjectsWithReconcileStatus(status ReconcileStatus) object.ObjMetadataSet {
var ids object.ObjMetadataSet
for _, objStatus := range tc.inventory.Status.Objects {
if objStatus.Reconcile == status {
ids = append(ids, ObjMetadataFromObjectReference(objStatus.ObjectReference))
}
}
return ids
}
// SetObjectStatus updates or adds an ObjectStatus record to the inventory.
func (tc *Manager) SetObjectStatus(id object.ObjMetadata, objStatus ObjectStatus) {
for i, objStatus := range tc.inventory.Status.Objects {
if ObjMetadataEqualObjectReference(id, objStatus.ObjectReference) {
tc.inventory.Status.Objects[i] = objStatus
return
}
}
tc.inventory.Status.Objects = append(tc.inventory.Status.Objects, objStatus)
}
// IsSuccessfulApply returns true if the object apply was successful
func (tc *Manager) IsSuccessfulApply(id object.ObjMetadata) bool {
objStatus, found := tc.ObjectStatus(id)
if !found {
return false
}
return objStatus.Strategy == ActuationStrategyApply &&
objStatus.Actuation == ActuationSucceeded
}
// AddSuccessfulApply updates the context with information about the
// resource identified by the provided id. Currently, we keep information
// about the generation of the resource after the apply operation completed.
func (tc *Manager) AddSuccessfulApply(id object.ObjMetadata, uid types.UID, gen int64) {
tc.SetObjectStatus(id, ObjectStatus{
ObjectReference: ObjectReferenceFromObjMetadata(id),
Strategy: ActuationStrategyApply,
Actuation: ActuationSucceeded,
Reconcile: ReconcilePending,
UID: uid,
Generation: gen,
})
}
// SuccessfulApplies returns all the objects (as ObjMetadata) that
// were added as applied resources to the Manager.
func (tc *Manager) SuccessfulApplies() object.ObjMetadataSet {
return tc.ObjectsWithActuationStatus(ActuationStrategyApply,
ActuationSucceeded)
}
// AppliedResourceUID looks up the UID of a successfully applied resource
func (tc *Manager) AppliedResourceUID(id object.ObjMetadata) (types.UID, bool) {
objStatus, found := tc.ObjectStatus(id)
return objStatus.UID, found &&
objStatus.Strategy == ActuationStrategyApply &&
objStatus.Actuation == ActuationSucceeded
}
// AppliedResourceUIDs returns a set with the UIDs of all the
// successfully applied resources.
func (tc *Manager) AppliedResourceUIDs() sets.String {
uids := sets.NewString()
for _, objStatus := range tc.inventory.Status.Objects {
if objStatus.Strategy == ActuationStrategyApply &&
objStatus.Actuation == ActuationSucceeded {
if objStatus.UID != "" {
uids.Insert(string(objStatus.UID))
}
}
}
return uids
}
// AppliedGeneration looks up the generation of the given resource
// after it was applied.
func (tc *Manager) AppliedGeneration(id object.ObjMetadata) (int64, bool) {
objStatus, found := tc.ObjectStatus(id)
if !found {
return 0, false
}
return objStatus.Generation, true
}
// IsSuccessfulDelete returns true if the object delete was successful
func (tc *Manager) IsSuccessfulDelete(id object.ObjMetadata) bool {
objStatus, found := tc.ObjectStatus(id)
if !found {
return false
}
return objStatus.Strategy == ActuationStrategyDelete &&
objStatus.Actuation == ActuationSucceeded
}
// AddSuccessfulDelete updates the context with information about the
// resource identified by the provided id. Currently, we only track the uid,
// because the DELETE call doesn't always return the generation, namely if the
// object was scheduled to be deleted asynchronously, which might cause further
// updates by finalizers. The UID will change if the object is re-created.
func (tc *Manager) AddSuccessfulDelete(id object.ObjMetadata, uid types.UID) {
tc.SetObjectStatus(id, ObjectStatus{
ObjectReference: ObjectReferenceFromObjMetadata(id),
Strategy: ActuationStrategyDelete,
Actuation: ActuationSucceeded,
Reconcile: ReconcilePending,
UID: uid,
})
}
// SuccessfulDeletes returns all the objects (as ObjMetadata) that
// were successfully deleted.
func (tc *Manager) SuccessfulDeletes() object.ObjMetadataSet {
return tc.ObjectsWithActuationStatus(ActuationStrategyDelete,
ActuationSucceeded)
}
// IsFailedApply returns true if the object failed to apply
func (tc *Manager) IsFailedApply(id object.ObjMetadata) bool {
objStatus, found := tc.ObjectStatus(id)
if !found {
return false
}
return objStatus.Strategy == ActuationStrategyApply &&
objStatus.Actuation == ActuationFailed
}
// AddFailedApply registers that the object failed to apply
func (tc *Manager) AddFailedApply(id object.ObjMetadata) {
tc.SetObjectStatus(id, ObjectStatus{
ObjectReference: ObjectReferenceFromObjMetadata(id),
Strategy: ActuationStrategyApply,
Actuation: ActuationFailed,
Reconcile: ReconcilePending,
})
}
// FailedApplies returns all the objects that failed to apply
func (tc *Manager) FailedApplies() object.ObjMetadataSet {
return tc.ObjectsWithActuationStatus(ActuationStrategyApply, ActuationFailed)
}
// IsFailedDelete returns true if the object failed to delete
func (tc *Manager) IsFailedDelete(id object.ObjMetadata) bool {
objStatus, found := tc.ObjectStatus(id)
if !found {
return false
}
return objStatus.Strategy == ActuationStrategyDelete &&
objStatus.Actuation == ActuationFailed
}
// AddFailedDelete registers that the object failed to delete
func (tc *Manager) AddFailedDelete(id object.ObjMetadata) {
tc.SetObjectStatus(id, ObjectStatus{
ObjectReference: ObjectReferenceFromObjMetadata(id),
Strategy: ActuationStrategyDelete,
Actuation: ActuationFailed,
Reconcile: ReconcilePending,
})
}
// FailedDeletes returns all the objects that failed to delete
func (tc *Manager) FailedDeletes() object.ObjMetadataSet {
return tc.ObjectsWithActuationStatus(ActuationStrategyDelete, ActuationFailed)
}
// IsSkippedApply returns true if the object apply was skipped
func (tc *Manager) IsSkippedApply(id object.ObjMetadata) bool {
objStatus, found := tc.ObjectStatus(id)
if !found {
return false
}
return objStatus.Strategy == ActuationStrategyApply &&
objStatus.Actuation == ActuationSkipped
}
// AddSkippedApply registers that the object apply was skipped
func (tc *Manager) AddSkippedApply(id object.ObjMetadata) {
tc.SetObjectStatus(id, ObjectStatus{
ObjectReference: ObjectReferenceFromObjMetadata(id),
Strategy: ActuationStrategyApply,
Actuation: ActuationSkipped,
Reconcile: ReconcilePending,
})
}
// SkippedApplies returns all the objects where apply was skipped
func (tc *Manager) SkippedApplies() object.ObjMetadataSet {
return tc.ObjectsWithActuationStatus(ActuationStrategyApply, ActuationSkipped)
}
// IsSkippedDelete returns true if the object delete was skipped
func (tc *Manager) IsSkippedDelete(id object.ObjMetadata) bool {
objStatus, found := tc.ObjectStatus(id)
if !found {
return false
}
return objStatus.Strategy == ActuationStrategyDelete &&
objStatus.Actuation == ActuationSkipped
}
// AddSkippedDelete registers that the object delete was skipped
func (tc *Manager) AddSkippedDelete(id object.ObjMetadata) {
tc.SetObjectStatus(id, ObjectStatus{
ObjectReference: ObjectReferenceFromObjMetadata(id),
Strategy: ActuationStrategyDelete,
Actuation: ActuationSkipped,
Reconcile: ReconcilePending,
})
}
// SkippedDeletes returns all the objects where deletion was skipped
func (tc *Manager) SkippedDeletes() object.ObjMetadataSet {
return tc.ObjectsWithActuationStatus(ActuationStrategyDelete, ActuationSkipped)
}
// IsSuccessfulReconcile returns true if the object is reconciled
func (tc *Manager) IsSuccessfulReconcile(id object.ObjMetadata) bool {
objStatus, found := tc.ObjectStatus(id)
if !found {
return false
}
return objStatus.Reconcile == ReconcileSucceeded
}
// SetSuccessfulReconcile registers that the object is reconciled
func (tc *Manager) SetSuccessfulReconcile(id object.ObjMetadata) error {
objStatus, found := tc.ObjectStatus(id)
if !found {
return fmt.Errorf("object not in inventory: %q", id)
}
objStatus.Reconcile = ReconcileSucceeded
return nil
}
// SuccessfulReconciles returns all the reconciled objects
func (tc *Manager) SuccessfulReconciles() object.ObjMetadataSet {
return tc.ObjectsWithReconcileStatus(ReconcileSucceeded)
}
// IsFailedReconcile returns true if the object failed to reconcile
func (tc *Manager) IsFailedReconcile(id object.ObjMetadata) bool {
objStatus, found := tc.ObjectStatus(id)
if !found {
return false
}
return objStatus.Reconcile == ReconcileFailed
}
// SetFailedReconcile registers that the object failed to reconcile
func (tc *Manager) SetFailedReconcile(id object.ObjMetadata) error {
objStatus, found := tc.ObjectStatus(id)
if !found {
return fmt.Errorf("object not in inventory: %q", id)
}
objStatus.Reconcile = ReconcileFailed
return nil
}
// FailedReconciles returns all the objects that failed to reconcile
func (tc *Manager) FailedReconciles() object.ObjMetadataSet {
return tc.ObjectsWithReconcileStatus(ReconcileFailed)
}
// IsSkippedReconcile returns true if the object reconcile was skipped
func (tc *Manager) IsSkippedReconcile(id object.ObjMetadata) bool {
objStatus, found := tc.ObjectStatus(id)
if !found {
return false
}
return objStatus.Reconcile == ReconcileSkipped
}
// SetSkippedReconcile registers that the object reconcile was skipped
func (tc *Manager) SetSkippedReconcile(id object.ObjMetadata) error {
objStatus, found := tc.ObjectStatus(id)
if !found {
return fmt.Errorf("object not in inventory: %q", id)
}
objStatus.Reconcile = ReconcileSkipped
return nil
}
// SkippedReconciles returns all the objects where reconcile was skipped
func (tc *Manager) SkippedReconciles() object.ObjMetadataSet {
return tc.ObjectsWithReconcileStatus(ReconcileSkipped)
}
// IsTimeoutReconcile returns true if the object reconcile was skipped
func (tc *Manager) IsTimeoutReconcile(id object.ObjMetadata) bool {
objStatus, found := tc.ObjectStatus(id)
if !found {
return false
}
return objStatus.Reconcile == ReconcileTimeout
}
// SetTimeoutReconcile registers that the object reconcile was skipped
func (tc *Manager) SetTimeoutReconcile(id object.ObjMetadata) error {
objStatus, found := tc.ObjectStatus(id)
if !found {
return fmt.Errorf("object not in inventory: %q", id)
}
objStatus.Reconcile = ReconcileTimeout
return nil
}
// TimeoutReconciles returns all the objects where reconcile was skipped
func (tc *Manager) TimeoutReconciles() object.ObjMetadataSet {
return tc.ObjectsWithReconcileStatus(ReconcileTimeout)
}
// IsPendingReconcile returns true if the object reconcile is pending
func (tc *Manager) IsPendingReconcile(id object.ObjMetadata) bool {
objStatus, found := tc.ObjectStatus(id)
if !found {
return false
}
return objStatus.Reconcile == ReconcilePending
}
// SetPendingReconcile registers that the object reconcile is pending
func (tc *Manager) SetPendingReconcile(id object.ObjMetadata) error {
objStatus, found := tc.ObjectStatus(id)
if !found {
return fmt.Errorf("object not in inventory: %q", id)
}
objStatus.Reconcile = ReconcilePending
return nil
}
// PendingReconciles returns all the objects where reconcile is pending
func (tc *Manager) PendingReconciles() object.ObjMetadataSet {
return tc.ObjectsWithReconcileStatus(ReconcilePending)
}

View File

@ -0,0 +1,27 @@
// Code generated by "stringer -type=ReconcileStatus -linecomment"; DO NOT EDIT.
package inventory
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[ReconcilePending-0]
_ = x[ReconcileSucceeded-1]
_ = x[ReconcileSkipped-2]
_ = x[ReconcileFailed-3]
_ = x[ReconcileTimeout-4]
}
const _ReconcileStatus_name = "PendingSucceededSkippedFailedTimeout"
var _ReconcileStatus_index = [...]uint8{0, 7, 16, 23, 29, 36}
func (i ReconcileStatus) String() string {
if i < 0 || i >= ReconcileStatus(len(_ReconcileStatus_index)-1) {
return "ReconcileStatus(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _ReconcileStatus_name[_ReconcileStatus_index[i]:_ReconcileStatus_index[i+1]]
}

View File

@ -0,0 +1,39 @@
// Copyright 2021 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package inventory
import (
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/object"
)
// ObjMetadataEqualObjectReference compares an ObjMetadata with a ObjectReference
func ObjMetadataEqualObjectReference(id object.ObjMetadata, ref ObjectReference) bool {
return id.GroupKind.Group == ref.Group &&
id.GroupKind.Kind == ref.Kind &&
id.Namespace == ref.Namespace &&
id.Name == ref.Name
}
// ObjectReferenceFromObjMetadata converts an ObjMetadata to a ObjectReference
func ObjectReferenceFromObjMetadata(id object.ObjMetadata) ObjectReference {
return ObjectReference{
Group: id.GroupKind.Group,
Kind: id.GroupKind.Kind,
Name: id.Name,
Namespace: id.Namespace,
}
}
// ObjMetadataFromObjectReference converts an ObjectReference to a ObjMetadata
func ObjMetadataFromObjectReference(ref ObjectReference) object.ObjMetadata {
return object.ObjMetadata{
GroupKind: schema.GroupKind{
Group: ref.Group,
Kind: ref.Kind,
},
Name: ref.Name,
Namespace: ref.Namespace,
}
}

134
pkg/inventory/types.go Normal file
View File

@ -0,0 +1,134 @@
// Copyright 2021 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package inventory
import "k8s.io/apimachinery/pkg/types"
// Inventory represents the inventory object in memory.
// Inventory is currently only used for in-memory storage and not serialized to
// disk or to the API server.
// TODO: Replace InventoryInfo with Inventory.TypeMeta & Inventory.ObjectMeta
// TODO: Replace object.ObjMetadataSet in Storage interface with Inventory.Spec
type Inventory struct {
TypeMeta `json:",inline"`
ObjectMeta `json:"metadata,omitempty"`
Spec InventorySpec `json:"spec,omitempty"`
Status InventoryStatus `json:"status,omitempty"`
}
// InventorySpec is the specification of the desired/expected inventory state.
type InventorySpec struct {
Objects []ObjectReference `json:"objects,omitempty"`
}
// InventoryStatus is the status of the current/last-known inventory state.
type InventoryStatus struct {
Objects []ObjectStatus `json:"objects,omitempty"`
}
// ObjectReference is a reference to a KRM resource by name and kind.
//
// Kubernetes only stores one API Version for each Kind at any given time,
// so version is not used when referencing objects.
type ObjectReference struct {
// Kind identifies a REST resource within a Group.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
Kind string `json:"kind,omitempty"`
// Group identifies an API namespace for REST resources.
// If group is omitted, it is treated as the "core" group.
// More info: https://kubernetes.io/docs/reference/using-api/#api-groups
// +optional
Group string `json:"group,omitempty"`
// Name identifies an object instance of a REST resource.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
Name string `json:"name,omitempty"`
// Namespace identifies a group of objects across REST resources.
// If namespace is specified, the resource must be namespace-scoped.
// If namespace is omitted, the resource must be cluster-scoped.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
// +optional
Namespace string `json:"namespace,omitempty"`
}
// ObjectStatus is a snapshot of the actuation and reconciliation status of a
// referenced object.
type ObjectStatus struct {
ObjectReference `json:",inline"`
// Strategy indicates the method of actuation (apply or delete) used or planned to be used.
Strategy ActuationStrategy `json:"strategy,omitempty"`
// Actuation indicates whether actuation has been performed yet and how it went.
Actuation ActuationStatus `json:"actuation,omitempty"`
// Reconcile indicates whether reconciliation has been performed yet and how it went.
Reconcile ReconcileStatus `json:"reconcile,omitempty"`
// UID is the last known UID (after apply or before delete).
// This can help identify if the object has been replaced.
// +optional
UID types.UID `json:"uid,omitempty"`
// Generation is the last known Generation (after apply or before delete).
// This can help identify if the object has been modified.
// Generation is not available for deleted objects.
// +optional
Generation int64 `json:"generation,omitempty"`
}
//go:generate stringer -type=ActuationStrategy -linecomment
type ActuationStrategy int
const (
ActuationStrategyApply ActuationStrategy = iota // Apply
ActuationStrategyDelete // Delete
)
//go:generate stringer -type=ActuationStatus -linecomment
type ActuationStatus int
const (
ActuationPending ActuationStatus = iota // Pending
ActuationSucceeded // Succeeded
ActuationSkipped // Skipped
ActuationFailed // Failed
)
//go:generate stringer -type=ReconcileStatus -linecomment
type ReconcileStatus int
const (
ReconcilePending ReconcileStatus = iota // Pending
ReconcileSucceeded // Succeeded
ReconcileSkipped // Skipped
ReconcileFailed // Failed
ReconcileTimeout // Timeout
)
// TypeMeta describes a REST resource.
type TypeMeta struct {
// Kind is a string value representing the REST resource this object represents.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
Kind string `json:"kind,omitempty"`
// APIVersion defines the versioned schema of this representation of an object.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
APIVersion string `json:"apiVersion,omitempty"`
}
// ObjectMeta describes an individual object instance of a REST resource.
// TODO: Do we need other fields, like UID, Generation, ResourceVersion, and CreationTimestamp?
type ObjectMeta struct {
// Name identifies an object instance of a REST resource.
// More info: http://kubernetes.io/docs/user-guide/identifiers#names
Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`
// Namespace identifies a group of objects across REST resources.
// If namespace is specified, the resource must be namespace-scoped.
// If namespace is omitted, the resource must be cluster-scoped.
// More info: http://kubernetes.io/docs/user-guide/namespaces
// +optional
Namespace string `json:"namespace,omitempty" protobuf:"bytes,3,opt,name=namespace"`
}

View File

@ -92,7 +92,7 @@ func invToUnstructuredFunc(inv inventory.InventoryInfo) *unstructured.Unstructur
}
}
func WrapInventoryObj(obj *unstructured.Unstructured) inventory.Inventory {
func WrapInventoryObj(obj *unstructured.Unstructured) inventory.Storage {
return &InventoryCustomType{inv: obj}
}
@ -100,7 +100,7 @@ func WrapInventoryInfoObj(obj *unstructured.Unstructured) inventory.InventoryInf
return &InventoryCustomType{inv: obj}
}
var _ inventory.Inventory = &InventoryCustomType{}
var _ inventory.Storage = &InventoryCustomType{}
var _ inventory.InventoryInfo = &InventoryCustomType{}
type InventoryCustomType struct {