diff --git a/pkg/karmadactl/get/get.go b/pkg/karmadactl/get/get.go index bd13f99ae..71cadc4dd 100644 --- a/pkg/karmadactl/get/get.go +++ b/pkg/karmadactl/get/get.go @@ -254,20 +254,7 @@ func (g *CommandGetOptions) Validate(cmd *cobra.Command) error { if err != nil { return err } - clusterSet := sets.NewString() - for _, cluster := range clusters.Items { - clusterSet.Insert(cluster.Name) - } - - var noneExistClusters []string - for _, cluster := range g.Clusters { - if !clusterSet.Has(cluster) { - noneExistClusters = append(noneExistClusters, cluster) - } - } - if len(noneExistClusters) != 0 { - return fmt.Errorf("clusters don't exist: " + strings.Join(noneExistClusters, ",")) - } + return util.VerifyClustersExist(g.Clusters, clusters) } return nil } diff --git a/pkg/karmadactl/top/top.go b/pkg/karmadactl/top/top.go index 2d5294627..5267f857e 100644 --- a/pkg/karmadactl/top/top.go +++ b/pkg/karmadactl/top/top.go @@ -1,12 +1,20 @@ package top import ( + "context" + "fmt" + "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/client-go/kubernetes" cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/util/templates" metricsapi "k8s.io/metrics/pkg/apis/metrics" + metricsclientset "k8s.io/metrics/pkg/client/clientset/versioned" + + karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned" + "github.com/karmada-io/karmada/pkg/karmadactl/util" ) const ( @@ -27,7 +35,7 @@ var ( Metrics Server to be correctly configured and working on the member clusters.`) ) -func NewCmdTop(f cmdutil.Factory, parentCommand string, streams genericclioptions.IOStreams) *cobra.Command { +func NewCmdTop(f util.Factory, parentCommand string, streams genericclioptions.IOStreams) *cobra.Command { cmd := &cobra.Command{ Use: "top", Short: "Display resource (CPU/memory) usage of member clusters", @@ -56,3 +64,54 @@ func SupportedMetricsAPIVersionAvailable(discoveredAPIGroups *metav1.APIGroupLis } return false } + +func GenClusterList(clientSet karmadaclientset.Interface, clusters []string) ([]string, error) { + if len(clusters) != 0 { + return clusters, nil + } + + clusterList, err := clientSet.ClusterV1alpha1().Clusters().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to list all member clusters in control plane, err: %w", err) + } + + for i := range clusterList.Items { + clusters = append(clusters, clusterList.Items[i].Name) + } + return clusters, nil +} + +func GetMemberAndMetricsClientSet(f util.Factory, + cluster string, useProtocolBuffers bool) (*kubernetes.Clientset, *metricsclientset.Clientset, error) { + memberFactory, err := f.FactoryForMemberCluster(cluster) + if err != nil { + return nil, nil, err + } + clientset, err := memberFactory.KubernetesClientSet() + if err != nil { + return nil, nil, err + } + discoveryClient := clientset.DiscoveryClient + apiGroups, err := discoveryClient.ServerGroups() + if err != nil { + return nil, nil, err + } + metricsAPIAvailable := SupportedMetricsAPIVersionAvailable(apiGroups) + if !metricsAPIAvailable { + return nil, nil, fmt.Errorf("Metrics API not available") + } + + config, err := memberFactory.ToRESTConfig() + if err != nil { + return nil, nil, err + } + if useProtocolBuffers { + config.ContentType = "application/vnd.kubernetes.protobuf" + } + metricsClient, err := metricsclientset.NewForConfig(config) + if err != nil { + return nil, nil, err + } + + return clientset, metricsClient, nil +} diff --git a/pkg/karmadactl/top/top_pods.go b/pkg/karmadactl/top/top_pods.go index b0c42e5a0..0888ff827 100644 --- a/pkg/karmadactl/top/top_pods.go +++ b/pkg/karmadactl/top/top_pods.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/spf13/cobra" @@ -11,9 +12,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/cli-runtime/pkg/genericclioptions" - "k8s.io/client-go/discovery" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/util/completion" @@ -23,12 +24,16 @@ import ( metricsv1beta1api "k8s.io/metrics/pkg/apis/metrics/v1beta1" metricsclientset "k8s.io/metrics/pkg/client/clientset/versioned" + autoscalingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/autoscaling/v1alpha1" + karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned" "github.com/karmada-io/karmada/pkg/karmadactl/options" + "github.com/karmada-io/karmada/pkg/karmadactl/util" ) type TopPodOptions struct { ResourceName string Namespace string + Clusters []string LabelSelector string FieldSelector string SortBy string @@ -37,11 +42,12 @@ type TopPodOptions struct { NoHeaders bool UseProtocolBuffers bool Sum bool + lock sync.Mutex + errs []error - PodClient corev1client.PodsGetter - Printer *TopCmdPrinter - DiscoveryClient discovery.DiscoveryInterface - MetricsClient metricsclientset.Interface + Printer *TopCmdPrinter + metrics *metricsapi.PodMetricsList + karmadaClient karmadaclientset.Interface genericclioptions.IOStreams } @@ -71,7 +77,7 @@ var ( %[1]s top pod -l name=myLabel`)) ) -func NewCmdTopPod(f cmdutil.Factory, parentCommand string, o *TopPodOptions, streams genericclioptions.IOStreams) *cobra.Command { +func NewCmdTopPod(f util.Factory, parentCommand string, o *TopPodOptions, streams genericclioptions.IOStreams) *cobra.Command { if o == nil { o = &TopPodOptions{ IOStreams: streams, @@ -89,13 +95,14 @@ func NewCmdTopPod(f cmdutil.Factory, parentCommand string, o *TopPodOptions, str Run: func(cmd *cobra.Command, args []string) { cmdutil.CheckErr(o.Complete(f, cmd, args)) cmdutil.CheckErr(o.Validate()) - cmdutil.CheckErr(o.RunTopPod()) + cmdutil.CheckErr(o.RunTopPod(f)) }, Aliases: []string{"pods", "po"}, } cmdutil.AddLabelSelectorFlagVar(cmd, &o.LabelSelector) options.AddKubeConfigFlags(cmd.Flags()) cmd.Flags().StringVarP(options.DefaultConfigFlags.Namespace, "namespace", "n", *options.DefaultConfigFlags.Namespace, "If present, the namespace scope for this CLI request") + cmd.Flags().StringSliceVarP(&o.Clusters, "clusters", "C", []string{}, "-C=member1,member2") cmd.Flags().StringVar(&o.FieldSelector, "field-selector", o.FieldSelector, "Selector (field query) to filter on, supports '=', '==', and '!='.(e.g. --field-selector key1=value1,key2=value2). The server only supports a limited number of field queries per type.") cmd.Flags().StringVar(&o.SortBy, "sort-by", o.SortBy, "If non-empty, sort pods list using specified field. The field can be either 'cpu' or 'memory'.") cmd.Flags().BoolVar(&o.PrintContainers, "containers", o.PrintContainers, "If present, print usage of containers within a pod.") @@ -106,7 +113,7 @@ func NewCmdTopPod(f cmdutil.Factory, parentCommand string, o *TopPodOptions, str return cmd } -func (o *TopPodOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error { +func (o *TopPodOptions) Complete(f util.Factory, cmd *cobra.Command, args []string) error { var err error if len(args) == 1 { o.ResourceName = args[0] @@ -114,31 +121,23 @@ func (o *TopPodOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []s return cmdutil.UsageErrorf(cmd, "%s", cmd.Use) } + o.lock = sync.Mutex{} + o.errs = make([]error, 0) + o.Namespace, _, err = f.ToRawKubeConfigLoader().Namespace() if err != nil { return err } - clientset, err := f.KubernetesClientSet() - if err != nil { - return err - } - - o.DiscoveryClient = clientset.DiscoveryClient - config, err := f.ToRESTConfig() - if err != nil { - return err - } - if o.UseProtocolBuffers { - config.ContentType = "application/vnd.kubernetes.protobuf" - } - o.MetricsClient, err = metricsclientset.NewForConfig(config) - if err != nil { - return err - } - - o.PodClient = clientset.CoreV1() + o.metrics = &metricsapi.PodMetricsList{} o.Printer = NewTopCmdPrinter(o.Out) + + karmadaClient, err := f.KarmadaClientSet() + if err != nil { + return err + } + o.karmadaClient = karmadaClient + return nil } @@ -151,11 +150,20 @@ func (o *TopPodOptions) Validate() error { if len(o.ResourceName) > 0 && (len(o.LabelSelector) > 0 || len(o.FieldSelector) > 0) { return errors.New("only one of NAME or selector can be provided") } + if len(o.Clusters) > 0 { + clusters, err := o.karmadaClient.ClusterV1alpha1().Clusters().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return err + } + return util.VerifyClustersExist(o.Clusters, clusters) + } return nil } -func (o *TopPodOptions) RunTopPod() error { +func (o *TopPodOptions) RunTopPod(f util.Factory) error { var err error + var wg sync.WaitGroup + labelSelector := labels.Everything() if len(o.LabelSelector) > 0 { labelSelector, err = labels.Parse(o.LabelSelector) @@ -171,30 +179,22 @@ func (o *TopPodOptions) RunTopPod() error { } } - apiGroups, err := o.DiscoveryClient.ServerGroups() + o.Clusters, err = GenClusterList(o.karmadaClient, o.Clusters) if err != nil { return err } - metricsAPIAvailable := SupportedMetricsAPIVersionAvailable(apiGroups) - - if !metricsAPIAvailable { - return errors.New("Metrics API not available") - } - metrics, err := getMetricsFromMetricsAPI(o.MetricsClient, o.Namespace, o.ResourceName, o.AllNamespaces, labelSelector, fieldSelector) - if err != nil { - return err + wg.Add(len(o.Clusters)) + for _, cluster := range o.Clusters { + go func(f util.Factory, + cluster string, labelSelector labels.Selector, fieldSelector fields.Selector) { + defer wg.Done() + o.runTopPodPerCluster(f, cluster, labelSelector, fieldSelector) + }(f, cluster, labelSelector, fieldSelector) } + wg.Wait() - // First we check why no metrics have been received. - if len(metrics.Items) == 0 { - // If the API server query is successful but all the pods are newly created, - // the metrics are probably not ready yet, so we return the error here in the first place. - err := verifyEmptyMetrics(o, labelSelector, fieldSelector) - if err != nil { - return err - } - + if len(o.metrics.Items) == 0 && len(o.errs) == 0 { // if we had no errors, be sure we output something. if o.AllNamespaces { fmt.Fprintln(o.ErrOut, "No resources found") @@ -203,17 +203,63 @@ func (o *TopPodOptions) RunTopPod() error { } } - return o.Printer.PrintPodMetrics(metrics.Items, o.PrintContainers, o.AllNamespaces, o.NoHeaders, o.SortBy, o.Sum) + err = o.Printer.PrintPodMetrics(o.metrics.Items, o.PrintContainers, o.AllNamespaces, o.NoHeaders, o.SortBy, o.Sum) + if err != nil { + o.errs = append(o.errs, err) + } + + return utilerrors.NewAggregate(o.errs) } -func getMetricsFromMetricsAPI(metricsClient metricsclientset.Interface, namespace, resourceName string, allNamespaces bool, labelSelector labels.Selector, fieldSelector fields.Selector) (*metricsapi.PodMetricsList, error) { +func (o *TopPodOptions) runTopPodPerCluster(f util.Factory, + cluster string, labelSelector labels.Selector, fieldSelector fields.Selector) { + var err error + defer func() { + if err != nil { + o.lock.Lock() + o.errs = append(o.errs, fmt.Errorf("cluster(%s): %s", cluster, err)) + o.lock.Unlock() + } + }() + + clusterClient, metricsClient, err := GetMemberAndMetricsClientSet(f, cluster, o.UseProtocolBuffers) + if err != nil { + return + } + metrics, err := getMetricsFromMetricsAPI(metricsClient, o.Namespace, o.ResourceName, o.AllNamespaces, labelSelector, fieldSelector) + if err != nil { + return + } + // First we check why no metrics have been received + if len(metrics.Items) == 0 { + // If the API server query is successful but all the pods are newly created, + // the metrics are probably not ready yet, so we return the error here in the first place. + err = verifyEmptyMetrics(o, clusterClient, labelSelector, fieldSelector) + if err != nil { + return + } + } + + o.lock.Lock() + for _, item := range metrics.Items { + if item.Annotations == nil { + item.Annotations = map[string]string{autoscalingv1alpha1.QuerySourceAnnotationKey: cluster} + } else { + item.Annotations[autoscalingv1alpha1.QuerySourceAnnotationKey] = cluster + } + o.metrics.Items = append(o.metrics.Items, item) + } + o.lock.Unlock() +} + +func getMetricsFromMetricsAPI(metricsClient *metricsclientset.Clientset, namespace, resourceName string, allNamespaces bool, labelSelector labels.Selector, fieldSelector fields.Selector) (*metricsapi.PodMetricsList, error) { var err error ns := metav1.NamespaceAll if !allNamespaces { ns = namespace } versionedMetrics := &metricsv1beta1api.PodMetricsList{} - if resourceName != "" { + if len(resourceName) != 0 { m, err := metricsClient.MetricsV1beta1().PodMetricses(ns).Get(context.TODO(), resourceName, metav1.GetOptions{}) if err != nil { return nil, err @@ -233,17 +279,17 @@ func getMetricsFromMetricsAPI(metricsClient metricsclientset.Interface, namespac return metrics, nil } -func verifyEmptyMetrics(o *TopPodOptions, labelSelector labels.Selector, fieldSelector fields.Selector) error { +func verifyEmptyMetrics(o *TopPodOptions, clusterClient *kubernetes.Clientset, labelSelector labels.Selector, fieldSelector fields.Selector) error { if len(o.ResourceName) > 0 { - pod, err := o.PodClient.Pods(o.Namespace).Get(context.TODO(), o.ResourceName, metav1.GetOptions{}) + pod, err := clusterClient.CoreV1().Pods(o.Namespace).Get(context.TODO(), o.ResourceName, metav1.GetOptions{}) if err != nil { return err } - if err := checkPodAge(pod); err != nil { + if err = checkPodAge(pod); err != nil { return err } } else { - pods, err := o.PodClient.Pods(o.Namespace).List(context.TODO(), metav1.ListOptions{ + pods, err := clusterClient.CoreV1().Pods(o.Namespace).List(context.TODO(), metav1.ListOptions{ LabelSelector: labelSelector.String(), FieldSelector: fieldSelector.String(), }) @@ -254,7 +300,7 @@ func verifyEmptyMetrics(o *TopPodOptions, labelSelector labels.Selector, fieldSe return nil } for i := range pods.Items { - if err := checkPodAge(&pods.Items[i]); err != nil { + if err = checkPodAge(&pods.Items[i]); err != nil { return err } } diff --git a/pkg/karmadactl/util/validate.go b/pkg/karmadactl/util/validate.go new file mode 100644 index 000000000..08baa6dea --- /dev/null +++ b/pkg/karmadactl/util/validate.go @@ -0,0 +1,29 @@ +package util + +import ( + "fmt" + "strings" + + "k8s.io/apimachinery/pkg/util/sets" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" +) + +func VerifyClustersExist(input []string, clusters *clusterv1alpha1.ClusterList) error { + clusterSet := sets.NewString() + for _, cluster := range clusters.Items { + clusterSet.Insert(cluster.Name) + } + + var nonExistClusters []string + for _, cluster := range input { + if !clusterSet.Has(cluster) { + nonExistClusters = append(nonExistClusters, cluster) + } + } + if len(nonExistClusters) != 0 { + return fmt.Errorf("clusters don't exist: " + strings.Join(nonExistClusters, ",")) + } + + return nil +} diff --git a/pkg/karmadactl/util/validate_test.go b/pkg/karmadactl/util/validate_test.go new file mode 100644 index 000000000..eb384a6fd --- /dev/null +++ b/pkg/karmadactl/util/validate_test.go @@ -0,0 +1,72 @@ +package util + +import ( + "fmt" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" +) + +func TestVerifyWhetherClustersExist(t *testing.T) { + clusters := clusterv1alpha1.ClusterList{Items: []clusterv1alpha1.Cluster{ + { + ObjectMeta: metav1.ObjectMeta{Name: "member1"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "member2"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "member3"}, + }, + }} + tests := []struct { + name string + input []string + clusters *clusterv1alpha1.ClusterList + wantErr error + }{ + { + name: "input is nil", + input: nil, + clusters: &clusters, + wantErr: nil, + }, + { + name: "not exist", + input: []string{"member1", "member4"}, + clusters: &clusters, + wantErr: fmt.Errorf("clusters don't exist: member4"), + }, + { + name: "exist", + input: []string{"member1"}, + clusters: &clusters, + wantErr: nil, + }, + { + name: "clusterList is empty", + input: []string{"member1", "member2"}, + clusters: &clusterv1alpha1.ClusterList{Items: make([]clusterv1alpha1.Cluster, 0)}, + wantErr: fmt.Errorf("clusters don't exist: member1,member2"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if isExist := VerifyClustersExist(tt.input, tt.clusters); !isErrorEqual(tt.wantErr, isExist) { + t.Errorf("VerifyClustersExist want: %v, actually: %v", tt.wantErr, isExist) + } + }) + } +} + +func isErrorEqual(want error, actual error) bool { + if want == nil && actual == nil { + return true + } + if want != nil && actual != nil && want.Error() == actual.Error() { + return true + } + return false +}