New observer library for status

This commit is contained in:
Morten Torkildsen 2020-02-09 13:01:23 -08:00
parent 547a6eab97
commit a608ef76ac
11 changed files with 1005 additions and 0 deletions

2
go.mod
View File

@ -8,6 +8,8 @@ require (
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v0.0.5
github.com/stretchr/testify v1.4.0
gopkg.in/yaml.v3 v3.0.0-20200121175148-a6ecf24a6d71
gotest.tools v2.2.0+incompatible
k8s.io/api v0.17.2
k8s.io/apimachinery v0.17.2
k8s.io/cli-runtime v0.17.2

2
go.sum
View File

@ -430,6 +430,8 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200121175148-a6ecf24a6d71 h1:Xe2gvTZUJpsvOWUnvmL/tmhVBZUmHSvLbMjRj6NUUKo=
gopkg.in/yaml.v3 v3.0.0-20200121175148-a6ecf24a6d71/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -0,0 +1,125 @@
package event
import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/kstatus/wait"
)
// EventType is the type that describes the type of an Event that is passed back to the caller
// as resources in the cluster are being observed.
type EventType string
const (
// ResourceUpdateEvent describes events related to a change in the status of one of the observed resources.
ResourceUpdateEvent EventType = "ResourceUpdated"
// CompletedEvent signals that all resources have been reconciled and the observer has completed its work. The
// event channel will be closed after this event.
CompletedEvent EventType = "Completed"
// AbortedEvent signals that the observer is shutting down because it has been cancelled. All resources might
// not have been reconciled. The event channel will be closed after this event.
AbortedEvent EventType = "Aborted"
// ErrorEvent signals that the observer has encountered an error that it can not recover from. The observer
// is shutting down and the event channel will be closed after this event.
ErrorEvent EventType = "Error"
)
// Event defines that type that is passed back through the event channel to notify the caller of changes
// as resources are being observed.
type Event struct {
// EventType defines the type of event.
EventType EventType
// AggregateStatus is the collective status for all the resources. It is computed by the
// StatusAggregator
AggregateStatus status.Status
// Resource is only available for ResourceUpdateEvents. It includes information about the observed resource,
// including the resource status, any errors and the resource itself (as an unstructured).
Resource *ObservedResource
// Error is only available for ErrorEvents. It contains the error that caused the observer to
// give up.
Error error
}
// ObservedResource contains information about a resource after we have
// fetched it from the cluster and computed status.
type ObservedResource struct {
// Identifier contains the information necessary to locate the
// resource within a cluster.
Identifier wait.ResourceIdentifier
// Status is the computed status for this resource.
Status status.Status
// Resource contains the actual manifest for the resource that
// was fetched from the cluster and used to compute status.
Resource *unstructured.Unstructured
// Errors contains the error if something went wrong during the
// process of fetching the resource and computing the status.
Error error
// Message is text describing the status of the resource.
Message string
// GeneratedResources is a slice of ObservedResource that
// contains information and status for any generated resources
// of the current resource.
GeneratedResources ObservedResources
}
type ObservedResources []*ObservedResource
func (g ObservedResources) Len() int {
return len(g)
}
func (g ObservedResources) Less(i, j int) bool {
idI := g[i].Identifier
idJ := g[j].Identifier
if idI.Namespace != idJ.Namespace {
return idI.Namespace < idJ.Namespace
}
if idI.GroupKind.Group != idJ.GroupKind.Group {
return idI.GroupKind.Group < idJ.GroupKind.Group
}
if idI.GroupKind.Kind != idJ.GroupKind.Kind {
return idI.GroupKind.Kind < idJ.GroupKind.Kind
}
return idI.Name < idJ.Name
}
func (g ObservedResources) Swap(i, j int) {
g[i], g[j] = g[j], g[i]
}
// DeepEqual checks if two instances of ObservedResource are identical. This is used
// to determine whether status has changed for a particular resource.
func DeepEqual(or1, or2 *ObservedResource) bool {
if or1.Identifier != or2.Identifier ||
or1.Status != or2.Status ||
or1.Message != or2.Message {
return false
}
if or1.Error != nil && or2.Error != nil && or1.Error.Error() != or2.Error.Error() {
return false
}
if (or1.Error == nil && or2.Error != nil) || (or1.Error != nil && or2.Error == nil) {
return false
}
if len(or1.GeneratedResources) != len(or2.GeneratedResources) {
return false
}
for i := range or1.GeneratedResources {
if !DeepEqual(or1.GeneratedResources[i], or2.GeneratedResources[i]) {
return false
}
}
return true
}

