diff --git a/go.mod b/go.mod index 4b8dd2a..2b4eff8 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,8 @@ require ( github.com/pkg/errors v0.9.1 github.com/spf13/cobra v0.0.5 github.com/stretchr/testify v1.4.0 + gopkg.in/yaml.v3 v3.0.0-20200121175148-a6ecf24a6d71 + gotest.tools v2.2.0+incompatible k8s.io/api v0.17.2 k8s.io/apimachinery v0.17.2 k8s.io/cli-runtime v0.17.2 diff --git a/go.sum b/go.sum index 2248eba..40bf0ba 100644 --- a/go.sum +++ b/go.sum @@ -430,6 +430,8 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200121175148-a6ecf24a6d71 h1:Xe2gvTZUJpsvOWUnvmL/tmhVBZUmHSvLbMjRj6NUUKo= +gopkg.in/yaml.v3 v3.0.0-20200121175148-a6ecf24a6d71/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/kstatus/observe/event/event.go b/pkg/kstatus/observe/event/event.go new file mode 100644 index 0000000..663c76e --- /dev/null +++ b/pkg/kstatus/observe/event/event.go @@ -0,0 +1,125 @@ +package event + +import ( + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/cli-utils/pkg/kstatus/wait" +) + +// EventType is the type that describes the type of an Event that is passed back to the caller +// as resources in the cluster are being observed. +type EventType string + +const ( + // ResourceUpdateEvent describes events related to a change in the status of one of the observed resources. + ResourceUpdateEvent EventType = "ResourceUpdated" + // CompletedEvent signals that all resources have been reconciled and the observer has completed its work. The + // event channel will be closed after this event. + CompletedEvent EventType = "Completed" + // AbortedEvent signals that the observer is shutting down because it has been cancelled. All resources might + // not have been reconciled. The event channel will be closed after this event. + AbortedEvent EventType = "Aborted" + // ErrorEvent signals that the observer has encountered an error that it can not recover from. The observer + // is shutting down and the event channel will be closed after this event. + ErrorEvent EventType = "Error" +) + +// Event defines that type that is passed back through the event channel to notify the caller of changes +// as resources are being observed. +type Event struct { + // EventType defines the type of event. + EventType EventType + + // AggregateStatus is the collective status for all the resources. It is computed by the + // StatusAggregator + AggregateStatus status.Status + + // Resource is only available for ResourceUpdateEvents. It includes information about the observed resource, + // including the resource status, any errors and the resource itself (as an unstructured). + Resource *ObservedResource + + // Error is only available for ErrorEvents. It contains the error that caused the observer to + // give up. + Error error +} + +// ObservedResource contains information about a resource after we have +// fetched it from the cluster and computed status. +type ObservedResource struct { + // Identifier contains the information necessary to locate the + // resource within a cluster. + Identifier wait.ResourceIdentifier + + // Status is the computed status for this resource. + Status status.Status + + // Resource contains the actual manifest for the resource that + // was fetched from the cluster and used to compute status. + Resource *unstructured.Unstructured + + // Errors contains the error if something went wrong during the + // process of fetching the resource and computing the status. + Error error + + // Message is text describing the status of the resource. + Message string + + // GeneratedResources is a slice of ObservedResource that + // contains information and status for any generated resources + // of the current resource. + GeneratedResources ObservedResources +} + +type ObservedResources []*ObservedResource + +func (g ObservedResources) Len() int { + return len(g) +} + +func (g ObservedResources) Less(i, j int) bool { + idI := g[i].Identifier + idJ := g[j].Identifier + + if idI.Namespace != idJ.Namespace { + return idI.Namespace < idJ.Namespace + } + if idI.GroupKind.Group != idJ.GroupKind.Group { + return idI.GroupKind.Group < idJ.GroupKind.Group + } + if idI.GroupKind.Kind != idJ.GroupKind.Kind { + return idI.GroupKind.Kind < idJ.GroupKind.Kind + } + return idI.Name < idJ.Name +} + +func (g ObservedResources) Swap(i, j int) { + g[i], g[j] = g[j], g[i] +} + +// DeepEqual checks if two instances of ObservedResource are identical. This is used +// to determine whether status has changed for a particular resource. +func DeepEqual(or1, or2 *ObservedResource) bool { + if or1.Identifier != or2.Identifier || + or1.Status != or2.Status || + or1.Message != or2.Message { + return false + } + + if or1.Error != nil && or2.Error != nil && or1.Error.Error() != or2.Error.Error() { + return false + } + if (or1.Error == nil && or2.Error != nil) || (or1.Error != nil && or2.Error == nil) { + return false + } + + if len(or1.GeneratedResources) != len(or2.GeneratedResources) { + return false + } + + for i := range or1.GeneratedResources { + if !DeepEqual(or1.GeneratedResources[i], or2.GeneratedResources[i]) { + return false + } + } + return true +} diff --git a/pkg/kstatus/observe/event/event_test.go b/pkg/kstatus/observe/event/event_test.go new file mode 100644 index 0000000..e7008cd --- /dev/null +++ b/pkg/kstatus/observe/event/event_test.go @@ -0,0 +1,273 @@ +package event + +import ( + "fmt" + "testing" + + "gotest.tools/assert" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/cli-utils/pkg/kstatus/wait" +) + +func TestDeepEqual(t *testing.T) { + testCases := map[string]struct { + actual ObservedResource + expected ObservedResource + equal bool + }{ + "same resource should be equal": { + actual: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Foo", + }, + Status: status.UnknownStatus, + Message: "Some message", + }, + expected: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Foo", + }, + Status: status.UnknownStatus, + Message: "Some message", + }, + equal: true, + }, + "different resources with only name different": { + actual: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Foo", + }, + Status: status.CurrentStatus, + }, + expected: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Bar", + }, + Status: status.CurrentStatus, + }, + equal: false, + }, + "different GroupKind otherwise same": { + actual: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Bar", + }, + Status: status.CurrentStatus, + }, + expected: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "custom.io", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Bar", + }, + Status: status.CurrentStatus, + }, + equal: false, + }, + "same resource with same error": { + actual: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Bar", + }, + Status: status.UnknownStatus, + Error: fmt.Errorf("this is a test"), + }, + expected: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Bar", + }, + Status: status.UnknownStatus, + Error: fmt.Errorf("this is a test"), + }, + equal: true, + }, + "same resource with different error": { + actual: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Bar", + }, + Status: status.UnknownStatus, + Error: fmt.Errorf("this is a test"), + }, + expected: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Bar", + }, + Status: status.UnknownStatus, + Error: fmt.Errorf("this is a different error"), + }, + equal: false, + }, + "same resource different status": { + actual: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Bar", + }, + Status: status.CurrentStatus, + }, + expected: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Bar", + }, + Status: status.InProgressStatus, + }, + equal: false, + }, + "same resource with different number of generated resources": { + actual: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Bar", + }, + Status: status.InProgressStatus, + GeneratedResources: []*ObservedResource{ + { + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "ReplicaSet", + }, + Namespace: "default", + Name: "Bar-123", + }, + Status: status.InProgressStatus, + }, + }, + }, + expected: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Bar", + }, + Status: status.InProgressStatus, + }, + equal: false, + }, + "same resource with different status on generated resources": { + actual: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Bar", + }, + Status: status.InProgressStatus, + GeneratedResources: []*ObservedResource{ + { + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "ReplicaSet", + }, + Namespace: "default", + Name: "Bar-123", + }, + Status: status.InProgressStatus, + }, + }, + }, + expected: ObservedResource{ + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Namespace: "default", + Name: "Bar", + }, + Status: status.InProgressStatus, + GeneratedResources: []*ObservedResource{ + { + Identifier: wait.ResourceIdentifier{ + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "ReplicaSet", + }, + Namespace: "default", + Name: "Bar-123", + }, + Status: status.CurrentStatus, + }, + }, + }, + equal: false, + }, + } + + for tn, tc := range testCases { + t.Run(tn, func(t *testing.T) { + res := DeepEqual(&tc.actual, &tc.expected) + + assert.Equal(t, tc.equal, res) + }) + } +} diff --git a/pkg/kstatus/observe/observer/aggregator.go b/pkg/kstatus/observe/observer/aggregator.go new file mode 100644 index 0000000..019d3b8 --- /dev/null +++ b/pkg/kstatus/observe/observer/aggregator.go @@ -0,0 +1,20 @@ +package observer + +import ( + "sigs.k8s.io/cli-utils/pkg/kstatus/observe/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" +) + +// StatusAggregator provides the interface the observer uses to compute the aggregate status. +// It also include a function that will be used by the observer to determine if all resources +// should be considered fully reconciled. +type StatusAggregator interface { + // ResourceObserved notifies the aggregator of a new observation. Called after status has been + // computed. + ResourceObserved(resource *event.ObservedResource) + // AggregateStatus computes the aggregate status for all the resources at the given + // point in time. + AggregateStatus() status.Status + // Completed returns true if all resources should be considered reconciled and false otherwise. + Completed() bool +} diff --git a/pkg/kstatus/observe/observer/observer.go b/pkg/kstatus/observe/observer/observer.go new file mode 100644 index 0000000..8f64c74 --- /dev/null +++ b/pkg/kstatus/observe/observer/observer.go @@ -0,0 +1,235 @@ +package observer + +import ( + "context" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/cli-utils/pkg/kstatus/observe/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/wait" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// AggregatorFactoryFunc defines the signature for the function the Observer will use to +// create a new StatusAggregator for each statusObserverRunner. +type AggregatorFactoryFunc func(identifiers []wait.ResourceIdentifier) StatusAggregator + +// ReaderFactoryFunc defines the signature for the function the Observer will use to create +// a new ObserverReader for each statusObserverRunner. +type ReaderFactoryFunc func(reader client.Reader, mapper meta.RESTMapper, + identifiers []wait.ResourceIdentifier) (ObserverReader, error) + +// ObserversFactoryFunc defines the signature for the function the Observer will use to +// create the resource observers and the default observer for each statusObserverRunner. +type ObserversFactoryFunc func(reader ObserverReader, mapper meta.RESTMapper) ( + resourceObservers map[schema.GroupKind]ResourceObserver, defaultObserver ResourceObserver) + +// Observer provides functionality for polling a cluster for status for a set of resources. +type Observer struct { + Reader client.Reader + Mapper meta.RESTMapper + + // AggregatorFunc provides the Observer with a way to create a new aggregator. This will + // happen for every call to Observe since each statusObserverRunner keeps its own + // status aggregator. + AggregatorFactoryFunc AggregatorFactoryFunc + + // ReaderFactoryFunc provides the Observer with a factory function for creating new + // ObserverReaders. Since these can be stateful, every call to Observe will create a new + // ObserverReader. + ReaderFactoryFunc ReaderFactoryFunc + + // ObserversFactoryFunc provides the Observer with a factory function for creating resource + // observers. Each statusObserverRunner has a separate set of observers, so this will be called + // for every call to Observe. + ObserversFactoryFunc ObserversFactoryFunc +} + +// Observe will create a new statusObserverRunner that will poll all the resources provided and report their status +// back on the event channel returned. The statusObserverRunner can be cancelled at any time by cancelling the +// context passed in. +// If observeUntilCancelled is set to false, then the runner will stop observing the resources when the StatusAggregator +// determines that all resources has been fully reconciled. If this is set to true, the observer will keep running +// until cancelled. This can be useful if the goal is to just monitor a set of resources rather than waiting for +// all to reach a specific status. +// The pollInterval specifies how often the observer should poll the cluster for the latest state of the resources. +func (s *Observer) Observe(ctx context.Context, identifiers []wait.ResourceIdentifier, pollInterval time.Duration, + observeUntilCancelled bool) <-chan event.Event { + eventChannel := make(chan event.Event) + + go func() { + defer close(eventChannel) + + observerReader, err := s.ReaderFactoryFunc(s.Reader, s.Mapper, identifiers) + if err != nil { + eventChannel <- event.Event{ + EventType: event.ErrorEvent, + Error: err, + } + return + } + observers, defaultObserver := s.ObserversFactoryFunc(observerReader, s.Mapper) + aggregator := s.AggregatorFactoryFunc(identifiers) + + runner := &statusObserverRunner{ + ctx: ctx, + reader: observerReader, + observers: observers, + defaultObserver: defaultObserver, + identifiers: identifiers, + previousObservedResources: make(map[wait.ResourceIdentifier]*event.ObservedResource), + eventChannel: eventChannel, + statusAggregator: aggregator, + observeUntilCancelled: observeUntilCancelled, + pollingInterval: pollInterval, + } + runner.Run() + }() + + return eventChannel +} + +// statusObserverRunner is responsible for polling of a set of resources. Each call to Observe will create +// a new statusObserverRunner, which means we can keep state in the runner and all data will only be accessed +// by a single goroutine, meaning we don't need synchronization. +// The statusObserverRunner uses an implementation of the ObserverReader interface to talk to the +// kubernetes cluster. Currently this can be either the cached ObserverReader that syncs all needed resources +// with LIST calls before each polling loop, or the normal ObserverReader that just forwards each call +// to the client.Reader from controller-runtime. +type statusObserverRunner struct { + // ctx is the context for the runner. It will be used by the caller of Observe to cancel + // observing resources. + ctx context.Context + + // reader is the interface for fetching and listing resources from the cluster. It can be implemented + // to make call directly to the cluster or use caching to reduce the number of calls to the cluster. + reader ObserverReader + + // observers contains the resource specific observers. These will contain logic for how to + // compute status for specific GroupKinds. These will use an ObserverReader to fetch + // status of a resource and any generated resources. + observers map[schema.GroupKind]ResourceObserver + + // defaultObserver is the generic observer that is used for all GroupKinds that + // doesn't have a specific observer in the observers map. + defaultObserver ResourceObserver + + // identifiers contains the list of identifiers for the resources that should be observed. + // Each resource is identified by GroupKind, namespace and name. + identifiers []wait.ResourceIdentifier + + // previousObservedResources keeps track of the last event for each + // of the observed resources. This is used to make sure we only + // send events on the event channel when something has actually changed. + previousObservedResources map[wait.ResourceIdentifier]*event.ObservedResource + + // eventChannel is a channel where any updates to the observed status of resources + // will be sent. The caller of Observe will listen for updates. + eventChannel chan event.Event + + // statusAggregator is responsible for keeping track of the status of + // all of the observed resources and to compute the aggregate status. + statusAggregator StatusAggregator + + // observeUntilCancelled decides whether the runner should keep running + // even if the statusAggregator decides that all resources has reached the + // desired status. + observeUntilCancelled bool + + // pollingInterval determines how often we should poll the cluster for + // the latest state of resources. + pollingInterval time.Duration +} + +// Run starts the polling loop of the observers. +func (r *statusObserverRunner) Run() { + // Sets up ticker that will trigger the regular polling loop at a regular interval. + ticker := time.NewTicker(r.pollingInterval) + defer func() { + ticker.Stop() + }() + + for { + select { + case <-r.ctx.Done(): + // If the context has been cancelled, just send an AbortedEvent + // and pass along the most up-to-date aggregate status. Then return + // from this function, which will stop the ticker and close the event channel. + aggregatedStatus := r.statusAggregator.AggregateStatus() + r.eventChannel <- event.Event{ + EventType: event.AbortedEvent, + AggregateStatus: aggregatedStatus, + } + return + case <-ticker.C: + // First trigger a sync of the ObserverReader. This may or may not actually + // result in calls to the cluster, depending on the implementation. + // If this call fails, there is no clean way to recover, so we just return an ErrorEvent + // and shut down. + err := r.reader.Sync(r.ctx) + if err != nil { + r.eventChannel <- event.Event{ + EventType: event.ErrorEvent, + Error: err, + } + return + } + // Poll all resources and compute status. If the polling of resources has completed (based + // on information from the StatusAggregator and the value of observeUntilCancelled), we send + // a CompletedEvent and return. + completed := r.observeStatusForAllResources() + if completed { + aggregatedStatus := r.statusAggregator.AggregateStatus() + r.eventChannel <- event.Event{ + EventType: event.CompletedEvent, + AggregateStatus: aggregatedStatus, + } + return + } + } + } +} + +// observeStatusForAllResources iterates over all the resources in the set and delegates +// to the appropriate observer to compute the status. +func (r *statusObserverRunner) observeStatusForAllResources() bool { + for _, id := range r.identifiers { + gk := id.GroupKind + observer := r.observerForGroupKind(gk) + observedResource := observer.Observe(r.ctx, id) + r.statusAggregator.ResourceObserved(observedResource) + if r.isUpdatedObservedResource(observedResource) { + r.previousObservedResources[id] = observedResource + aggregatedStatus := r.statusAggregator.AggregateStatus() + r.eventChannel <- event.Event{ + EventType: event.ResourceUpdateEvent, + AggregateStatus: aggregatedStatus, + Resource: observedResource, + } + if r.statusAggregator.Completed() && !r.observeUntilCancelled { + return true + } + } + } + if r.statusAggregator.Completed() && !r.observeUntilCancelled { + return true + } + return false +} + +func (r *statusObserverRunner) observerForGroupKind(gk schema.GroupKind) ResourceObserver { + observer, ok := r.observers[gk] + if !ok { + return r.defaultObserver + } + return observer +} + +func (r *statusObserverRunner) isUpdatedObservedResource(observedResource *event.ObservedResource) bool { + oldObservedResource, found := r.previousObservedResources[observedResource.Identifier] + if !found { + return true + } + return !event.DeepEqual(observedResource, oldObservedResource) +} diff --git a/pkg/kstatus/observe/observer/observer_test.go b/pkg/kstatus/observe/observer/observer_test.go new file mode 100644 index 0000000..d10eef6 --- /dev/null +++ b/pkg/kstatus/observe/observer/observer_test.go @@ -0,0 +1,227 @@ +package observer + +import ( + "context" + "testing" + "time" + + "gotest.tools/assert" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/cli-utils/pkg/kstatus/observe/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/observe/testutil" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/cli-utils/pkg/kstatus/wait" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func TestStatusObserverRunner(t *testing.T) { + testCases := map[string]struct { + identifiers []wait.ResourceIdentifier + defaultObserver ResourceObserver + expectedEventTypes []event.EventType + }{ + "no resources": { + identifiers: []wait.ResourceIdentifier{}, + expectedEventTypes: []event.EventType{event.CompletedEvent}, + }, + "single resource": { + identifiers: []wait.ResourceIdentifier{ + { + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Name: "foo", + Namespace: "bar", + }, + }, + defaultObserver: &fakeObserver{ + resourceObservations: map[schema.GroupKind][]status.Status{ + schema.GroupKind{Group: "apps", Kind: "Deployment"}: { //nolint:gofmt + status.InProgressStatus, + status.CurrentStatus, + }, + }, + resourceObservationCount: make(map[schema.GroupKind]int), + }, + expectedEventTypes: []event.EventType{ + event.ResourceUpdateEvent, + event.ResourceUpdateEvent, + event.CompletedEvent, + }, + }, + "multiple resources": { + identifiers: []wait.ResourceIdentifier{ + { + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Name: "foo", + Namespace: "default", + }, + { + GroupKind: schema.GroupKind{ + Group: "", + Kind: "Service", + }, + Name: "bar", + Namespace: "default", + }, + }, + defaultObserver: &fakeObserver{ + resourceObservations: map[schema.GroupKind][]status.Status{ + schema.GroupKind{Group: "apps", Kind: "Deployment"}: { //nolint:gofmt + status.InProgressStatus, + status.CurrentStatus, + }, + schema.GroupKind{Group: "", Kind: "Service"}: { //nolint:gofmt + status.InProgressStatus, + status.InProgressStatus, + status.CurrentStatus, + }, + }, + resourceObservationCount: make(map[schema.GroupKind]int), + }, + expectedEventTypes: []event.EventType{ + event.ResourceUpdateEvent, + event.ResourceUpdateEvent, + event.ResourceUpdateEvent, + event.ResourceUpdateEvent, + event.CompletedEvent, + }, + }, + } + + for tn, tc := range testCases { + t.Run(tn, func(t *testing.T) { + ctx := context.Background() + + identifiers := tc.identifiers + + observer := Observer{ + AggregatorFactoryFunc: func(identifiers []wait.ResourceIdentifier) StatusAggregator { + return newFakeAggregator(identifiers) + }, + ReaderFactoryFunc: func(_ client.Reader, _ meta.RESTMapper, _ []wait.ResourceIdentifier) (ObserverReader, error) { + return testutil.NewNoopObserverReader(), nil + }, + ObserversFactoryFunc: func(_ ObserverReader, _ meta.RESTMapper) (resourceObservers map[schema.GroupKind]ResourceObserver, defaultObserver ResourceObserver) { + return make(map[schema.GroupKind]ResourceObserver), tc.defaultObserver + }, + } + + eventChannel := observer.Observe(ctx, identifiers, 2*time.Second, false) + + var eventTypes []event.EventType + for ch := range eventChannel { + eventTypes = append(eventTypes, ch.EventType) + } + + assert.DeepEqual(t, tc.expectedEventTypes, eventTypes) + }) + } +} + +func TestNewStatusObserverRunnerCancellation(t *testing.T) { + identifiers := make([]wait.ResourceIdentifier, 0) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + timer := time.NewTimer(5 * time.Second) + + observer := Observer{ + AggregatorFactoryFunc: func(identifiers []wait.ResourceIdentifier) StatusAggregator { + return newFakeAggregator(identifiers) + //return aggregator.NewAllCurrentOrNotFoundStatusAggregator(identifiers) + }, + ReaderFactoryFunc: func(_ client.Reader, _ meta.RESTMapper, _ []wait.ResourceIdentifier) (ObserverReader, error) { + return testutil.NewNoopObserverReader(), nil + }, + ObserversFactoryFunc: func(_ ObserverReader, _ meta.RESTMapper) (resourceObservers map[schema.GroupKind]ResourceObserver, defaultObserver ResourceObserver) { + return make(map[schema.GroupKind]ResourceObserver), nil + }, + } + + eventChannel := observer.Observe(ctx, identifiers, 2*time.Second, true) + + var lastEvent event.Event + for { + select { + case e, more := <-eventChannel: + timer.Stop() + if more { + lastEvent = e + } else { + if want, got := event.AbortedEvent, lastEvent.EventType; got != want { + t.Errorf("Expected e to have type %s, but got %s", want, got) + } + return + } + case <-timer.C: + t.Errorf("expected runner to time out, but it didn't") + return + } + } +} + +type fakeObserver struct { + resourceObservations map[schema.GroupKind][]status.Status + resourceObservationCount map[schema.GroupKind]int +} + +func (f *fakeObserver) Observe(_ context.Context, identifier wait.ResourceIdentifier) *event.ObservedResource { + count := f.resourceObservationCount[identifier.GroupKind] + observedResourceStatusSlice := f.resourceObservations[identifier.GroupKind] + var observedResourceStatus status.Status + if len(observedResourceStatusSlice) > count { + observedResourceStatus = observedResourceStatusSlice[count] + } else { + observedResourceStatus = observedResourceStatusSlice[len(observedResourceStatusSlice)-1] + } + f.resourceObservationCount[identifier.GroupKind] = count + 1 + return &event.ObservedResource{ + Identifier: identifier, + Status: observedResourceStatus, + } +} + +func (f *fakeObserver) ObserveObject(_ context.Context, _ *unstructured.Unstructured) *event.ObservedResource { + return nil +} + +func (f *fakeObserver) SetComputeStatusFunc(_ ComputeStatusFunc) {} + +func newFakeAggregator(identifiers []wait.ResourceIdentifier) *fakeAggregator { + statuses := make(map[wait.ResourceIdentifier]status.Status) + for _, id := range identifiers { + statuses[id] = status.UnknownStatus + } + return &fakeAggregator{ + statuses: statuses, + } +} + +type fakeAggregator struct { + statuses map[wait.ResourceIdentifier]status.Status +} + +func (f *fakeAggregator) ResourceObserved(resource *event.ObservedResource) { + f.statuses[resource.Identifier] = resource.Status +} + +func (f *fakeAggregator) AggregateStatus() status.Status { + for _, s := range f.statuses { + if s != status.CurrentStatus { + return status.InProgressStatus + } + } + return status.CurrentStatus +} + +func (f *fakeAggregator) Completed() bool { + return f.AggregateStatus() == status.CurrentStatus +} diff --git a/pkg/kstatus/observe/observer/reader.go b/pkg/kstatus/observe/observer/reader.go new file mode 100644 index 0000000..e0b9dc8 --- /dev/null +++ b/pkg/kstatus/observe/observer/reader.go @@ -0,0 +1,25 @@ +package observer + +import ( + "context" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// ObserverReader is the interface provided to the observers to talk to the cluster. +type ObserverReader interface { + // Get looks up the resource identifier by the key and the GVK in the provided obj reference. If something + // goes wrong or the resource doesn't exist, an error is returned. + Get(ctx context.Context, key client.ObjectKey, obj *unstructured.Unstructured) error + // ListNamespaceScoped looks up the resources of the GVK given in the list and matches the namespace and + // selector provided. + ListNamespaceScoped(ctx context.Context, list *unstructured.UnstructuredList, namespace string, selector labels.Selector) error + // ListClusterScoped looks up the resources of the GVK given in the list and that matches the selector + // provided. + ListClusterScoped(ctx context.Context, list *unstructured.UnstructuredList, selector labels.Selector) error + // Sync is called by the observer before every polling loop, which provides an opportunity for the Reader + // to sync caches. + Sync(ctx context.Context) error +} diff --git a/pkg/kstatus/observe/observer/resourceobserver.go b/pkg/kstatus/observe/observer/resourceobserver.go new file mode 100644 index 0000000..728f4c5 --- /dev/null +++ b/pkg/kstatus/observe/observer/resourceobserver.go @@ -0,0 +1,38 @@ +package observer + +import ( + "context" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "sigs.k8s.io/cli-utils/pkg/kstatus/observe/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/cli-utils/pkg/kstatus/wait" +) + +// ComputeStatusFunc is the function that the observer will delegate to for +// computing status of the resource. This is set to use the status library +// by default and changing it is probably only useful for testing. +type ComputeStatusFunc func(u *unstructured.Unstructured) (*status.Result, error) + +// ResourceObserver is the main interface for observers. In this context, +// an observer is an object that can fetch a resource of a specific +// GroupKind from the cluster and compute its status. For resources that +// can own generated resources, the observer might also have knowledge about +// how to identify these generated resources and how to compute status for +// these generated resources. +type ResourceObserver interface { + // Observe will fetch the resource identified by the given identifier + // from the cluster and return an ObservedResource that will contain + // information about the latest state of the resource, its computed status + // and information about any generated resources. + Observe(ctx context.Context, resource wait.ResourceIdentifier) *event.ObservedResource + + // ObserveObject is similar to Observe, but instead of looking up the + // resource based on an identifier, it will use the passed in resource. + ObserveObject(ctx context.Context, object *unstructured.Unstructured) *event.ObservedResource + + // SetComputeStatusFunc can be used to set the function invoked by + // the observer to compute the status of a resource. By default this + // is set to use the status library. This is only used for testing. + SetComputeStatusFunc(statusFunc ComputeStatusFunc) +} diff --git a/pkg/kstatus/observe/testutil/testing.go b/pkg/kstatus/observe/testutil/testing.go new file mode 100644 index 0000000..a703aee --- /dev/null +++ b/pkg/kstatus/observe/testutil/testing.go @@ -0,0 +1,57 @@ +package testutil + +import ( + "context" + "testing" + + "gopkg.in/yaml.v3" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func YamlToUnstructured(t *testing.T, yml string) *unstructured.Unstructured { + m := make(map[string]interface{}) + err := yaml.Unmarshal([]byte(yml), &m) + if err != nil { + t.Fatalf("error parsing yaml: %v", err) + return nil + } + return &unstructured.Unstructured{Object: m} +} + +func NewFakeRESTMapper(gvks ...schema.GroupVersionKind) meta.RESTMapper { + var groupVersions []schema.GroupVersion + for _, gvk := range gvks { + groupVersions = append(groupVersions, gvk.GroupVersion()) + } + mapper := meta.NewDefaultRESTMapper(groupVersions) + for _, gvk := range gvks { + mapper.Add(gvk, meta.RESTScopeNamespace) + } + return mapper +} + +func NewNoopObserverReader() *NoopObserverReader { + return &NoopObserverReader{} +} + +type NoopObserverReader struct{} + +func (n *NoopObserverReader) Get(_ context.Context, _ client.ObjectKey, _ *unstructured.Unstructured) error { + return nil +} + +func (n *NoopObserverReader) ListNamespaceScoped(_ context.Context, _ *unstructured.UnstructuredList, _ string, _ labels.Selector) error { + return nil +} + +func (n *NoopObserverReader) ListClusterScoped(_ context.Context, _ *unstructured.UnstructuredList, _ labels.Selector) error { + return nil +} + +func (n *NoopObserverReader) Sync(_ context.Context) error { + return nil +} diff --git a/pkg/kstatus/status/status.go b/pkg/kstatus/status/status.go index 51fec7f..63f9126 100644 --- a/pkg/kstatus/status/status.go +++ b/pkg/kstatus/status/status.go @@ -24,6 +24,7 @@ const ( FailedStatus Status = "Failed" CurrentStatus Status = "Current" TerminatingStatus Status = "Terminating" + NotFoundStatus Status = "NotFound" UnknownStatus Status = "Unknown" )