diff --git a/cmd/printers/printers.go b/cmd/printers/printers.go index 821e782..acc9f9f 100644 --- a/cmd/printers/printers.go +++ b/cmd/printers/printers.go @@ -6,15 +6,21 @@ package printers import ( "k8s.io/cli-runtime/pkg/genericclioptions" "sigs.k8s.io/cli-utils/cmd/printers/printer" + "sigs.k8s.io/cli-utils/cmd/printers/table" "sigs.k8s.io/cli-utils/pkg/apply" ) const ( EventsPrinter = "events" + TablePrinter = "table" ) func GetPrinter(printerType string, ioStreams genericclioptions.IOStreams) printer.Printer { switch printerType { //nolint:gocritic + case TablePrinter: + return &table.Printer{ + IOStreams: ioStreams, + } default: return &apply.BasicPrinter{ IOStreams: ioStreams, @@ -23,7 +29,7 @@ func GetPrinter(printerType string, ioStreams genericclioptions.IOStreams) print } func SupportedPrinters() []string { - return []string{EventsPrinter} + return []string{EventsPrinter, TablePrinter} } func DefaultPrinter() string { diff --git a/cmd/printers/table/collector.go b/cmd/printers/table/collector.go new file mode 100644 index 0000000..bbfc63c --- /dev/null +++ b/cmd/printers/table/collector.go @@ -0,0 +1,264 @@ +// Copyright 2020 The Kubernetes Authors. +// SPDX-License-Identifier: Apache-2.0 + +package table + +import ( + "sort" + "sync" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/cli-utils/pkg/apply/event" + pe "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/cli-utils/pkg/object" + "sigs.k8s.io/cli-utils/pkg/print/table" +) + +func newResourceStateCollector(resourceGroups []event.ResourceGroup) *ResourceStateCollector { + resourceInfos := make(map[object.ObjMetadata]*ResourceInfo) + for _, group := range resourceGroups { + action := group.Action + for _, identifier := range group.Identifiers { + resourceInfos[identifier] = &ResourceInfo{ + identifier: identifier, + resourceStatus: &pe.ResourceStatus{ + Identifier: identifier, + Status: status.UnknownStatus, + }, + ResourceAction: action, + } + } + } + return &ResourceStateCollector{ + resourceInfos: resourceInfos, + } +} + +// ResourceStateCollector consumes the events from the applier +// eventChannel and keeps track of the latest state for all resources. +// It also provides functionality for fetching the latest seen +// state and return it in format that can be used by the +// BaseTablePrinter. +type ResourceStateCollector struct { + mux sync.RWMutex + + // resourceInfos contains a mapping from the unique + // resource identifier to a ResourceInfo object that captures + // the latest state for the given resource. + resourceInfos map[object.ObjMetadata]*ResourceInfo +} + +// ResourceInfo captures the latest seen state of a single resource. +// This is used for top-level resources that have a ResourceAction +// associated with them. +type ResourceInfo struct { + // identifier contains the information that identifies a + // single resource. + identifier object.ObjMetadata + + // resourceStatus contains the latest status information + // about the resource. + resourceStatus *pe.ResourceStatus + + // ResourceAction defines the action we are performing + // on this particular resource. This can be either Apply + // or Prune. + ResourceAction event.ResourceAction + + // ApplyOpResult contains the result after + // a resource has been applied to the cluster. + ApplyOpResult *event.ApplyEventOperation +} + +// Identifier returns the identifier for the given resource. +func (r *ResourceInfo) Identifier() object.ObjMetadata { + return r.identifier +} + +// ResourceStatus returns the latest seen status for the +// resource. +func (r *ResourceInfo) ResourceStatus() *pe.ResourceStatus { + return r.resourceStatus +} + +// SubResources returns a slice of Resource which contains +// any resources created and managed by this resource. +func (r *ResourceInfo) SubResources() []table.Resource { + var resources []table.Resource + for _, res := range r.resourceStatus.GeneratedResources { + resources = append(resources, &SubResourceInfo{ + resourceStatus: res, + }) + } + return resources +} + +// SubResourceInfo captures the latest seen state of a +// single subResource, i.e. resources that are created and +// managed by one of the top-level resources we either apply +// or prune. +type SubResourceInfo struct { + // resourceStatus contains the latest status information + // about the subResource. + resourceStatus *pe.ResourceStatus +} + +// Identifier returns the identifier for the given subResource. +func (r *SubResourceInfo) Identifier() object.ObjMetadata { + return r.resourceStatus.Identifier +} + +// ResourceStatus returns the latest seen status for the +// subResource. +func (r *SubResourceInfo) ResourceStatus() *pe.ResourceStatus { + return r.resourceStatus +} + +// SubResources returns a slice of Resource which contains +// any resources created and managed by this resource. +func (r *SubResourceInfo) SubResources() []table.Resource { + var resources []table.Resource + for _, res := range r.resourceStatus.GeneratedResources { + resources = append(resources, &SubResourceInfo{ + resourceStatus: res, + }) + } + return resources +} + +// Listen starts a new goroutine that will listen for events on the +// provided eventChannel and keep track of the latest state for +// the resources. The goroutine will exit when the provided +// eventChannel is closed. +// The function returns a channel. When this channel is closed, the +// goroutine has processed all events in the eventChannel and +// exited. +func (r *ResourceStateCollector) Listen(eventChannel <-chan event.Event) <-chan struct{} { + completed := make(chan struct{}) + go func() { + defer close(completed) + for e := range eventChannel { + r.processEvent(e) + } + }() + return completed +} + +// processEvent processes an event and updates the state. +func (r *ResourceStateCollector) processEvent(e event.Event) { + r.mux.Lock() + defer r.mux.Unlock() + switch e.Type { + case event.StatusType: + r.processStatusEvent(e.StatusEvent) + case event.ApplyType: + r.processApplyEvent(e.ApplyEvent) + case event.ErrorType: + r.processErrorEvent(e.ErrorEvent.Err) + } +} + +// processStatusEvent handles events pertaining to a status +// update for a resource. +func (r *ResourceStateCollector) processStatusEvent(e pe.Event) { + if e.EventType == pe.ErrorEvent { + r.processErrorEvent(e.Error) + return + } + if e.EventType == pe.ResourceUpdateEvent { + resource := e.Resource + previous := r.resourceInfos[resource.Identifier] + previous.resourceStatus = e.Resource + } +} + +// processApplyEvent handles events relating to apply operations +func (r *ResourceStateCollector) processApplyEvent(e event.ApplyEvent) { + if e.Type == event.ApplyEventResourceUpdate { + identifier := toIdentifier(e.Object) + previous := r.resourceInfos[identifier] + previous.ApplyOpResult = &e.Operation + } +} + +// processErrorEvent handles events for errors. +func (r *ResourceStateCollector) processErrorEvent(err error) { + // TODO: Handle errors more gracefully than this. + panic(err) +} + +// toIdentifier extracts the identifying information from an +// object. +func toIdentifier(o runtime.Object) object.ObjMetadata { + accessor, _ := meta.Accessor(o) + return object.ObjMetadata{ + GroupKind: o.GetObjectKind().GroupVersionKind().GroupKind(), + Namespace: accessor.GetNamespace(), + Name: accessor.GetName(), + } +} + +// ResourceState contains the latest state for all the resources. +type ResourceState struct { + ResourceInfos ResourceInfos +} + +// Resources returns a slice containing the latest state +// for each individual resource. +func (r *ResourceState) Resources() []table.Resource { + var resources []table.Resource + for _, res := range r.ResourceInfos { + resources = append(resources, res) + } + return resources +} + +// LatestState returns a ResourceState object that contains +// a copy of the latest state for all resources. +func (r *ResourceStateCollector) LatestState() *ResourceState { + r.mux.RLock() + defer r.mux.RUnlock() + + var resourceInfos ResourceInfos + for _, ri := range r.resourceInfos { + resourceInfos = append(resourceInfos, &ResourceInfo{ + identifier: ri.identifier, + resourceStatus: ri.resourceStatus, + ResourceAction: ri.ResourceAction, + ApplyOpResult: ri.ApplyOpResult, + }) + } + sort.Sort(resourceInfos) + + return &ResourceState{ + ResourceInfos: resourceInfos, + } +} + +type ResourceInfos []*ResourceInfo + +func (g ResourceInfos) Len() int { + return len(g) +} + +func (g ResourceInfos) Less(i, j int) bool { + idI := g[i].identifier + idJ := g[j].identifier + + if idI.Namespace != idJ.Namespace { + return idI.Namespace < idJ.Namespace + } + if idI.GroupKind.Group != idJ.GroupKind.Group { + return idI.GroupKind.Group < idJ.GroupKind.Group + } + if idI.GroupKind.Kind != idJ.GroupKind.Kind { + return idI.GroupKind.Kind < idJ.GroupKind.Kind + } + return idI.Name < idJ.Name +} + +func (g ResourceInfos) Swap(i, j int) { + g[i], g[j] = g[j], g[i] +} diff --git a/cmd/printers/table/printer.go b/cmd/printers/table/printer.go new file mode 100644 index 0000000..41251d5 --- /dev/null +++ b/cmd/printers/table/printer.go @@ -0,0 +1,134 @@ +// Copyright 2020 The Kubernetes Authors. +// SPDX-License-Identifier: Apache-2.0 + +package table + +import ( + "fmt" + "io" + "time" + + "k8s.io/cli-runtime/pkg/genericclioptions" + "sigs.k8s.io/cli-utils/pkg/apply/event" + "sigs.k8s.io/cli-utils/pkg/print/table" +) + +type Printer struct { + IOStreams genericclioptions.IOStreams +} + +func (t *Printer) Print(ch <-chan event.Event, _ bool) { + // Wait for the init event that will give us the set of + // resources. + var initEvent event.InitEvent + for e := range ch { + if e.Type == event.InitType { + initEvent = e.InitEvent + break + } + // If we get an error event, we just panic for now. + // Eventually we need a more graceful shutdown if + // this happens. + if e.Type == event.ErrorType { + panic(e.ErrorEvent.Err) + } + } + // Create a new collector and initialize it with the resources + // we are interested in. + coll := newResourceStateCollector(initEvent.ResourceGroups) + + stop := make(chan struct{}) + + // Start the goroutine that is responsible for + // printing the latest state on a regular cadence. + printCompleted := t.runPrintLoop(coll, stop) + + // Make the collector start listening on the eventChannel. + done := coll.Listen(ch) + + // Block until all the collector has shut down. This means the + // eventChannel has been closed and all events have been processed. + <-done + + // Close the stop channel to notify the print goroutine that it should + // shut down. + close(stop) + + // Wait until the printCompleted channel is closed. This means + // the printer has updated the UI with the latest state and + // exited from the goroutine. + <-printCompleted +} + +// columns defines the columns we want to print +//TODO: We should have the number of columns and their widths be +// dependent on the space available. +var columns = []table.ColumnDefinition{ + table.MustColumn("namespace"), + table.MustColumn("resource"), + table.ColumnDef{ + // Column containing the resource type and name. Currently it does not + // print group or version since those are rarely needed to uniquely + // distinguish two resources from each other. Just name and kind should + // be enough in almost all cases and saves space in the output. + ColumnName: "action", + ColumnHeader: "ACTION", + ColumnWidth: 12, + PrintResourceFunc: func(w io.Writer, width int, r table.Resource) (int, + error) { + var resInfo *ResourceInfo + switch res := r.(type) { + case *ResourceInfo: + resInfo = res + case *SubResourceInfo: + return 0, nil + } + + if resInfo.ResourceAction == event.ApplyAction && + resInfo.ApplyOpResult != nil { + text := resInfo.ApplyOpResult.String() + if len(text) > width { + text = text[:width] + } + _, err := fmt.Fprint(w, text) + return len(text), err + } + return 0, nil + }, + }, + table.MustColumn("status"), + table.MustColumn("conditions"), + table.MustColumn("age"), + table.MustColumn("message"), +} + +// runPrintLoop starts a new goroutine that will regularly fetch the +// latest state from the collector and update the table. +func (t *Printer) runPrintLoop(coll *ResourceStateCollector, stop chan struct{}) chan struct{} { + finished := make(chan struct{}) + + baseTablePrinter := table.BaseTablePrinter{ + IOStreams: t.IOStreams, + Columns: columns, + } + + linesPrinted := baseTablePrinter.PrintTable(coll.LatestState(), 0) + + go func() { + defer close(finished) + ticker := time.NewTicker(500 * time.Millisecond) + for { + select { + case <-stop: + ticker.Stop() + latestState := coll.LatestState() + linesPrinted = baseTablePrinter.PrintTable(latestState, linesPrinted) + return + case <-ticker.C: + latestState := coll.LatestState() + linesPrinted = baseTablePrinter.PrintTable(latestState, linesPrinted) + } + } + }() + return finished +}