View File

@ -0,0 +1,273 @@
package event
import (
"fmt"
"testing"
"gotest.tools/assert"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/kstatus/wait"
)
func TestDeepEqual(t *testing.T) {
testCases := map[string]struct {
actual ObservedResource
expected ObservedResource
equal bool
}{
"same resource should be equal": {
actual: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Foo",
},
Status: status.UnknownStatus,
Message: "Some message",
},
expected: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Foo",
},
Status: status.UnknownStatus,
Message: "Some message",
},
equal: true,
},
"different resources with only name different": {
actual: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Foo",
},
Status: status.CurrentStatus,
},
expected: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Bar",
},
Status: status.CurrentStatus,
},
equal: false,
},
"different GroupKind otherwise same": {
actual: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Bar",
},
Status: status.CurrentStatus,
},
expected: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "custom.io",
Kind: "Deployment",
},
Namespace: "default",
Name: "Bar",
},
Status: status.CurrentStatus,
},
equal: false,
},
"same resource with same error": {
actual: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Bar",
},
Status: status.UnknownStatus,
Error: fmt.Errorf("this is a test"),
},
expected: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Bar",
},
Status: status.UnknownStatus,
Error: fmt.Errorf("this is a test"),
},
equal: true,
},
"same resource with different error": {
actual: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Bar",
},
Status: status.UnknownStatus,
Error: fmt.Errorf("this is a test"),
},
expected: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Bar",
},
Status: status.UnknownStatus,
Error: fmt.Errorf("this is a different error"),
},
equal: false,
},
"same resource different status": {
actual: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Bar",
},
Status: status.CurrentStatus,
},
expected: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Bar",
},
Status: status.InProgressStatus,
},
equal: false,
},
"same resource with different number of generated resources": {
actual: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Bar",
},
Status: status.InProgressStatus,
GeneratedResources: []*ObservedResource{
{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "ReplicaSet",
},
Namespace: "default",
Name: "Bar-123",
},
Status: status.InProgressStatus,
},
},
},
expected: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Bar",
},
Status: status.InProgressStatus,
},
equal: false,
},
"same resource with different status on generated resources": {
actual: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Bar",
},
Status: status.InProgressStatus,
GeneratedResources: []*ObservedResource{
{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "ReplicaSet",
},
Namespace: "default",
Name: "Bar-123",
},
Status: status.InProgressStatus,
},
},
},
expected: ObservedResource{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "default",
Name: "Bar",
},
Status: status.InProgressStatus,
GeneratedResources: []*ObservedResource{
{
Identifier: wait.ResourceIdentifier{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "ReplicaSet",
},
Namespace: "default",
Name: "Bar-123",
},
Status: status.CurrentStatus,
},
},
},
equal: false,
},
}
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
res := DeepEqual(&tc.actual, &tc.expected)
assert.Equal(t, tc.equal, res)
})
}
}

View File

@ -0,0 +1,20 @@
package observer
import (
"sigs.k8s.io/cli-utils/pkg/kstatus/observe/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
)
// StatusAggregator provides the interface the observer uses to compute the aggregate status.
// It also include a function that will be used by the observer to determine if all resources
// should be considered fully reconciled.
type StatusAggregator interface {
// ResourceObserved notifies the aggregator of a new observation. Called after status has been
// computed.
ResourceObserved(resource *event.ObservedResource)
// AggregateStatus computes the aggregate status for all the resources at the given
// point in time.
AggregateStatus() status.Status
// Completed returns true if all resources should be considered reconciled and false otherwise.
Completed() bool
}

