From 70e52603158b852537f041f779d6886a71f7d2be Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Tue, 6 Jun 2023 00:48:01 +0800 Subject: [PATCH] feat: support custom metrics for metrics adapte Signed-off-by: chaunceyjiang --- .../karmada-metrics-adapter-apiservice.yaml | 28 ++ cmd/metrics-adapter/app/options/options.go | 10 +- pkg/metricsadapter/adapter.go | 3 +- pkg/metricsadapter/controller.go | 36 +- pkg/metricsadapter/multiclient/client.go | 70 ++++ pkg/metricsadapter/provider/custommetrics.go | 332 +++++++++++++++++- 6 files changed, 457 insertions(+), 22 deletions(-) create mode 100644 pkg/metricsadapter/multiclient/client.go diff --git a/artifacts/deploy/karmada-metrics-adapter-apiservice.yaml b/artifacts/deploy/karmada-metrics-adapter-apiservice.yaml index a6826e2e3..c6c1184e4 100644 --- a/artifacts/deploy/karmada-metrics-adapter-apiservice.yaml +++ b/artifacts/deploy/karmada-metrics-adapter-apiservice.yaml @@ -15,6 +15,34 @@ spec: version: v1beta1 versionPriority: 10 --- +apiVersion: apiregistration.k8s.io/v1 +kind: APIService +metadata: + name: v1beta2.custom.metrics.k8s.io +spec: + service: + name: karmada-metrics-adapter + namespace: karmada-system + group: custom.metrics.k8s.io + version: v1beta2 + insecureSkipTLSVerify: true + groupPriorityMinimum: 100 + versionPriority: 200 +--- +apiVersion: apiregistration.k8s.io/v1 +kind: APIService +metadata: + name: v1beta1.custom.metrics.k8s.io +spec: + service: + name: karmada-metrics-adapter + namespace: karmada-system + group: custom.metrics.k8s.io + version: v1beta1 + insecureSkipTLSVerify: true + groupPriorityMinimum: 100 + versionPriority: 200 +--- apiVersion: v1 kind: Service metadata: diff --git a/cmd/metrics-adapter/app/options/options.go b/cmd/metrics-adapter/app/options/options.go index 987d3c305..2970cfe0a 100755 --- a/cmd/metrics-adapter/app/options/options.go +++ b/cmd/metrics-adapter/app/options/options.go @@ -6,6 +6,8 @@ import ( "github.com/spf13/pflag" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" genericapiserver "k8s.io/apiserver/pkg/server" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" "sigs.k8s.io/custom-metrics-apiserver/pkg/cmd/options" @@ -56,8 +58,9 @@ func (o *Options) Config() (*metricsadapter.MetricsServer, error) { karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig) factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0) - - metricsController := metricsadapter.NewMetricsController(restConfig, factory) + kubeClient := kubernetes.NewForConfigOrDie(restConfig) + kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0) + metricsController := metricsadapter.NewMetricsController(restConfig, factory, kubeFactory) metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions) metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme)) metricsAdapter.OpenAPIConfig.Info.Title = "karmada-metrics-adapter" @@ -70,6 +73,9 @@ func (o *Options) Config() (*metricsadapter.MetricsServer, error) { } err = server.GenericAPIServer.AddPostStartHook("start-karmada-informers", func(context genericapiserver.PostStartHookContext) error { + kubeFactory.Core().V1().Secrets().Informer() + kubeFactory.Start(context.StopCh) + kubeFactory.WaitForCacheSync(context.StopCh) factory.Start(context.StopCh) return nil }) diff --git a/pkg/metricsadapter/adapter.go b/pkg/metricsadapter/adapter.go index 3075d3182..c32f4c7e8 100755 --- a/pkg/metricsadapter/adapter.go +++ b/pkg/metricsadapter/adapter.go @@ -18,9 +18,8 @@ type MetricsAdapter struct { func NewMetricsAdapter(controller *MetricsController, customMetricsAdapterServerOptions *options.CustomMetricsAdapterServerOptions) *MetricsAdapter { adapter := &MetricsAdapter{} adapter.CustomMetricsAdapterServerOptions = customMetricsAdapterServerOptions - adapter.ResourceMetricsProvider = provider.NewResourceMetricsProvider(controller.ClusterLister, controller.InformerManager) - customProvider := provider.MakeCustomMetricsProvider() + customProvider := provider.MakeCustomMetricsProvider(controller.ClusterLister, controller.MultiClusterDiscovery) externalProvider := provider.MakeExternalMetricsProvider() adapter.WithCustomMetrics(customProvider) adapter.WithExternalMetrics(externalProvider) diff --git a/pkg/metricsadapter/controller.go b/pkg/metricsadapter/controller.go index b72591b88..8a3256b99 100755 --- a/pkg/metricsadapter/controller.go +++ b/pkg/metricsadapter/controller.go @@ -8,6 +8,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -16,6 +17,7 @@ import ( clusterV1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" + "github.com/karmada-io/karmada/pkg/metricsadapter/multiclient" "github.com/karmada-io/karmada/pkg/metricsadapter/provider" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" @@ -29,23 +31,24 @@ var ( // MetricsController is a controller for metrics, control the lifecycle of multi-clusters informer type MetricsController struct { - InformerFactory informerfactory.SharedInformerFactory - ClusterLister clusterlister.ClusterLister - InformerManager genericmanager.MultiClusterInformerManager - - queue workqueue.RateLimitingInterface - restConfig *rest.Config + InformerFactory informerfactory.SharedInformerFactory + ClusterLister clusterlister.ClusterLister + InformerManager genericmanager.MultiClusterInformerManager + MultiClusterDiscovery multiclient.MultiClusterDiscoveryInterface + queue workqueue.RateLimitingInterface + restConfig *rest.Config } // NewMetricsController creates a new metrics controller -func NewMetricsController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory) *MetricsController { +func NewMetricsController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory, kubeFactory informers.SharedInformerFactory) *MetricsController { clusterLister := factory.Cluster().V1alpha1().Clusters().Lister() controller := &MetricsController{ - InformerFactory: factory, - ClusterLister: clusterLister, - InformerManager: genericmanager.GetInstance(), - restConfig: restConfig, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "metrics-adapter"), + InformerFactory: factory, + ClusterLister: clusterLister, + MultiClusterDiscovery: multiclient.NewMultiClusterDiscoveryClient(clusterLister, kubeFactory), + InformerManager: genericmanager.GetInstance(), + restConfig: restConfig, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "metrics-adapter"), } controller.addEventHandler() @@ -127,6 +130,7 @@ func (m *MetricsController) handleClusters() bool { if apierrors.IsNotFound(err) { klog.Infof("try to stop cluster informer %s", clusterName) m.InformerManager.Stop(clusterName) + m.MultiClusterDiscovery.Remove(clusterName) return true } return false @@ -135,12 +139,14 @@ func (m *MetricsController) handleClusters() bool { if !cls.DeletionTimestamp.IsZero() { klog.Infof("try to stop cluster informer %s", clusterName) m.InformerManager.Stop(clusterName) + m.MultiClusterDiscovery.Remove(clusterName) return true } if !util.IsClusterReady(&cls.Status) { klog.Warningf("cluster %s is notReady try to stop this cluster informer", clusterName) m.InformerManager.Stop(clusterName) + m.MultiClusterDiscovery.Remove(clusterName) return false } @@ -159,8 +165,12 @@ func (m *MetricsController) handleClusters() bool { } _ = m.InformerManager.ForCluster(clusterName, clusterDynamicClient.DynamicClientSet, 0) } + err = m.MultiClusterDiscovery.Set(clusterName) + if err != nil { + klog.Warningf("failed to build discoveryClient for cluster(%s), Error: %+v", clusterName, err) + return true + } sci := m.InformerManager.GetSingleClusterManager(clusterName) - // Just trigger the informer to work _ = sci.Lister(provider.PodsGVR) _ = sci.Lister(provider.NodesGVR) diff --git a/pkg/metricsadapter/multiclient/client.go b/pkg/metricsadapter/multiclient/client.go new file mode 100644 index 000000000..cfa300aaf --- /dev/null +++ b/pkg/metricsadapter/multiclient/client.go @@ -0,0 +1,70 @@ +package multiclient + +import ( + "sync" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/discovery" + "k8s.io/client-go/informers" + listcorev1 "k8s.io/client-go/listers/core/v1" + + clusterV1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" +) + +// MultiClusterDiscoveryInterface provides DiscoveryClient for multiple clusters. +type MultiClusterDiscoveryInterface interface { + Get(clusterName string) *discovery.DiscoveryClient + Set(clusterName string) error + Remove(clusterName string) +} + +// MultiClusterDiscovery provides DiscoveryClient for multiple clusters. +type MultiClusterDiscovery struct { + sync.RWMutex + clients map[string]*discovery.DiscoveryClient + secretLister listcorev1.SecretLister + clusterLister clusterlister.ClusterLister +} + +// NewMultiClusterDiscoveryClient returns a new MultiClusterDiscovery +func NewMultiClusterDiscoveryClient(clusterLister clusterlister.ClusterLister, KubeFactory informers.SharedInformerFactory) MultiClusterDiscoveryInterface { + return &MultiClusterDiscovery{ + clusterLister: clusterLister, + secretLister: KubeFactory.Core().V1().Secrets().Lister(), + clients: map[string]*discovery.DiscoveryClient{}, + } +} + +// Get returns a DiscoveryClient for the provided clusterName. +func (m *MultiClusterDiscovery) Get(clusterName string) *discovery.DiscoveryClient { + m.RLock() + defer m.RUnlock() + return m.clients[clusterName] +} + +// Set a DiscoveryClient for the provided clusterName. +func (m *MultiClusterDiscovery) Set(clusterName string) error { + clusterGetter := func(cluster string) (*clusterV1alpha1.Cluster, error) { + return m.clusterLister.Get(cluster) + } + secretGetter := func(namespace string, name string) (*corev1.Secret, error) { + return m.secretLister.Secrets(namespace).Get(name) + } + clusterConfig, err := util.BuildClusterConfig(clusterName, clusterGetter, secretGetter) + if err != nil { + return err + } + m.Lock() + defer m.Unlock() + m.clients[clusterName] = discovery.NewDiscoveryClientForConfigOrDie(clusterConfig) + return nil +} + +// Remove a DiscoveryClient for the provided clusterName. +func (m *MultiClusterDiscovery) Remove(clusterName string) { + m.Lock() + defer m.Unlock() + delete(m.clients, clusterName) +} diff --git a/pkg/metricsadapter/provider/custommetrics.go b/pkg/metricsadapter/provider/custommetrics.go index 7a1d273f1..6db2bd2e5 100755 --- a/pkg/metricsadapter/provider/custommetrics.go +++ b/pkg/metricsadapter/provider/custommetrics.go @@ -3,33 +3,355 @@ package provider import ( "context" "fmt" + "reflect" + "strings" + "sync" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/discovery" + "k8s.io/klog/v2" "k8s.io/metrics/pkg/apis/custom_metrics" + custommetricsv1beta2 "k8s.io/metrics/pkg/apis/custom_metrics/v1beta2" + custommetricsclient "k8s.io/metrics/pkg/client/custom_metrics" + custommetricsschema "k8s.io/metrics/pkg/client/custom_metrics/scheme" "sigs.k8s.io/custom-metrics-apiserver/pkg/provider" + + clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" + "github.com/karmada-io/karmada/pkg/metricsadapter/multiclient" +) + +var ( + versionConverter = custommetricsclient.NewMetricConverter() ) // CustomMetricsProvider is a custom metrics provider type CustomMetricsProvider struct { + // multiClusterDiscovery returns a discovery client for member cluster apiserver + multiClusterDiscovery multiclient.MultiClusterDiscoveryInterface + clusterLister clusterlister.ClusterLister } // MakeCustomMetricsProvider creates a new custom metrics provider -func MakeCustomMetricsProvider() *CustomMetricsProvider { - return &CustomMetricsProvider{} +func MakeCustomMetricsProvider(clusterLister clusterlister.ClusterLister, multiClusterDiscovery multiclient.MultiClusterDiscoveryInterface) *CustomMetricsProvider { + return &CustomMetricsProvider{ + clusterLister: clusterLister, + multiClusterDiscovery: multiClusterDiscovery, + } } // GetMetricByName will query metrics by name from member clusters and return the result func (c *CustomMetricsProvider) GetMetricByName(ctx context.Context, name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) { - return nil, fmt.Errorf("karmada-metrics-adapter still not implement it") + clusters, err := c.clusterLister.List(labels.Everything()) + if err != nil { + klog.Errorf("Failed to list clusters: %v", err) + return nil, err + } + metricValueList := &custom_metrics.MetricValueList{} + metricsChanel := make(chan *custom_metrics.MetricValueList) + wg := sync.WaitGroup{} + for _, cluster := range clusters { + wg.Add(1) + go func(clusterName string) { + defer wg.Done() + metrics, err := c.getMetricByName(ctx, clusterName, name, info, metricSelector) + if err != nil { + klog.Warningf("query %s's %s metric from cluster %s failed, err: %+v", info.GroupResource.String(), info.Metric, clusterName, err) + return + } + metricsChanel <- metrics + }(cluster.Name) + } + go func() { + wg.Wait() + close(metricsChanel) + }() + for { + metrics, ok := <-metricsChanel + if !ok { + break + } + metricValueList.Items = append(metricValueList.Items, metrics.Items...) + } + var metrics *custom_metrics.MetricValue + // TODO(chaunceyjiang) The MetricValue items need to be sorted. + for i := range metricValueList.Items { + if metrics == nil { + metrics = &metricValueList.Items[i] + continue + } + // metrics is unique in one cluster, but it may exist in multiple clusters. + // for this situation, we need to add the value of all clusters. + metrics.Value.Add(metricValueList.Items[i].Value) + } + if metrics == nil { + return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric) + } + return metrics, nil } // GetMetricBySelector will query metrics by selector from member clusters and return the result func (c *CustomMetricsProvider) GetMetricBySelector(ctx context.Context, namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) { - return nil, fmt.Errorf("karmada-metrics-adapter still not implement it") + clusters, err := c.clusterLister.List(labels.Everything()) + if err != nil { + klog.Errorf("Failed to list clusters: %v", err) + return nil, err + } + metricValueList := &custom_metrics.MetricValueList{} + wg := sync.WaitGroup{} + metricsChanel := make(chan *custom_metrics.MetricValueList) + for _, cluster := range clusters { + wg.Add(1) + go func(clusterName string) { + defer wg.Done() + metrics, err := c.getMetricBySelector(ctx, clusterName, namespace, selector, info, metricSelector) + if err != nil { + klog.Warningf("query %s's %s metric from cluster %s failed", info.GroupResource.String(), info.Metric, clusterName) + return + } + metricsChanel <- metrics + }(cluster.Name) + } + go func() { + wg.Wait() + close(metricsChanel) + }() + sameMetrics := make(map[string]custom_metrics.MetricValue) + for { + metrics, ok := <-metricsChanel + if !ok { + break + } + // TODO(chaunceyjiang) The MetricValue items need to be sorted. + for _, metric := range metrics.Items { + // metrics is unique in one cluster, but it may exist in multiple clusters. + // for this situation, we need to add the value of all clusters. + if metricValue, same := sameMetrics[metric.DescribedObject.Name]; same { + metric.Value.Add(metricValue.Value) + } + sameMetrics[metric.DescribedObject.Name] = metric + } + } + for _, metric := range sameMetrics { + metricValueList.Items = append(metricValueList.Items, metric) + } + if len(metricValueList.Items) == 0 { + return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric) + } + return metricValueList, nil +} + +func (c *CustomMetricsProvider) getPreferredVersion(discoveryClient *discovery.DiscoveryClient) (schema.GroupVersion, error) { + apiGroups, err := discoveryClient.ServerGroups() + if err != nil { + return schema.GroupVersion{}, err + } + if gv, support := supportedMetricsAPIVersionAvailable(apiGroups); support { + return gv, nil + } + return schema.GroupVersion{}, fmt.Errorf("custom.metrics.k8s.io not found") +} + +func (c *CustomMetricsProvider) getMetricByName(ctx context.Context, clusterName string, name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) { + // handle namespace separately + if info.GroupResource.Resource == "namespaces" && info.GroupResource.Group == "" { + return c.getForNamespace(ctx, clusterName, name.Name, info.Metric, metricSelector) + } + discoveryClient := c.multiClusterDiscovery.Get(clusterName) + if discoveryClient == nil { + err := fmt.Errorf("failed to get MultiClusterDiscovery for cluster(%s)", clusterName) + klog.Error(err) + return nil, err + } + version, err := c.getPreferredVersion(discoveryClient) + if err != nil { + klog.Errorf("failed to get custom.metrics.k8s.io preferred version for cluster(%s),Error: %v", clusterName, err) + return nil, err + } + params, err := versionConverter.ConvertListOptionsToVersion(&custom_metrics.MetricListOptions{ + MetricLabelSelector: metricSelector.String(), + }, version) + if err != nil { + klog.Errorf("failed to convert ListOptions to Version for cluster(%s),Error: %v", clusterName, err) + return nil, err + } + req := discoveryClient.RESTClient().Get().Prefix("/apis/" + version.String()).Resource(info.GroupResource.String()) + if info.Namespaced { + req = req.Namespace(name.Namespace) + } + result := req.Name(name.Name).SubResource(info.Metric).SpecificallyVersionedParams(params, custommetricsschema.ParameterCodec, version).Do(ctx) + metricObj, err := versionConverter.ConvertResultToVersion(result, custommetricsv1beta2.SchemeGroupVersion) + if err != nil { + return nil, err + } + return metricsConvertCustomMetricsV1beta2ToInternalCustomMetrics(metricObj) +} + +func (c *CustomMetricsProvider) getMetricBySelector(ctx context.Context, clusterName, namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) { + // handle namespace separately + if info.GroupResource.Resource == "namespaces" && info.GroupResource.Group == "" { + return c.getForNamespace(ctx, clusterName, custommetricsv1beta2.AllObjects, info.Metric, metricSelector) + } + discoveryClient := c.multiClusterDiscovery.Get(clusterName) + if discoveryClient == nil { + err := fmt.Errorf("failed to get MultiClusterDiscovery for cluster(%s)", clusterName) + klog.Error(err) + return nil, err + } + version, err := c.getPreferredVersion(discoveryClient) + if err != nil { + klog.Errorf("failed to get custom.metrics.k8s.io preferred version for cluster(%s),Error: %v", clusterName, err) + return nil, err + } + params, err := versionConverter.ConvertListOptionsToVersion(&custom_metrics.MetricListOptions{ + MetricLabelSelector: metricSelector.String(), + LabelSelector: selector.String(), + }, version) + if err != nil { + klog.Errorf("failed to convert ListOptions to Version for cluster(%s),Error: %v", clusterName, err) + return nil, err + } + req := discoveryClient.RESTClient().Get().Prefix("/apis/" + version.String()).Resource(info.GroupResource.String()) + if info.Namespaced { + req = req.Namespace(namespace) + } + result := req.Name(custommetricsv1beta2.AllObjects).SubResource(info.Metric). + SpecificallyVersionedParams(params, custommetricsschema.ParameterCodec, version). + Do(ctx) + metricObj, err := versionConverter.ConvertResultToVersion(result, custommetricsv1beta2.SchemeGroupVersion) + if err != nil { + return nil, err + } + return metricsConvertCustomMetricsV1beta2ToInternalCustomMetrics(metricObj) +} + +func (c *CustomMetricsProvider) getForNamespace(ctx context.Context, clusterName, namespace string, metricName string, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) { + discoveryClient := c.multiClusterDiscovery.Get(clusterName) + if discoveryClient == nil { + err := fmt.Errorf("failed to get MultiClusterDiscovery for cluster(%s)", clusterName) + klog.Error(err) + return nil, err + } + version, err := c.getPreferredVersion(discoveryClient) + if err != nil { + klog.Errorf("failed to get custom.metrics.k8s.io preferred version for cluster(%s),Error: %v", clusterName, err) + return nil, err + } + params, err := versionConverter.ConvertListOptionsToVersion(&custom_metrics.MetricListOptions{ + MetricLabelSelector: metricSelector.String(), + }, version) + if err != nil { + return nil, err + } + result := discoveryClient.RESTClient().Get().Prefix("/apis/"+version.String()). + Resource("metrics"). + Namespace(namespace). + Name(metricName). + SpecificallyVersionedParams(params, custommetricsschema.ParameterCodec, version). + Do(ctx) + metricObj, err := versionConverter.ConvertResultToVersion(result, custommetricsv1beta2.SchemeGroupVersion) + if err != nil { + return nil, err + } + return metricsConvertCustomMetricsV1beta2ToInternalCustomMetrics(metricObj) } // ListAllMetrics returns all metrics in all member clusters func (c *CustomMetricsProvider) ListAllMetrics() []provider.CustomMetricInfo { - return []provider.CustomMetricInfo{} + clusters, err := c.clusterLister.List(labels.Everything()) + if err != nil { + klog.Errorf("Failed to list clusters: %v", err) + return []provider.CustomMetricInfo{} + } + var customMetricInfos []provider.CustomMetricInfo + metricInfoChan := make(chan provider.CustomMetricInfo) + wg := sync.WaitGroup{} + for _, cluster := range clusters { + wg.Add(1) + go func(clusterName string) { + defer wg.Done() + discoveryClient := c.multiClusterDiscovery.Get(clusterName) + if discoveryClient == nil { + err := fmt.Errorf("failed to get MultiClusterDiscovery for cluster(%s)", clusterName) + klog.Error(err) + return + } + apiGroups, err := discoveryClient.ServerGroups() + if err != nil { + klog.Errorf("Failed to query resource in cluster(%s): %+v", clusterName, err) + return + } + supportGroupVersion, support := supportedMetricsAPIVersionAvailable(apiGroups) + if !support { + klog.Warningf("custom.metrics.k8s.io not found in cluster(%s)", clusterName) + return + } + resources, err := discoveryClient.ServerResourcesForGroupVersion(supportGroupVersion.String()) + if err != nil { + klog.Warningf("Failed to query %s resource in cluster(%s): %+v", supportGroupVersion.String(), clusterName, err) + return + } + for _, resource := range resources.APIResources { + // The name of APIResource is composed of Metric name and GroupResource string, e.g. "jobs.batch/promhttp_metric_handler_requests", "pods/process_cpu_seconds". + // Refer to: vendor/sigs.k8s.io/custom-metrics-apiserver/pkg/provider/resource_lister.go:L45 + groupResourceAndMetricName := strings.SplitN(resource.Name, "/", 2) + if len(groupResourceAndMetricName) != 2 { + klog.Warningf("Failed to query %s resource in cluster(%s): %+v", supportGroupVersion.String(), clusterName, err) + continue + } + metricInfoChan <- provider.CustomMetricInfo{ + GroupResource: schema.ParseGroupResource(groupResourceAndMetricName[0]), + Namespaced: resource.Namespaced, + Metric: groupResourceAndMetricName[1], + } + } + }(cluster.Name) + } + go func() { + wg.Wait() + close(metricInfoChan) + }() + for { + metricsInfo, ok := <-metricInfoChan + if !ok { + break + } + customMetricInfos = append(customMetricInfos, metricsInfo) + } + return customMetricInfos +} + +func supportedMetricsAPIVersionAvailable(discoveredAPIGroups *metav1.APIGroupList) (schema.GroupVersion, bool) { + supportedVersionSet := sets.New[string]() + for _, discoveredAPIGroup := range discoveredAPIGroups.Groups { + if discoveredAPIGroup.Name != custom_metrics.GroupName { + continue + } + for _, version := range discoveredAPIGroup.Versions { + supportedVersionSet.Insert(version.Version) + } + } + for _, supportedVersion := range custommetricsclient.MetricVersions { + if supportedVersionSet.Has(supportedVersion.Version) { + return supportedVersion, true + } + } + return schema.GroupVersion{}, false +} + +func metricsConvertCustomMetricsV1beta2ToInternalCustomMetrics(obj runtime.Object) (*custom_metrics.MetricValueList, error) { + var tmp *custommetricsv1beta2.MetricValueList + var ok bool + if tmp, ok = obj.(*custommetricsv1beta2.MetricValueList); !ok { + return nil, fmt.Errorf("the custom metrics API server didn't return MetricValueList, the type is %v", reflect.TypeOf(obj)) + } + res := &custom_metrics.MetricValueList{} + if err := custommetricsv1beta2.Convert_v1beta2_MetricValueList_To_custom_metrics_MetricValueList(tmp, res, nil); err != nil { + return nil, err + } + return res, nil }