Merge pull request #35 from mortent/AggregatorForObserve

Aggregator implementation for the Observe framework
This commit is contained in:
Jeff Regan 2020-02-13 14:41:36 -08:00 committed by GitHub
commit a097d8c557
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 211 additions and 0 deletions

View File

@ -0,0 +1,74 @@
package aggregator
import (
"sigs.k8s.io/cli-utils/pkg/kstatus/observe/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/kstatus/wait"
)
// BasicAggregator implements StatusAggregator.
// Aggregate status will be Current when all observed
// resources are either Current or NotFound.
// TODO: Treating resources that doesn't exist as Current is
// weird. But it kinda does make sense when we track
// resources that are deleted/pruned. We should see if
// there is a better way to handle this.
type BasicAggregator struct {
resourceCurrentStatus map[wait.ResourceIdentifier]status.Status
}
// NewBasicAggregator returns a BasicAggregator that will track
// resources identified by the argument.
func NewAllCurrentOrNotFoundStatusAggregator(identifiers []wait.ResourceIdentifier) *BasicAggregator {
aggregator := &BasicAggregator{
resourceCurrentStatus: make(map[wait.ResourceIdentifier]status.Status),
}
for _, id := range identifiers {
aggregator.resourceCurrentStatus[id] = status.UnknownStatus
}
return aggregator
}
// ResourceObserved is called whenever we have an observation of a resource. In this
// case, we just keep the latest status so we can later compute the aggregate status
// for all the resources.
func (d *BasicAggregator) ResourceObserved(r *event.ObservedResource) {
d.resourceCurrentStatus[r.Identifier] = r.Status
}
// AggregateStatus computes the aggregate status for all the resources. In this
// implementation of the Aggregator, we treat resources with the NotFound status as Current.
func (d *BasicAggregator) AggregateStatus() status.Status {
// if we are not observing any resources, we consider status be Current.
if len(d.resourceCurrentStatus) == 0 {
return status.CurrentStatus
}
allCurrentOrNotFound := true
anyUnknown := false
for _, s := range d.resourceCurrentStatus {
if s == status.FailedStatus {
return status.FailedStatus
}
if s == status.UnknownStatus {
anyUnknown = true
}
if !(s == status.CurrentStatus || s == status.NotFoundStatus) {
allCurrentOrNotFound = false
}
}
if anyUnknown {
return status.UnknownStatus
}
if allCurrentOrNotFound {
return status.CurrentStatus
}
return status.InProgressStatus
}
// Completed is used by the framework to decide if the set of resources has
// all reached the desired status, i.e. the aggregate status. This is used to determine
// when to stop polling resources.
func (d *BasicAggregator) Completed() bool {
return d.AggregateStatus() == status.CurrentStatus
}

View File

@ -0,0 +1,137 @@
package aggregator
import (
"testing"
"gotest.tools/assert"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/observe/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/kstatus/wait"
)
var resourceIdentifiers = map[string]wait.ResourceIdentifier{
"deployment": {
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Name: "Foo",
Namespace: "default",
},
"statefulset": {
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "StatefulSet",
},
Name: "Bar",
Namespace: "default",
},
"service": {
GroupKind: schema.GroupKind{
Group: "",
Kind: "Service",
},
Name: "Service",
Namespace: "default",
},
}
func TestAggregator(t *testing.T) {
testCases := map[string]struct {
identifiers []wait.ResourceIdentifier
observations []event.ObservedResource
aggregateStatus status.Status
}{
"no identifiers": {
identifiers: []wait.ResourceIdentifier{},
observations: []event.ObservedResource{},
aggregateStatus: status.CurrentStatus,
},
"single identifier with multiple observations": {
identifiers: []wait.ResourceIdentifier{resourceIdentifiers["deployment"]},
observations: []event.ObservedResource{
{
Identifier: resourceIdentifiers["deployment"],
Status: status.UnknownStatus,
},
{
Identifier: resourceIdentifiers["deployment"],
Status: status.InProgressStatus,
},
},
aggregateStatus: status.InProgressStatus,
},
"multiple resources with one unknown status": {
identifiers: []wait.ResourceIdentifier{
resourceIdentifiers["deployment"],
resourceIdentifiers["statefulset"],
},
observations: []event.ObservedResource{
{
Identifier: resourceIdentifiers["deployment"],
Status: status.UnknownStatus,
},
{
Identifier: resourceIdentifiers["statefulset"],
Status: status.InProgressStatus,
},
},
aggregateStatus: status.UnknownStatus,
},
"multiple resources with all current or not found": {
identifiers: []wait.ResourceIdentifier{
resourceIdentifiers["deployment"],
resourceIdentifiers["statefulset"],
},
observations: []event.ObservedResource{
{
Identifier: resourceIdentifiers["deployment"],
Status: status.NotFoundStatus,
},
{
Identifier: resourceIdentifiers["statefulset"],
Status: status.CurrentStatus,
},
},
aggregateStatus: status.CurrentStatus,
},
"multiple resources with one failed": {
identifiers: []wait.ResourceIdentifier{
resourceIdentifiers["deployment"],
resourceIdentifiers["statefulset"],
resourceIdentifiers["service"],
},
observations: []event.ObservedResource{
{
Identifier: resourceIdentifiers["deployment"],
Status: status.NotFoundStatus,
},
{
Identifier: resourceIdentifiers["statefulset"],
Status: status.CurrentStatus,
},
{
Identifier: resourceIdentifiers["service"],
Status: status.FailedStatus,
},
},
aggregateStatus: status.FailedStatus,
},
}
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
aggregator := NewAllCurrentOrNotFoundStatusAggregator(tc.identifiers)
for _, o := range tc.observations {
observation := o
aggregator.ResourceObserved(&observation)
}
aggStatus := aggregator.AggregateStatus()
assert.Equal(t, tc.aggregateStatus, aggStatus)
})
}
}