View File

@ -0,0 +1,235 @@
package observer
import (
"context"
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/observe/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// AggregatorFactoryFunc defines the signature for the function the Observer will use to
// create a new StatusAggregator for each statusObserverRunner.
type AggregatorFactoryFunc func(identifiers []wait.ResourceIdentifier) StatusAggregator
// ReaderFactoryFunc defines the signature for the function the Observer will use to create
// a new ObserverReader for each statusObserverRunner.
type ReaderFactoryFunc func(reader client.Reader, mapper meta.RESTMapper,
identifiers []wait.ResourceIdentifier) (ObserverReader, error)
// ObserversFactoryFunc defines the signature for the function the Observer will use to
// create the resource observers and the default observer for each statusObserverRunner.
type ObserversFactoryFunc func(reader ObserverReader, mapper meta.RESTMapper) (
resourceObservers map[schema.GroupKind]ResourceObserver, defaultObserver ResourceObserver)
// Observer provides functionality for polling a cluster for status for a set of resources.
type Observer struct {
Reader client.Reader
Mapper meta.RESTMapper
// AggregatorFunc provides the Observer with a way to create a new aggregator. This will
// happen for every call to Observe since each statusObserverRunner keeps its own
// status aggregator.
AggregatorFactoryFunc AggregatorFactoryFunc
// ReaderFactoryFunc provides the Observer with a factory function for creating new
// ObserverReaders. Since these can be stateful, every call to Observe will create a new
// ObserverReader.
ReaderFactoryFunc ReaderFactoryFunc
// ObserversFactoryFunc provides the Observer with a factory function for creating resource
// observers. Each statusObserverRunner has a separate set of observers, so this will be called
// for every call to Observe.
ObserversFactoryFunc ObserversFactoryFunc
}
// Observe will create a new statusObserverRunner that will poll all the resources provided and report their status
// back on the event channel returned. The statusObserverRunner can be cancelled at any time by cancelling the
// context passed in.
// If observeUntilCancelled is set to false, then the runner will stop observing the resources when the StatusAggregator
// determines that all resources has been fully reconciled. If this is set to true, the observer will keep running
// until cancelled. This can be useful if the goal is to just monitor a set of resources rather than waiting for
// all to reach a specific status.
// The pollInterval specifies how often the observer should poll the cluster for the latest state of the resources.
func (s *Observer) Observe(ctx context.Context, identifiers []wait.ResourceIdentifier, pollInterval time.Duration,
observeUntilCancelled bool) <-chan event.Event {
eventChannel := make(chan event.Event)
go func() {
defer close(eventChannel)
observerReader, err := s.ReaderFactoryFunc(s.Reader, s.Mapper, identifiers)
if err != nil {
eventChannel <- event.Event{
EventType: event.ErrorEvent,
Error: err,
}
return
}
observers, defaultObserver := s.ObserversFactoryFunc(observerReader, s.Mapper)
aggregator := s.AggregatorFactoryFunc(identifiers)
runner := &statusObserverRunner{
ctx: ctx,
reader: observerReader,
observers: observers,
defaultObserver: defaultObserver,
identifiers: identifiers,
previousObservedResources: make(map[wait.ResourceIdentifier]*event.ObservedResource),
eventChannel: eventChannel,
statusAggregator: aggregator,
observeUntilCancelled: observeUntilCancelled,
pollingInterval: pollInterval,
}
runner.Run()
}()
return eventChannel
}
// statusObserverRunner is responsible for polling of a set of resources. Each call to Observe will create
// a new statusObserverRunner, which means we can keep state in the runner and all data will only be accessed
// by a single goroutine, meaning we don't need synchronization.
// The statusObserverRunner uses an implementation of the ObserverReader interface to talk to the
// kubernetes cluster. Currently this can be either the cached ObserverReader that syncs all needed resources
// with LIST calls before each polling loop, or the normal ObserverReader that just forwards each call
// to the client.Reader from controller-runtime.
type statusObserverRunner struct {
// ctx is the context for the runner. It will be used by the caller of Observe to cancel
// observing resources.
ctx context.Context
// reader is the interface for fetching and listing resources from the cluster. It can be implemented
// to make call directly to the cluster or use caching to reduce the number of calls to the cluster.
reader ObserverReader
// observers contains the resource specific observers. These will contain logic for how to
// compute status for specific GroupKinds. These will use an ObserverReader to fetch
// status of a resource and any generated resources.
observers map[schema.GroupKind]ResourceObserver
// defaultObserver is the generic observer that is used for all GroupKinds that
// doesn't have a specific observer in the observers map.
defaultObserver ResourceObserver
// identifiers contains the list of identifiers for the resources that should be observed.
// Each resource is identified by GroupKind, namespace and name.
identifiers []wait.ResourceIdentifier
// previousObservedResources keeps track of the last event for each
// of the observed resources. This is used to make sure we only
// send events on the event channel when something has actually changed.
previousObservedResources map[wait.ResourceIdentifier]*event.ObservedResource
// eventChannel is a channel where any updates to the observed status of resources
// will be sent. The caller of Observe will listen for updates.
eventChannel chan event.Event
// statusAggregator is responsible for keeping track of the status of
// all of the observed resources and to compute the aggregate status.
statusAggregator StatusAggregator
// observeUntilCancelled decides whether the runner should keep running
// even if the statusAggregator decides that all resources has reached the
// desired status.
observeUntilCancelled bool
// pollingInterval determines how often we should poll the cluster for
// the latest state of resources.
pollingInterval time.Duration
}
// Run starts the polling loop of the observers.
func (r *statusObserverRunner) Run() {
// Sets up ticker that will trigger the regular polling loop at a regular interval.
ticker := time.NewTicker(r.pollingInterval)
defer func() {
ticker.Stop()
}()
for {
select {
case <-r.ctx.Done():
// If the context has been cancelled, just send an AbortedEvent
// and pass along the most up-to-date aggregate status. Then return
// from this function, which will stop the ticker and close the event channel.
aggregatedStatus := r.statusAggregator.AggregateStatus()
r.eventChannel <- event.Event{
EventType: event.AbortedEvent,
AggregateStatus: aggregatedStatus,
}
return
case <-ticker.C:
// First trigger a sync of the ObserverReader. This may or may not actually
// result in calls to the cluster, depending on the implementation.
// If this call fails, there is no clean way to recover, so we just return an ErrorEvent
// and shut down.
err := r.reader.Sync(r.ctx)
if err != nil {
r.eventChannel <- event.Event{
EventType: event.ErrorEvent,
Error: err,
}
return
}
// Poll all resources and compute status. If the polling of resources has completed (based
// on information from the StatusAggregator and the value of observeUntilCancelled), we send
// a CompletedEvent and return.
completed := r.observeStatusForAllResources()
if completed {
aggregatedStatus := r.statusAggregator.AggregateStatus()
r.eventChannel <- event.Event{
EventType: event.CompletedEvent,
AggregateStatus: aggregatedStatus,
}
return
}
}
}
}
// observeStatusForAllResources iterates over all the resources in the set and delegates
// to the appropriate observer to compute the status.
func (r *statusObserverRunner) observeStatusForAllResources() bool {
for _, id := range r.identifiers {
gk := id.GroupKind
observer := r.observerForGroupKind(gk)
observedResource := observer.Observe(r.ctx, id)
r.statusAggregator.ResourceObserved(observedResource)
if r.isUpdatedObservedResource(observedResource) {
r.previousObservedResources[id] = observedResource
aggregatedStatus := r.statusAggregator.AggregateStatus()
r.eventChannel <- event.Event{
EventType: event.ResourceUpdateEvent,
AggregateStatus: aggregatedStatus,
Resource: observedResource,
}
if r.statusAggregator.Completed() && !r.observeUntilCancelled {
return true
}
}
}
if r.statusAggregator.Completed() && !r.observeUntilCancelled {
return true
}
return false
}
func (r *statusObserverRunner) observerForGroupKind(gk schema.GroupKind) ResourceObserver {
observer, ok := r.observers[gk]
if !ok {
return r.defaultObserver
}
return observer
}
func (r *statusObserverRunner) isUpdatedObservedResource(observedResource *event.ObservedResource) bool {
oldObservedResource, found := r.previousObservedResources[observedResource.Identifier]
if !found {
return true
}
return !event.DeepEqual(observedResource, oldObservedResource)
}

View File

@ -0,0 +1,227 @@
package observer
import (
"context"
"testing"
"time"
"gotest.tools/assert"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/observe/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/observe/testutil"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/kstatus/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
)
func TestStatusObserverRunner(t *testing.T) {
testCases := map[string]struct {
identifiers []wait.ResourceIdentifier
defaultObserver ResourceObserver
expectedEventTypes []event.EventType
}{
"no resources": {
identifiers: []wait.ResourceIdentifier{},
expectedEventTypes: []event.EventType{event.CompletedEvent},
},
"single resource": {
identifiers: []wait.ResourceIdentifier{
{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Name: "foo",
Namespace: "bar",
},
},
defaultObserver: &fakeObserver{
resourceObservations: map[schema.GroupKind][]status.Status{
schema.GroupKind{Group: "apps", Kind: "Deployment"}: { //nolint:gofmt
status.InProgressStatus,
status.CurrentStatus,
},
},
resourceObservationCount: make(map[schema.GroupKind]int),
},
expectedEventTypes: []event.EventType{
event.ResourceUpdateEvent,
event.ResourceUpdateEvent,
event.CompletedEvent,
},
},
"multiple resources": {
identifiers: []wait.ResourceIdentifier{
{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Name: "foo",
Namespace: "default",
},
{
GroupKind: schema.GroupKind{
Group: "",
Kind: "Service",
},
Name: "bar",
Namespace: "default",
},
},
defaultObserver: &fakeObserver{
resourceObservations: map[schema.GroupKind][]status.Status{
schema.GroupKind{Group: "apps", Kind: "Deployment"}: { //nolint:gofmt
status.InProgressStatus,
status.CurrentStatus,
},
schema.GroupKind{Group: "", Kind: "Service"}: { //nolint:gofmt
status.InProgressStatus,
status.InProgressStatus,
status.CurrentStatus,
},
},
resourceObservationCount: make(map[schema.GroupKind]int),
},
expectedEventTypes: []event.EventType{
event.ResourceUpdateEvent,
event.ResourceUpdateEvent,
event.ResourceUpdateEvent,
event.ResourceUpdateEvent,
event.CompletedEvent,
},
},
}
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
ctx := context.Background()
identifiers := tc.identifiers
observer := Observer{
AggregatorFactoryFunc: func(identifiers []wait.ResourceIdentifier) StatusAggregator {
return newFakeAggregator(identifiers)
},
ReaderFactoryFunc: func(_ client.Reader, _ meta.RESTMapper, _ []wait.ResourceIdentifier) (ObserverReader, error) {
return testutil.NewNoopObserverReader(), nil
},
ObserversFactoryFunc: func(_ ObserverReader, _ meta.RESTMapper) (resourceObservers map[schema.GroupKind]ResourceObserver, defaultObserver ResourceObserver) {
return make(map[schema.GroupKind]ResourceObserver), tc.defaultObserver
},
}
eventChannel := observer.Observe(ctx, identifiers, 2*time.Second, false)
var eventTypes []event.EventType
for ch := range eventChannel {
eventTypes = append(eventTypes, ch.EventType)
}
assert.DeepEqual(t, tc.expectedEventTypes, eventTypes)
})
}
}
func TestNewStatusObserverRunnerCancellation(t *testing.T) {
identifiers := make([]wait.ResourceIdentifier, 0)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
timer := time.NewTimer(5 * time.Second)
observer := Observer{
AggregatorFactoryFunc: func(identifiers []wait.ResourceIdentifier) StatusAggregator {
return newFakeAggregator(identifiers)
//return aggregator.NewAllCurrentOrNotFoundStatusAggregator(identifiers)
},
ReaderFactoryFunc: func(_ client.Reader, _ meta.RESTMapper, _ []wait.ResourceIdentifier) (ObserverReader, error) {
return testutil.NewNoopObserverReader(), nil
},
ObserversFactoryFunc: func(_ ObserverReader, _ meta.RESTMapper) (resourceObservers map[schema.GroupKind]ResourceObserver, defaultObserver ResourceObserver) {
return make(map[schema.GroupKind]ResourceObserver), nil
},
}
eventChannel := observer.Observe(ctx, identifiers, 2*time.Second, true)
var lastEvent event.Event
for {
select {
case e, more := <-eventChannel:
timer.Stop()
if more {
lastEvent = e
} else {
if want, got := event.AbortedEvent, lastEvent.EventType; got != want {
t.Errorf("Expected e to have type %s, but got %s", want, got)
}
return
}
case <-timer.C:
t.Errorf("expected runner to time out, but it didn't")
return
}
}
}
type fakeObserver struct {
resourceObservations map[schema.GroupKind][]status.Status
resourceObservationCount map[schema.GroupKind]int
}
func (f *fakeObserver) Observe(_ context.Context, identifier wait.ResourceIdentifier) *event.ObservedResource {
count := f.resourceObservationCount[identifier.GroupKind]
observedResourceStatusSlice := f.resourceObservations[identifier.GroupKind]
var observedResourceStatus status.Status
if len(observedResourceStatusSlice) > count {
observedResourceStatus = observedResourceStatusSlice[count]
} else {
observedResourceStatus = observedResourceStatusSlice[len(observedResourceStatusSlice)-1]
}
f.resourceObservationCount[identifier.GroupKind] = count + 1
return &event.ObservedResource{
Identifier: identifier,
Status: observedResourceStatus,
}
}
func (f *fakeObserver) ObserveObject(_ context.Context, _ *unstructured.Unstructured) *event.ObservedResource {
return nil
}
func (f *fakeObserver) SetComputeStatusFunc(_ ComputeStatusFunc) {}
func newFakeAggregator(identifiers []wait.ResourceIdentifier) *fakeAggregator {
statuses := make(map[wait.ResourceIdentifier]status.Status)
for _, id := range identifiers {
statuses[id] = status.UnknownStatus
}
return &fakeAggregator{
statuses: statuses,
}
}
type fakeAggregator struct {
statuses map[wait.ResourceIdentifier]status.Status
}
func (f *fakeAggregator) ResourceObserved(resource *event.ObservedResource) {
f.statuses[resource.Identifier] = resource.Status
}
func (f *fakeAggregator) AggregateStatus() status.Status {
for _, s := range f.statuses {
if s != status.CurrentStatus {
return status.InProgressStatus
}
}
return status.CurrentStatus
}
func (f *fakeAggregator) Completed() bool {
return f.AggregateStatus() == status.CurrentStatus
}

