diff --git a/pkg/karmadactl/get.go b/pkg/karmadactl/get.go index f1ea651d6..dc426157f 100644 --- a/pkg/karmadactl/get.go +++ b/pkg/karmadactl/get.go @@ -21,12 +21,16 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/printers" "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/rest" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubectl/pkg/cmd/get" cmdutil "k8s.io/kubectl/pkg/cmd/util" + "k8s.io/kubectl/pkg/util/interrupt" + utilpointer "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" @@ -51,6 +55,7 @@ var ( {Name: "CLUSTER", Type: "string", Format: "", Priority: 0}, {Name: "ADOPTION", Type: "string", Format: "", Priority: 0}, } + eventColumn = metav1.TableColumnDefinition{Name: "EVENT", Type: "string", Format: "", Priority: 0} getShort = `Display one or many resources` ) @@ -69,6 +74,9 @@ func NewCmdGet(out io.Writer, karmadaConfig KarmadaConfig, parentCommand string) if err := o.Complete(cmd, args); err != nil { return err } + if err := o.Validate(cmd); err != nil { + return err + } if err := o.Run(karmadaConfig, cmd, args); err != nil { return err } @@ -84,6 +92,10 @@ func NewCmdGet(out io.Writer, karmadaConfig KarmadaConfig, parentCommand string) cmd.Flags().StringSliceVarP(&o.Clusters, "clusters", "C", []string{}, "-C=member1,member2") cmd.Flags().StringVar(&o.ClusterNamespace, "cluster-namespace", options.DefaultKarmadaClusterNamespace, "Namespace in the control plane where member cluster are stored.") cmd.Flags().BoolVarP(&o.AllNamespaces, "all-namespaces", "A", o.AllNamespaces, "If present, list the requested object(s) across all namespaces. Namespace in current context is ignored even if specified with --namespace.") + cmd.Flags().BoolVar(&o.IgnoreNotFound, "ignore-not-found", o.IgnoreNotFound, "If the requested object does not exist the command will return exit code 0.") + cmd.Flags().BoolVarP(&o.Watch, "watch", "w", o.Watch, "After listing/getting the requested object, watch for changes. Uninitialized objects are excluded if no object name is provided.") + cmd.Flags().BoolVar(&o.WatchOnly, "watch-only", o.WatchOnly, "Watch for changes to the requested object(s), without listing/getting first.") + cmd.Flags().BoolVar(&o.OutputWatchEvents, "output-watch-events", o.OutputWatchEvents, "Output watch event objects when --watch or --watch-only is used. Existing objects are output as initial ADDED events.") return cmd } @@ -106,7 +118,8 @@ type CommandGetOptions struct { resource.FilenameOptions - Raw string + Watch bool + WatchOnly bool ChunkSize int64 OutputWatchEvents bool @@ -142,11 +155,6 @@ func NewCommandGetOptions(parent string, streams genericclioptions.IOStreams) *C func (g *CommandGetOptions) Complete(cmd *cobra.Command, args []string) error { newScheme := gclient.NewSchema() - outputOption := cmd.Flags().Lookup("output").Value.String() - if strings.Contains(outputOption, "custom-columns") || outputOption == "yaml" || strings.Contains(outputOption, "json") { - g.ServerPrint = false - } - templateArg := "" if g.PrintFlags.TemplateFlags != nil && g.PrintFlags.TemplateFlags.TemplateArgument != nil { templateArg = *g.PrintFlags.TemplateFlags.TemplateArgument @@ -181,6 +189,9 @@ func (g *CommandGetOptions) Complete(cmd *cobra.Command, args []string) error { return nil, err } + if outputObjects != nil { + printer = &skipPrinter{delegate: printer, output: outputObjects} + } if g.ServerPrint { printer = &get.TablePrinter{Delegate: printer} } @@ -190,12 +201,32 @@ func (g *CommandGetOptions) Complete(cmd *cobra.Command, args []string) error { return nil } +// Validate checks the set of flags provided by the user. +func (g *CommandGetOptions) Validate(cmd *cobra.Command) error { + if cmdutil.GetFlagBool(cmd, "show-labels") { + outputOption := cmd.Flags().Lookup("output").Value.String() + if outputOption != "" && outputOption != "wide" { + return fmt.Errorf("--show-labels option cannot be used with %s printer", outputOption) + } + } + if g.OutputWatchEvents && !(g.Watch || g.WatchOnly) { + return fmt.Errorf("--output-watch-events option can only be used with --watch or --watch-only") + } + return nil +} + // Obj cluster info type Obj struct { Cluster string Info *resource.Info } +// WatchObj is a obj that is watched +type WatchObj struct { + Cluster string + r *resource.Result +} + // RBInfo resourcebinding info and print info var RBInfo map[string]*OtherPrint @@ -210,15 +241,21 @@ func (g *CommandGetOptions) Run(karmadaConfig KarmadaConfig, cmd *cobra.Command, var wg sync.WaitGroup var objs []Obj + var watchObjs []WatchObj var allErrs []error - clusterInfos := make(map[string]*ClusterInfo) - RBInfo = make(map[string]*OtherPrint) - if g.AllNamespaces { g.ExplicitNamespace = false } + outputOption := cmd.Flags().Lookup("output").Value.String() + if strings.Contains(outputOption, "custom-columns") || outputOption == "yaml" || strings.Contains(outputOption, "json") { + g.ServerPrint = false + } + + clusterInfos := make(map[string]*ClusterInfo) + RBInfo = make(map[string]*OtherPrint) + karmadaRestConfig, err := clusterInfoInit(g, karmadaConfig, clusterInfos) if err != nil { return err @@ -228,10 +265,14 @@ func (g *CommandGetOptions) Run(karmadaConfig KarmadaConfig, cmd *cobra.Command, for idx := range g.Clusters { g.setClusterProxyInfo(karmadaRestConfig, g.Clusters[idx], clusterInfos) f := getFactory(g.Clusters[idx], clusterInfos) - go g.getObjInfo(&wg, &mux, f, g.Clusters[idx], &objs, &allErrs, args) + go g.getObjInfo(&wg, &mux, f, g.Clusters[idx], &objs, &watchObjs, &allErrs, args) } wg.Wait() + if g.Watch || g.WatchOnly { + return g.watch(watchObjs) + } + if !g.IsHumanReadablePrinter { // have printed objects in yaml or json format above return nil @@ -306,7 +347,7 @@ func (g *CommandGetOptions) printObjs(objs []Obj, allErrs *[]error, args []strin table.Rows = allTableRows setNoAdoption(mapping) - setColumnDefinition(table) + g.setColumnDefinition(table) printObj, err := helper.ToUnstructured(table) if err != nil { @@ -349,7 +390,7 @@ func (g *CommandGetOptions) checkPrintWithNamespace(mapping *meta.RESTMapping) b // getObjInfo get obj info in member cluster func (g *CommandGetOptions) getObjInfo(wg *sync.WaitGroup, mux *sync.Mutex, f cmdutil.Factory, - cluster string, objs *[]Obj, allErrs *[]error, args []string) { + cluster string, objs *[]Obj, watchObjs *[]WatchObj, allErrs *[]error, args []string) { defer wg.Done() restClient, err := f.RESTClient() @@ -365,14 +406,13 @@ func (g *CommandGetOptions) getObjInfo(wg *sync.WaitGroup, mux *sync.Mutex, f cm return } - chunkSize := g.ChunkSize r := f.NewBuilder(). Unstructured(). NamespaceParam(g.Namespace).DefaultNamespace().AllNamespaces(g.AllNamespaces). FilenameParam(g.ExplicitNamespace, &g.FilenameOptions). LabelSelectorParam(g.LabelSelector). FieldSelectorParam(g.FieldSelector). - RequestChunksOf(chunkSize). + RequestChunksOf(g.ChunkSize). ResourceTypeOrNameArgs(true, args...). ContinueOnError(). Latest(). @@ -389,6 +429,17 @@ func (g *CommandGetOptions) getObjInfo(wg *sync.WaitGroup, mux *sync.Mutex, f cm return } + if g.Watch || g.WatchOnly { + mux.Lock() + watchObjsInfo := WatchObj{ + Cluster: cluster, + r: r, + } + *watchObjs = append(*watchObjs, watchObjsInfo) + mux.Unlock() + return + } + if !g.IsHumanReadablePrinter { if err := g.printGeneric(r); err != nil { *allErrs = append(*allErrs, fmt.Errorf("cluster(%s): %s", cluster, err)) @@ -444,6 +495,200 @@ func (g *CommandGetOptions) reconstructionRow(objs []Obj, table *metav1.Table) ( return allTableRows, mapping, nil } +// reconstructObj reconstruct runtime.object row +func (g *CommandGetOptions) reconstructObj(obj runtime.Object, mapping *meta.RESTMapping, cluster string, event string) (*metav1.Table, error) { + table := &metav1.Table{} + var allTableRows []metav1.TableRow + + unstr, ok := obj.(*unstructured.Unstructured) + if !ok { + return nil, fmt.Errorf("attempt to decode non-Unstructured object") + } + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstr.Object, table); err != nil { + return nil, err + } + + for rowIdx := range table.Rows { + var tempRow metav1.TableRow + rbKey := getRBKey(mapping.GroupVersionKind, table.Rows[rowIdx], cluster) + + if g.OutputWatchEvents { + tempRow.Cells = append(append(tempRow.Cells, event, table.Rows[rowIdx].Cells[0], cluster), table.Rows[rowIdx].Cells[1:]...) + } else { + tempRow.Cells = append(append(tempRow.Cells, table.Rows[rowIdx].Cells[0], cluster), table.Rows[rowIdx].Cells[1:]...) + } + if _, ok := RBInfo[rbKey]; ok { + tempRow.Cells = append(tempRow.Cells, "Y") + } else { + tempRow.Cells = append(tempRow.Cells, "N") + } + table.Rows[rowIdx].Cells = tempRow.Cells + } + allTableRows = append(allTableRows, table.Rows...) + + table.Rows = allTableRows + + setNoAdoption(mapping) + g.setColumnDefinition(table) + + return table, nil +} + +// watch starts a client-side watch of one or more resources. +func (g *CommandGetOptions) watch(watchObjs []WatchObj) error { + if len(watchObjs) <= 0 { + return fmt.Errorf("not to find obj that is watched") + } + infos, err := watchObjs[0].r.Infos() + if err != nil { + return err + } + + var objs []Obj + for ix := range infos { + objs = append(objs, Obj{Cluster: watchObjs[0].Cluster, Info: infos[ix]}) + } + + if multipleGVKsRequested(objs) { + return fmt.Errorf("watch is only supported on individual resources and resource collections - more than 1 resource was found") + } + + info := infos[0] + mapping := info.ResourceMapping() + outputObjects := utilpointer.BoolPtr(!g.WatchOnly) + + printer, err := g.ToPrinter(mapping, outputObjects, g.AllNamespaces, false) + if err != nil { + return err + } + writer := printers.GetNewTabWriter(g.Out) + + // print the current object + for idx := range watchObjs { + var objsToPrint []runtime.Object + obj, err := watchObjs[idx].r.Object() + if err != nil { + return err + } + + isList := meta.IsListType(obj) + + if isList { + tmpObj, _ := meta.ExtractList(obj) + objsToPrint = append(objsToPrint, tmpObj...) + } else { + objsToPrint = append(objsToPrint, obj) + } + + for _, objToPrint := range objsToPrint { + objrow, err := g.reconstructObj(objToPrint, mapping, watchObjs[idx].Cluster, string(watch.Added)) + if err != nil { + return err + } + + if idx > 0 { + // only print ColumnDefinitions once + objrow.ColumnDefinitions = nil + } + + printObj, err := helper.ToUnstructured(objrow) + if err != nil { + return err + } + + if err := printer.PrintObj(printObj, writer); err != nil { + return fmt.Errorf("unable to output the provided object: %v", err) + } + } + } + writer.Flush() + + g.watchMultiClusterObj(watchObjs, mapping, outputObjects, printer) + + return nil +} + +//watchMultiClusterObj watch objects in multi clusters by goroutines +func (g *CommandGetOptions) watchMultiClusterObj(watchObjs []WatchObj, mapping *meta.RESTMapping, outputObjects *bool, printer printers.ResourcePrinterFunc) { + var wg sync.WaitGroup + + writer := printers.GetNewTabWriter(g.Out) + + wg.Add(len(watchObjs)) + for _, watchObj := range watchObjs { + go func(watchObj WatchObj) { + obj, err := watchObj.r.Object() + if err != nil { + panic(err) + } + + rv := "0" + isList := meta.IsListType(obj) + if isList { + // the resourceVersion of list objects is ~now but won't return + // an initial watch event + rv, err = meta.NewAccessor().ResourceVersion(obj) + if err != nil { + panic(err) + } + } + + if isList { + // we can start outputting objects now, watches started from lists don't emit synthetic added events + *outputObjects = true + } else { + // suppress output, since watches started for individual items emit a synthetic ADDED event first + *outputObjects = false + } + + if isList { + // we can start outputting objects now, watches started from lists don't emit synthetic added events + *outputObjects = true + } else { + // suppress output, since watches started for individual items emit a synthetic ADDED event first + *outputObjects = false + } + + // print watched changes + w, err := watchObj.r.Watch(rv) + if err != nil { + panic(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + intr := interrupt.New(nil, cancel) + _ = intr.Run(func() error { + _, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) { + objToPrint := e.Object + + objrow, err := g.reconstructObj(objToPrint, mapping, watchObj.Cluster, string(e.Type)) + if err != nil { + return false, err + } + // not need to print ColumnDefinitions + objrow.ColumnDefinitions = nil + + printObj, err := helper.ToUnstructured(objrow) + if err != nil { + return false, err + } + + if err := printer.PrintObj(printObj, writer); err != nil { + return false, err + } + writer.Flush() + // after processing at least one event, start outputting objects + *outputObjects = true + return false, nil + }) + return err + }) + }(watchObj) + } + wg.Wait() +} + func (g *CommandGetOptions) printGeneric(r *resource.Result) error { // we flattened the data from the builder, so we have individual items, but now we'd like to either: // 1. if there is more than one item, combine them all into a single list @@ -762,10 +1007,14 @@ func setNoAdoption(mapping *meta.RESTMapping) { } // setColumnDefinition set print ColumnDefinition -func setColumnDefinition(table *metav1.Table) { +func (g *CommandGetOptions) setColumnDefinition(table *metav1.Table) { var tempColumnDefinition []metav1.TableColumnDefinition if len(table.ColumnDefinitions) > 0 { - tempColumnDefinition = append(append(append(tempColumnDefinition, table.ColumnDefinitions[0], podColumns[0]), table.ColumnDefinitions[1:]...), podColumns[1:]...) + if g.OutputWatchEvents { + tempColumnDefinition = append(append(append(tempColumnDefinition, eventColumn, table.ColumnDefinitions[0], podColumns[0]), table.ColumnDefinitions[1:]...), podColumns[1:]...) + } else { + tempColumnDefinition = append(append(append(tempColumnDefinition, table.ColumnDefinitions[0], podColumns[0]), table.ColumnDefinitions[1:]...), podColumns[1:]...) + } table.ColumnDefinitions = tempColumnDefinition } } @@ -806,3 +1055,26 @@ func getExample(parentCommand string) string { fmt.Sprintf("%s get rs/nginx-cb87b6d88 service/kubernetes", parentCommand) return example } + +// skipPrinter allows conditionally suppressing object output via the output field. +// table objects are suppressed by setting their Rows to nil (allowing column definitions to propagate to the delegate). +// non-table objects are suppressed by not calling the delegate at all. +type skipPrinter struct { + delegate printers.ResourcePrinter + output *bool +} + +func (p *skipPrinter) PrintObj(obj runtime.Object, writer io.Writer) error { + if *p.output { + return p.delegate.PrintObj(obj, writer) + } + + table, isTable := obj.(*metav1.Table) + if !isTable { + return nil + } + + table = table.DeepCopy() + table.Rows = nil + return p.delegate.PrintObj(table, writer) +}