Merge pull request #1638 from lonelyCZ/pr-get-watch
Add watch function for karmadactl get
This commit is contained in:
commit
de7b7a15ce
|
@ -21,12 +21,16 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/cli-runtime/pkg/genericclioptions"
|
||||
"k8s.io/cli-runtime/pkg/printers"
|
||||
"k8s.io/cli-runtime/pkg/resource"
|
||||
"k8s.io/client-go/rest"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/kubectl/pkg/cmd/get"
|
||||
cmdutil "k8s.io/kubectl/pkg/cmd/util"
|
||||
"k8s.io/kubectl/pkg/util/interrupt"
|
||||
utilpointer "k8s.io/utils/pointer"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
|
@ -51,6 +55,7 @@ var (
|
|||
{Name: "CLUSTER", Type: "string", Format: "", Priority: 0},
|
||||
{Name: "ADOPTION", Type: "string", Format: "", Priority: 0},
|
||||
}
|
||||
eventColumn = metav1.TableColumnDefinition{Name: "EVENT", Type: "string", Format: "", Priority: 0}
|
||||
|
||||
getShort = `Display one or many resources`
|
||||
)
|
||||
|
@ -69,6 +74,9 @@ func NewCmdGet(out io.Writer, karmadaConfig KarmadaConfig, parentCommand string)
|
|||
if err := o.Complete(cmd, args); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := o.Validate(cmd); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := o.Run(karmadaConfig, cmd, args); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -84,6 +92,10 @@ func NewCmdGet(out io.Writer, karmadaConfig KarmadaConfig, parentCommand string)
|
|||
cmd.Flags().StringSliceVarP(&o.Clusters, "clusters", "C", []string{}, "-C=member1,member2")
|
||||
cmd.Flags().StringVar(&o.ClusterNamespace, "cluster-namespace", options.DefaultKarmadaClusterNamespace, "Namespace in the control plane where member cluster are stored.")
|
||||
cmd.Flags().BoolVarP(&o.AllNamespaces, "all-namespaces", "A", o.AllNamespaces, "If present, list the requested object(s) across all namespaces. Namespace in current context is ignored even if specified with --namespace.")
|
||||
cmd.Flags().BoolVar(&o.IgnoreNotFound, "ignore-not-found", o.IgnoreNotFound, "If the requested object does not exist the command will return exit code 0.")
|
||||
cmd.Flags().BoolVarP(&o.Watch, "watch", "w", o.Watch, "After listing/getting the requested object, watch for changes. Uninitialized objects are excluded if no object name is provided.")
|
||||
cmd.Flags().BoolVar(&o.WatchOnly, "watch-only", o.WatchOnly, "Watch for changes to the requested object(s), without listing/getting first.")
|
||||
cmd.Flags().BoolVar(&o.OutputWatchEvents, "output-watch-events", o.OutputWatchEvents, "Output watch event objects when --watch or --watch-only is used. Existing objects are output as initial ADDED events.")
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
@ -106,7 +118,8 @@ type CommandGetOptions struct {
|
|||
|
||||
resource.FilenameOptions
|
||||
|
||||
Raw string
|
||||
Watch bool
|
||||
WatchOnly bool
|
||||
ChunkSize int64
|
||||
|
||||
OutputWatchEvents bool
|
||||
|
@ -142,11 +155,6 @@ func NewCommandGetOptions(parent string, streams genericclioptions.IOStreams) *C
|
|||
func (g *CommandGetOptions) Complete(cmd *cobra.Command, args []string) error {
|
||||
newScheme := gclient.NewSchema()
|
||||
|
||||
outputOption := cmd.Flags().Lookup("output").Value.String()
|
||||
if strings.Contains(outputOption, "custom-columns") || outputOption == "yaml" || strings.Contains(outputOption, "json") {
|
||||
g.ServerPrint = false
|
||||
}
|
||||
|
||||
templateArg := ""
|
||||
if g.PrintFlags.TemplateFlags != nil && g.PrintFlags.TemplateFlags.TemplateArgument != nil {
|
||||
templateArg = *g.PrintFlags.TemplateFlags.TemplateArgument
|
||||
|
@ -181,6 +189,9 @@ func (g *CommandGetOptions) Complete(cmd *cobra.Command, args []string) error {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if outputObjects != nil {
|
||||
printer = &skipPrinter{delegate: printer, output: outputObjects}
|
||||
}
|
||||
if g.ServerPrint {
|
||||
printer = &get.TablePrinter{Delegate: printer}
|
||||
}
|
||||
|
@ -190,12 +201,32 @@ func (g *CommandGetOptions) Complete(cmd *cobra.Command, args []string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Validate checks the set of flags provided by the user.
|
||||
func (g *CommandGetOptions) Validate(cmd *cobra.Command) error {
|
||||
if cmdutil.GetFlagBool(cmd, "show-labels") {
|
||||
outputOption := cmd.Flags().Lookup("output").Value.String()
|
||||
if outputOption != "" && outputOption != "wide" {
|
||||
return fmt.Errorf("--show-labels option cannot be used with %s printer", outputOption)
|
||||
}
|
||||
}
|
||||
if g.OutputWatchEvents && !(g.Watch || g.WatchOnly) {
|
||||
return fmt.Errorf("--output-watch-events option can only be used with --watch or --watch-only")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Obj cluster info
|
||||
type Obj struct {
|
||||
Cluster string
|
||||
Info *resource.Info
|
||||
}
|
||||
|
||||
// WatchObj is a obj that is watched
|
||||
type WatchObj struct {
|
||||
Cluster string
|
||||
r *resource.Result
|
||||
}
|
||||
|
||||
// RBInfo resourcebinding info and print info
|
||||
var RBInfo map[string]*OtherPrint
|
||||
|
||||
|
@ -210,15 +241,21 @@ func (g *CommandGetOptions) Run(karmadaConfig KarmadaConfig, cmd *cobra.Command,
|
|||
var wg sync.WaitGroup
|
||||
|
||||
var objs []Obj
|
||||
var watchObjs []WatchObj
|
||||
var allErrs []error
|
||||
|
||||
clusterInfos := make(map[string]*ClusterInfo)
|
||||
RBInfo = make(map[string]*OtherPrint)
|
||||
|
||||
if g.AllNamespaces {
|
||||
g.ExplicitNamespace = false
|
||||
}
|
||||
|
||||
outputOption := cmd.Flags().Lookup("output").Value.String()
|
||||
if strings.Contains(outputOption, "custom-columns") || outputOption == "yaml" || strings.Contains(outputOption, "json") {
|
||||
g.ServerPrint = false
|
||||
}
|
||||
|
||||
clusterInfos := make(map[string]*ClusterInfo)
|
||||
RBInfo = make(map[string]*OtherPrint)
|
||||
|
||||
karmadaRestConfig, err := clusterInfoInit(g, karmadaConfig, clusterInfos)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -228,10 +265,14 @@ func (g *CommandGetOptions) Run(karmadaConfig KarmadaConfig, cmd *cobra.Command,
|
|||
for idx := range g.Clusters {
|
||||
g.setClusterProxyInfo(karmadaRestConfig, g.Clusters[idx], clusterInfos)
|
||||
f := getFactory(g.Clusters[idx], clusterInfos)
|
||||
go g.getObjInfo(&wg, &mux, f, g.Clusters[idx], &objs, &allErrs, args)
|
||||
go g.getObjInfo(&wg, &mux, f, g.Clusters[idx], &objs, &watchObjs, &allErrs, args)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if g.Watch || g.WatchOnly {
|
||||
return g.watch(watchObjs)
|
||||
}
|
||||
|
||||
if !g.IsHumanReadablePrinter {
|
||||
// have printed objects in yaml or json format above
|
||||
return nil
|
||||
|
@ -306,7 +347,7 @@ func (g *CommandGetOptions) printObjs(objs []Obj, allErrs *[]error, args []strin
|
|||
table.Rows = allTableRows
|
||||
|
||||
setNoAdoption(mapping)
|
||||
setColumnDefinition(table)
|
||||
g.setColumnDefinition(table)
|
||||
|
||||
printObj, err := helper.ToUnstructured(table)
|
||||
if err != nil {
|
||||
|
@ -349,7 +390,7 @@ func (g *CommandGetOptions) checkPrintWithNamespace(mapping *meta.RESTMapping) b
|
|||
|
||||
// getObjInfo get obj info in member cluster
|
||||
func (g *CommandGetOptions) getObjInfo(wg *sync.WaitGroup, mux *sync.Mutex, f cmdutil.Factory,
|
||||
cluster string, objs *[]Obj, allErrs *[]error, args []string) {
|
||||
cluster string, objs *[]Obj, watchObjs *[]WatchObj, allErrs *[]error, args []string) {
|
||||
defer wg.Done()
|
||||
|
||||
restClient, err := f.RESTClient()
|
||||
|
@ -365,14 +406,13 @@ func (g *CommandGetOptions) getObjInfo(wg *sync.WaitGroup, mux *sync.Mutex, f cm
|
|||
return
|
||||
}
|
||||
|
||||
chunkSize := g.ChunkSize
|
||||
r := f.NewBuilder().
|
||||
Unstructured().
|
||||
NamespaceParam(g.Namespace).DefaultNamespace().AllNamespaces(g.AllNamespaces).
|
||||
FilenameParam(g.ExplicitNamespace, &g.FilenameOptions).
|
||||
LabelSelectorParam(g.LabelSelector).
|
||||
FieldSelectorParam(g.FieldSelector).
|
||||
RequestChunksOf(chunkSize).
|
||||
RequestChunksOf(g.ChunkSize).
|
||||
ResourceTypeOrNameArgs(true, args...).
|
||||
ContinueOnError().
|
||||
Latest().
|
||||
|
@ -389,6 +429,17 @@ func (g *CommandGetOptions) getObjInfo(wg *sync.WaitGroup, mux *sync.Mutex, f cm
|
|||
return
|
||||
}
|
||||
|
||||
if g.Watch || g.WatchOnly {
|
||||
mux.Lock()
|
||||
watchObjsInfo := WatchObj{
|
||||
Cluster: cluster,
|
||||
r: r,
|
||||
}
|
||||
*watchObjs = append(*watchObjs, watchObjsInfo)
|
||||
mux.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
if !g.IsHumanReadablePrinter {
|
||||
if err := g.printGeneric(r); err != nil {
|
||||
*allErrs = append(*allErrs, fmt.Errorf("cluster(%s): %s", cluster, err))
|
||||
|
@ -444,6 +495,200 @@ func (g *CommandGetOptions) reconstructionRow(objs []Obj, table *metav1.Table) (
|
|||
return allTableRows, mapping, nil
|
||||
}
|
||||
|
||||
// reconstructObj reconstruct runtime.object row
|
||||
func (g *CommandGetOptions) reconstructObj(obj runtime.Object, mapping *meta.RESTMapping, cluster string, event string) (*metav1.Table, error) {
|
||||
table := &metav1.Table{}
|
||||
var allTableRows []metav1.TableRow
|
||||
|
||||
unstr, ok := obj.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("attempt to decode non-Unstructured object")
|
||||
}
|
||||
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstr.Object, table); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for rowIdx := range table.Rows {
|
||||
var tempRow metav1.TableRow
|
||||
rbKey := getRBKey(mapping.GroupVersionKind, table.Rows[rowIdx], cluster)
|
||||
|
||||
if g.OutputWatchEvents {
|
||||
tempRow.Cells = append(append(tempRow.Cells, event, table.Rows[rowIdx].Cells[0], cluster), table.Rows[rowIdx].Cells[1:]...)
|
||||
} else {
|
||||
tempRow.Cells = append(append(tempRow.Cells, table.Rows[rowIdx].Cells[0], cluster), table.Rows[rowIdx].Cells[1:]...)
|
||||
}
|
||||
if _, ok := RBInfo[rbKey]; ok {
|
||||
tempRow.Cells = append(tempRow.Cells, "Y")
|
||||
} else {
|
||||
tempRow.Cells = append(tempRow.Cells, "N")
|
||||
}
|
||||
table.Rows[rowIdx].Cells = tempRow.Cells
|
||||
}
|
||||
allTableRows = append(allTableRows, table.Rows...)
|
||||
|
||||
table.Rows = allTableRows
|
||||
|
||||
setNoAdoption(mapping)
|
||||
g.setColumnDefinition(table)
|
||||
|
||||
return table, nil
|
||||
}
|
||||
|
||||
// watch starts a client-side watch of one or more resources.
|
||||
func (g *CommandGetOptions) watch(watchObjs []WatchObj) error {
|
||||
if len(watchObjs) <= 0 {
|
||||
return fmt.Errorf("not to find obj that is watched")
|
||||
}
|
||||
infos, err := watchObjs[0].r.Infos()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var objs []Obj
|
||||
for ix := range infos {
|
||||
objs = append(objs, Obj{Cluster: watchObjs[0].Cluster, Info: infos[ix]})
|
||||
}
|
||||
|
||||
if multipleGVKsRequested(objs) {
|
||||
return fmt.Errorf("watch is only supported on individual resources and resource collections - more than 1 resource was found")
|
||||
}
|
||||
|
||||
info := infos[0]
|
||||
mapping := info.ResourceMapping()
|
||||
outputObjects := utilpointer.BoolPtr(!g.WatchOnly)
|
||||
|
||||
printer, err := g.ToPrinter(mapping, outputObjects, g.AllNamespaces, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
writer := printers.GetNewTabWriter(g.Out)
|
||||
|
||||
// print the current object
|
||||
for idx := range watchObjs {
|
||||
var objsToPrint []runtime.Object
|
||||
obj, err := watchObjs[idx].r.Object()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
isList := meta.IsListType(obj)
|
||||
|
||||
if isList {
|
||||
tmpObj, _ := meta.ExtractList(obj)
|
||||
objsToPrint = append(objsToPrint, tmpObj...)
|
||||
} else {
|
||||
objsToPrint = append(objsToPrint, obj)
|
||||
}
|
||||
|
||||
for _, objToPrint := range objsToPrint {
|
||||
objrow, err := g.reconstructObj(objToPrint, mapping, watchObjs[idx].Cluster, string(watch.Added))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if idx > 0 {
|
||||
// only print ColumnDefinitions once
|
||||
objrow.ColumnDefinitions = nil
|
||||
}
|
||||
|
||||
printObj, err := helper.ToUnstructured(objrow)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := printer.PrintObj(printObj, writer); err != nil {
|
||||
return fmt.Errorf("unable to output the provided object: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
writer.Flush()
|
||||
|
||||
g.watchMultiClusterObj(watchObjs, mapping, outputObjects, printer)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//watchMultiClusterObj watch objects in multi clusters by goroutines
|
||||
func (g *CommandGetOptions) watchMultiClusterObj(watchObjs []WatchObj, mapping *meta.RESTMapping, outputObjects *bool, printer printers.ResourcePrinterFunc) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
writer := printers.GetNewTabWriter(g.Out)
|
||||
|
||||
wg.Add(len(watchObjs))
|
||||
for _, watchObj := range watchObjs {
|
||||
go func(watchObj WatchObj) {
|
||||
obj, err := watchObj.r.Object()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
rv := "0"
|
||||
isList := meta.IsListType(obj)
|
||||
if isList {
|
||||
// the resourceVersion of list objects is ~now but won't return
|
||||
// an initial watch event
|
||||
rv, err = meta.NewAccessor().ResourceVersion(obj)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
if isList {
|
||||
// we can start outputting objects now, watches started from lists don't emit synthetic added events
|
||||
*outputObjects = true
|
||||
} else {
|
||||
// suppress output, since watches started for individual items emit a synthetic ADDED event first
|
||||
*outputObjects = false
|
||||
}
|
||||
|
||||
if isList {
|
||||
// we can start outputting objects now, watches started from lists don't emit synthetic added events
|
||||
*outputObjects = true
|
||||
} else {
|
||||
// suppress output, since watches started for individual items emit a synthetic ADDED event first
|
||||
*outputObjects = false
|
||||
}
|
||||
|
||||
// print watched changes
|
||||
w, err := watchObj.r.Watch(rv)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
intr := interrupt.New(nil, cancel)
|
||||
_ = intr.Run(func() error {
|
||||
_, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
|
||||
objToPrint := e.Object
|
||||
|
||||
objrow, err := g.reconstructObj(objToPrint, mapping, watchObj.Cluster, string(e.Type))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
// not need to print ColumnDefinitions
|
||||
objrow.ColumnDefinitions = nil
|
||||
|
||||
printObj, err := helper.ToUnstructured(objrow)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if err := printer.PrintObj(printObj, writer); err != nil {
|
||||
return false, err
|
||||
}
|
||||
writer.Flush()
|
||||
// after processing at least one event, start outputting objects
|
||||
*outputObjects = true
|
||||
return false, nil
|
||||
})
|
||||
return err
|
||||
})
|
||||
}(watchObj)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (g *CommandGetOptions) printGeneric(r *resource.Result) error {
|
||||
// we flattened the data from the builder, so we have individual items, but now we'd like to either:
|
||||
// 1. if there is more than one item, combine them all into a single list
|
||||
|
@ -762,10 +1007,14 @@ func setNoAdoption(mapping *meta.RESTMapping) {
|
|||
}
|
||||
|
||||
// setColumnDefinition set print ColumnDefinition
|
||||
func setColumnDefinition(table *metav1.Table) {
|
||||
func (g *CommandGetOptions) setColumnDefinition(table *metav1.Table) {
|
||||
var tempColumnDefinition []metav1.TableColumnDefinition
|
||||
if len(table.ColumnDefinitions) > 0 {
|
||||
tempColumnDefinition = append(append(append(tempColumnDefinition, table.ColumnDefinitions[0], podColumns[0]), table.ColumnDefinitions[1:]...), podColumns[1:]...)
|
||||
if g.OutputWatchEvents {
|
||||
tempColumnDefinition = append(append(append(tempColumnDefinition, eventColumn, table.ColumnDefinitions[0], podColumns[0]), table.ColumnDefinitions[1:]...), podColumns[1:]...)
|
||||
} else {
|
||||
tempColumnDefinition = append(append(append(tempColumnDefinition, table.ColumnDefinitions[0], podColumns[0]), table.ColumnDefinitions[1:]...), podColumns[1:]...)
|
||||
}
|
||||
table.ColumnDefinitions = tempColumnDefinition
|
||||
}
|
||||
}
|
||||
|
@ -806,3 +1055,26 @@ func getExample(parentCommand string) string {
|
|||
fmt.Sprintf("%s get rs/nginx-cb87b6d88 service/kubernetes", parentCommand)
|
||||
return example
|
||||
}
|
||||
|
||||
// skipPrinter allows conditionally suppressing object output via the output field.
|
||||
// table objects are suppressed by setting their Rows to nil (allowing column definitions to propagate to the delegate).
|
||||
// non-table objects are suppressed by not calling the delegate at all.
|
||||
type skipPrinter struct {
|
||||
delegate printers.ResourcePrinter
|
||||
output *bool
|
||||
}
|
||||
|
||||
func (p *skipPrinter) PrintObj(obj runtime.Object, writer io.Writer) error {
|
||||
if *p.output {
|
||||
return p.delegate.PrintObj(obj, writer)
|
||||
}
|
||||
|
||||
table, isTable := obj.(*metav1.Table)
|
||||
if !isTable {
|
||||
return nil
|
||||
}
|
||||
|
||||
table = table.DeepCopy()
|
||||
table.Rows = nil
|
||||
return p.delegate.PrintObj(table, writer)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue