Merge pull request #32 from mortent/PruneChannel

Use channel directly in prune instead of ToPrinter
This commit is contained in:
Jeff Regan 2020-02-11 09:39:43 -08:00 committed by GitHub
commit 6c36bffc99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 113 additions and 93 deletions

View File

@ -10,12 +10,12 @@ import (
"github.com/go-errors/errors"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/kubectl/pkg/cmd/apply"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/prune"
"sigs.k8s.io/cli-utils/pkg/kstatus/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -124,8 +124,8 @@ func (a *Applier) newResolver(pollInterval time.Duration) (*wait.Resolver, error
// before all the given resources have been applied to the cluster. Any
// cancellation or timeout will only affect how long we wait for the
// resources to become current.
func (a *Applier) Run(ctx context.Context) <-chan Event {
ch := make(chan Event)
func (a *Applier) Run(ctx context.Context) <-chan event.Event {
ch := make(chan event.Event)
go func() {
defer close(ch)
@ -135,7 +135,6 @@ func (a *Applier) Run(ctx context.Context) <-chan Event {
// The adapter is used to intercept what is meant to be printing
// in the ApplyOptions, and instead turn those into events.
a.ApplyOptions.ToPrinter = adapter.toPrinterFunc()
a.PruneOptions.ToPrinter = adapter.toPrinterFunc()
// This provides us with a slice of all the objects that will be
// applied to the cluster.
infos, _ := a.ApplyOptions.GetObjects()
@ -144,9 +143,9 @@ func (a *Applier) Run(ctx context.Context) <-chan Event {
// If we see an error here we just report it on the channel and then
// give up. Eventually we might be able to determine which errors
// are fatal and which might allow us to continue.
ch <- Event{
EventType: ErrorEventType,
ErrorEvent: ErrorEvent{
ch <- event.Event{
Type: event.ErrorEventType,
ErrorEvent: event.ErrorEvent{
Err: errors.WrapPrefix(err, "error applying resources", 1),
},
}
@ -158,22 +157,22 @@ func (a *Applier) Run(ctx context.Context) <-chan Event {
// As long as the statusChannel remains open, we take every statusEvent,
// wrap it in an Event and send it on the channel.
for statusEvent := range statusChannel {
ch <- Event{
EventType: StatusEventType,
ch <- event.Event{
Type: event.StatusEventType,
StatusEvent: statusEvent,
}
}
}
if !a.NoPrune {
err = a.PruneOptions.Prune(infos)
err = a.PruneOptions.Prune(infos, ch)
if err != nil {
// If we see an error here we just report it on the channel and then
// give up. Eventually we might be able to determine which errors
// are fatal and which might allow us to continue.
ch <- Event{
EventType: ErrorEventType,
ErrorEvent: ErrorEvent{
ch <- event.Event{
Type: event.ErrorEventType,
ErrorEvent: event.ErrorEvent{
Err: errors.WrapPrefix(err, "error pruning resources", 1),
},
}
@ -192,41 +191,3 @@ func infosToObjects(infos []*resource.Info) []wait.KubernetesObject {
}
return objects
}
// EventType determines the type of events that are available.
type EventType string
const (
ErrorEventType EventType = "error"
ApplyEventType EventType = "apply"
StatusEventType EventType = "status"
)
// Event is the type of the objects that will be returned through
// the channel that is returned from a call to Run. It contains
// information about progress and errors encountered during
// the process of doing apply, waiting for status and doing a prune.
type Event struct {
// EventType is the type of event.
EventType EventType
// ErrorEvent contains information about any errors encountered.
ErrorEvent ErrorEvent
// ApplyEvent contains information about progress pertaining to
// applying a resource to the cluster.
ApplyEvent ApplyEvent
// StatusEvents contains information about the status of one of
// the applied resources.
StatusEvent wait.Event
}
type ErrorEvent struct {
Err error
}
type ApplyEvent struct {
Operation string
Object runtime.Object
}

View File

@ -7,11 +7,12 @@ import (
"fmt"
"strings"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/cli-runtime/pkg/genericclioptions"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/wait"
)
@ -26,23 +27,18 @@ type BasicPrinter struct {
// format on StdOut. As we support other printer implementations
// this should probably be an interface.
// This function will block until the channel is closed.
func (b *BasicPrinter) Print(ch <-chan Event) {
for event := range ch {
switch event.EventType {
case ErrorEventType:
cmdutil.CheckErr(event.ErrorEvent.Err)
case ApplyEventType:
obj := event.ApplyEvent.Object
func (b *BasicPrinter) Print(ch <-chan event.Event) {
for e := range ch {
switch e.Type {
case event.ErrorEventType:
cmdutil.CheckErr(e.ErrorEvent.Err)
case event.ApplyEventType:
obj := e.ApplyEvent.Object
gvk := obj.GetObjectKind().GroupVersionKind()
name := "<unknown>"
if acc, err := meta.Accessor(obj); err == nil {
if n := acc.GetName(); len(n) > 0 {
name = n
}
}
fmt.Fprintf(b.IOStreams.Out, "%s %s\n", resourceIDToString(gvk.GroupKind(), name), event.ApplyEvent.Operation)
case StatusEventType:
statusEvent := event.StatusEvent
name := getName(obj)
fmt.Fprintf(b.IOStreams.Out, "%s %s\n", resourceIDToString(gvk.GroupKind(), name), e.ApplyEvent.Operation)
case event.StatusEventType:
statusEvent := e.StatusEvent
switch statusEvent.Type {
case wait.ResourceUpdate:
id := statusEvent.EventResource.ResourceIdentifier
@ -53,10 +49,24 @@ func (b *BasicPrinter) Print(ch <-chan Event) {
case wait.Aborted:
fmt.Fprintf(b.IOStreams.Out, "resources failed to the reached Current status\n")
}
case event.PruneEventType:
obj := e.PruneEvent.Object
gvk := obj.GetObjectKind().GroupVersionKind()
name := getName(obj)
fmt.Fprintf(b.IOStreams.Out, "%s %s\n", resourceIDToString(gvk.GroupKind(), name), "pruned")
}
}
}
func getName(obj runtime.Object) string {
if acc, err := meta.Accessor(obj); err == nil {
if n := acc.GetName(); len(n) > 0 {
return n
}
}
return "<unknown>"
}
// resourceIDToString returns the string representation of a GroupKind and a resource name.
func resourceIDToString(gk schema.GroupKind, name string) string {
return fmt.Sprintf("%s/%s", strings.ToLower(gk.String()), name)

53
pkg/apply/event/event.go Normal file
View File

@ -0,0 +1,53 @@
package event
import (
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/cli-utils/pkg/kstatus/wait"
)
// Type determines the type of events that are available.
type Type string
const (
ErrorEventType Type = "error"
ApplyEventType Type = "apply"
StatusEventType Type = "status"
PruneEventType Type = "prune"
)
// Event is the type of the objects that will be returned through
// the channel that is returned from a call to Run. It contains
// information about progress and errors encountered during
// the process of doing apply, waiting for status and doing a prune.
type Event struct {
// Type is the type of event.
Type Type
// ErrorEvent contains information about any errors encountered.
ErrorEvent ErrorEvent
// ApplyEvent contains information about progress pertaining to
// applying a resource to the cluster.
ApplyEvent ApplyEvent
// StatusEvents contains information about the status of one of
// the applied resources.
StatusEvent wait.Event
// PruneEvent contains information about objects that have been
// pruned.
PruneEvent PruneEvent
}
type ErrorEvent struct {
Err error
}
type ApplyEvent struct {
Operation string
Object runtime.Object
}
type PruneEvent struct {
Object runtime.Object
}

View File

@ -8,6 +8,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/cli-runtime/pkg/printers"
"sigs.k8s.io/cli-utils/pkg/apply/event"
)
// KubectlPrinterAdapter is a workaround for capturing progress from
@ -16,22 +17,22 @@ import (
// plugs into ApplyOptions as a ToPrinter function, but instead of
// printing the info, it emits it as an event on the provided channel.
type KubectlPrinterAdapter struct {
ch chan<- Event
ch chan<- event.Event
}
// resourcePrinterImpl implements the ResourcePrinter interface. But
// instead of printing, it emits information on the provided channel.
type resourcePrinterImpl struct {
operation string
ch chan<- Event
ch chan<- event.Event
}
// PrintObj takes the provided object and operation and emits
// it on the channel.
func (r *resourcePrinterImpl) PrintObj(obj runtime.Object, _ io.Writer) error {
r.ch <- Event{
EventType: ApplyEventType,
ApplyEvent: ApplyEvent{
r.ch <- event.Event{
Type: event.ApplyEventType,
ApplyEvent: event.ApplyEvent{
Operation: r.operation,
Object: obj,
},

View File

@ -10,10 +10,11 @@ import (
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/cli-utils/pkg/apply/event"
)
func TestKubectlPrinterAdapter(t *testing.T) {
ch := make(chan Event)
ch := make(chan event.Event)
buffer := bytes.Buffer{}
operation := "operation"

View File

@ -13,16 +13,15 @@ package prune
import (
"fmt"
"io"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/printers"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/dynamic"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/validation"
"sigs.k8s.io/cli-utils/pkg/apply/event"
)
// PruneOptions encapsulates the necessary information to
@ -44,9 +43,6 @@ type PruneOptions struct {
pastGroupingObjects []*resource.Info
retrievedGroupingObjects bool
ToPrinter func(string) (printers.ResourcePrinter, error)
out io.Writer
DryRun bool
validator validation.Schema
@ -203,7 +199,7 @@ func (po *PruneOptions) calcPruneSet(pastGroupingInfos []*resource.Info) (*Inven
// (retrieved from previous grouping objects) but omitted in
// the current apply. Prune also delete all previous grouping
// objects. Returns an error if there was a problem.
func (po *PruneOptions) Prune(currentObjects []*resource.Info) error {
func (po *PruneOptions) Prune(currentObjects []*resource.Info, eventChannel chan<- event.Event) error {
currentGroupingObject, found := FindGroupingObject(currentObjects)
if !found {
return fmt.Errorf("current grouping object not found during prune")
@ -246,12 +242,11 @@ func (po *PruneOptions) Prune(currentObjects []*resource.Info) error {
return err
}
}
printer, err := po.ToPrinter("deleted")
if err != nil {
return err
}
if err = printer.PrintObj(obj, po.out); err != nil {
return err
eventChannel <- event.Event{
Type: event.PruneEventType,
PruneEvent: event.PruneEvent{
Object: obj,
},
}
}
// Delete previous grouping objects.
@ -264,12 +259,11 @@ func (po *PruneOptions) Prune(currentObjects []*resource.Info) error {
return err
}
}
printer, err := po.ToPrinter("deleted")
if err != nil {
return err
}
if err = printer.PrintObj(pastGroupInfo.Object, po.out); err != nil {
return err
eventChannel <- event.Event{
Type: event.PruneEventType,
PruneEvent: event.PruneEvent{
Object: pastGroupInfo.Object,
},
}
}
return nil