View File

@ -0,0 +1,25 @@
package observer
import (
"context"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// ObserverReader is the interface provided to the observers to talk to the cluster.
type ObserverReader interface {
// Get looks up the resource identifier by the key and the GVK in the provided obj reference. If something
// goes wrong or the resource doesn't exist, an error is returned.
Get(ctx context.Context, key client.ObjectKey, obj *unstructured.Unstructured) error
// ListNamespaceScoped looks up the resources of the GVK given in the list and matches the namespace and
// selector provided.
ListNamespaceScoped(ctx context.Context, list *unstructured.UnstructuredList, namespace string, selector labels.Selector) error
// ListClusterScoped looks up the resources of the GVK given in the list and that matches the selector
// provided.
ListClusterScoped(ctx context.Context, list *unstructured.UnstructuredList, selector labels.Selector) error
// Sync is called by the observer before every polling loop, which provides an opportunity for the Reader
// to sync caches.
Sync(ctx context.Context) error
}

View File

@ -0,0 +1,38 @@
package observer
import (
"context"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/cli-utils/pkg/kstatus/observe/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/kstatus/wait"
)
// ComputeStatusFunc is the function that the observer will delegate to for
// computing status of the resource. This is set to use the status library
// by default and changing it is probably only useful for testing.
type ComputeStatusFunc func(u *unstructured.Unstructured) (*status.Result, error)
// ResourceObserver is the main interface for observers. In this context,
// an observer is an object that can fetch a resource of a specific
// GroupKind from the cluster and compute its status. For resources that
// can own generated resources, the observer might also have knowledge about
// how to identify these generated resources and how to compute status for
// these generated resources.
type ResourceObserver interface {
// Observe will fetch the resource identified by the given identifier
// from the cluster and return an ObservedResource that will contain
// information about the latest state of the resource, its computed status
// and information about any generated resources.
Observe(ctx context.Context, resource wait.ResourceIdentifier) *event.ObservedResource
// ObserveObject is similar to Observe, but instead of looking up the
// resource based on an identifier, it will use the passed in resource.
ObserveObject(ctx context.Context, object *unstructured.Unstructured) *event.ObservedResource
// SetComputeStatusFunc can be used to set the function invoked by
// the observer to compute the status of a resource. By default this
// is set to use the status library. This is only used for testing.
SetComputeStatusFunc(statusFunc ComputeStatusFunc)
}

View File

@ -0,0 +1,57 @@
package testutil
import (
"context"
"testing"
"gopkg.in/yaml.v3"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)
func YamlToUnstructured(t *testing.T, yml string) *unstructured.Unstructured {
m := make(map[string]interface{})
err := yaml.Unmarshal([]byte(yml), &m)
if err != nil {
t.Fatalf("error parsing yaml: %v", err)
return nil
}
return &unstructured.Unstructured{Object: m}
}
func NewFakeRESTMapper(gvks ...schema.GroupVersionKind) meta.RESTMapper {
var groupVersions []schema.GroupVersion
for _, gvk := range gvks {
groupVersions = append(groupVersions, gvk.GroupVersion())
}
mapper := meta.NewDefaultRESTMapper(groupVersions)
for _, gvk := range gvks {
mapper.Add(gvk, meta.RESTScopeNamespace)
}
return mapper
}
func NewNoopObserverReader() *NoopObserverReader {
return &NoopObserverReader{}
}
type NoopObserverReader struct{}
func (n *NoopObserverReader) Get(_ context.Context, _ client.ObjectKey, _ *unstructured.Unstructured) error {
return nil
}
func (n *NoopObserverReader) ListNamespaceScoped(_ context.Context, _ *unstructured.UnstructuredList, _ string, _ labels.Selector) error {
return nil
}
func (n *NoopObserverReader) ListClusterScoped(_ context.Context, _ *unstructured.UnstructuredList, _ labels.Selector) error {
return nil
}
func (n *NoopObserverReader) Sync(_ context.Context) error {
return nil
}

View File

@ -24,6 +24,7 @@ const (
FailedStatus Status = "Failed"
CurrentStatus Status = "Current"
TerminatingStatus Status = "Terminating"
NotFoundStatus Status = "NotFound"
UnknownStatus Status = "Unknown"
)