Merge pull request #3638 from chaunceyjiang/custom-metrics
feat: support custom metrics for metrics adapter
This commit is contained in:
commit
0febf48bc3
|
@ -15,6 +15,34 @@ spec:
|
|||
version: v1beta1
|
||||
versionPriority: 10
|
||||
---
|
||||
apiVersion: apiregistration.k8s.io/v1
|
||||
kind: APIService
|
||||
metadata:
|
||||
name: v1beta2.custom.metrics.k8s.io
|
||||
spec:
|
||||
service:
|
||||
name: karmada-metrics-adapter
|
||||
namespace: karmada-system
|
||||
group: custom.metrics.k8s.io
|
||||
version: v1beta2
|
||||
insecureSkipTLSVerify: true
|
||||
groupPriorityMinimum: 100
|
||||
versionPriority: 200
|
||||
---
|
||||
apiVersion: apiregistration.k8s.io/v1
|
||||
kind: APIService
|
||||
metadata:
|
||||
name: v1beta1.custom.metrics.k8s.io
|
||||
spec:
|
||||
service:
|
||||
name: karmada-metrics-adapter
|
||||
namespace: karmada-system
|
||||
group: custom.metrics.k8s.io
|
||||
version: v1beta1
|
||||
insecureSkipTLSVerify: true
|
||||
groupPriorityMinimum: 100
|
||||
versionPriority: 200
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
"github.com/spf13/pflag"
|
||||
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/klog/v2"
|
||||
"sigs.k8s.io/custom-metrics-apiserver/pkg/cmd/options"
|
||||
|
@ -56,8 +58,9 @@ func (o *Options) Config() (*metricsadapter.MetricsServer, error) {
|
|||
|
||||
karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
|
||||
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
|
||||
|
||||
metricsController := metricsadapter.NewMetricsController(restConfig, factory)
|
||||
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
|
||||
kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0)
|
||||
metricsController := metricsadapter.NewMetricsController(restConfig, factory, kubeFactory)
|
||||
metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions)
|
||||
metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
|
||||
metricsAdapter.OpenAPIConfig.Info.Title = "karmada-metrics-adapter"
|
||||
|
@ -70,6 +73,9 @@ func (o *Options) Config() (*metricsadapter.MetricsServer, error) {
|
|||
}
|
||||
|
||||
err = server.GenericAPIServer.AddPostStartHook("start-karmada-informers", func(context genericapiserver.PostStartHookContext) error {
|
||||
kubeFactory.Core().V1().Secrets().Informer()
|
||||
kubeFactory.Start(context.StopCh)
|
||||
kubeFactory.WaitForCacheSync(context.StopCh)
|
||||
factory.Start(context.StopCh)
|
||||
return nil
|
||||
})
|
||||
|
|
|
@ -18,9 +18,8 @@ type MetricsAdapter struct {
|
|||
func NewMetricsAdapter(controller *MetricsController, customMetricsAdapterServerOptions *options.CustomMetricsAdapterServerOptions) *MetricsAdapter {
|
||||
adapter := &MetricsAdapter{}
|
||||
adapter.CustomMetricsAdapterServerOptions = customMetricsAdapterServerOptions
|
||||
|
||||
adapter.ResourceMetricsProvider = provider.NewResourceMetricsProvider(controller.ClusterLister, controller.InformerManager)
|
||||
customProvider := provider.MakeCustomMetricsProvider()
|
||||
customProvider := provider.MakeCustomMetricsProvider(controller.ClusterLister, controller.MultiClusterDiscovery)
|
||||
externalProvider := provider.MakeExternalMetricsProvider()
|
||||
adapter.WithCustomMetrics(customProvider)
|
||||
adapter.WithExternalMetrics(externalProvider)
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
@ -16,6 +17,7 @@ import (
|
|||
clusterV1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
|
||||
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
|
||||
"github.com/karmada-io/karmada/pkg/metricsadapter/multiclient"
|
||||
"github.com/karmada-io/karmada/pkg/metricsadapter/provider"
|
||||
"github.com/karmada-io/karmada/pkg/util"
|
||||
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
|
||||
|
@ -29,23 +31,24 @@ var (
|
|||
|
||||
// MetricsController is a controller for metrics, control the lifecycle of multi-clusters informer
|
||||
type MetricsController struct {
|
||||
InformerFactory informerfactory.SharedInformerFactory
|
||||
ClusterLister clusterlister.ClusterLister
|
||||
InformerManager genericmanager.MultiClusterInformerManager
|
||||
|
||||
queue workqueue.RateLimitingInterface
|
||||
restConfig *rest.Config
|
||||
InformerFactory informerfactory.SharedInformerFactory
|
||||
ClusterLister clusterlister.ClusterLister
|
||||
InformerManager genericmanager.MultiClusterInformerManager
|
||||
MultiClusterDiscovery multiclient.MultiClusterDiscoveryInterface
|
||||
queue workqueue.RateLimitingInterface
|
||||
restConfig *rest.Config
|
||||
}
|
||||
|
||||
// NewMetricsController creates a new metrics controller
|
||||
func NewMetricsController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory) *MetricsController {
|
||||
func NewMetricsController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory, kubeFactory informers.SharedInformerFactory) *MetricsController {
|
||||
clusterLister := factory.Cluster().V1alpha1().Clusters().Lister()
|
||||
controller := &MetricsController{
|
||||
InformerFactory: factory,
|
||||
ClusterLister: clusterLister,
|
||||
InformerManager: genericmanager.GetInstance(),
|
||||
restConfig: restConfig,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "metrics-adapter"),
|
||||
InformerFactory: factory,
|
||||
ClusterLister: clusterLister,
|
||||
MultiClusterDiscovery: multiclient.NewMultiClusterDiscoveryClient(clusterLister, kubeFactory),
|
||||
InformerManager: genericmanager.GetInstance(),
|
||||
restConfig: restConfig,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "metrics-adapter"),
|
||||
}
|
||||
controller.addEventHandler()
|
||||
|
||||
|
@ -127,6 +130,7 @@ func (m *MetricsController) handleClusters() bool {
|
|||
if apierrors.IsNotFound(err) {
|
||||
klog.Infof("try to stop cluster informer %s", clusterName)
|
||||
m.InformerManager.Stop(clusterName)
|
||||
m.MultiClusterDiscovery.Remove(clusterName)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
@ -135,12 +139,14 @@ func (m *MetricsController) handleClusters() bool {
|
|||
if !cls.DeletionTimestamp.IsZero() {
|
||||
klog.Infof("try to stop cluster informer %s", clusterName)
|
||||
m.InformerManager.Stop(clusterName)
|
||||
m.MultiClusterDiscovery.Remove(clusterName)
|
||||
return true
|
||||
}
|
||||
|
||||
if !util.IsClusterReady(&cls.Status) {
|
||||
klog.Warningf("cluster %s is notReady try to stop this cluster informer", clusterName)
|
||||
m.InformerManager.Stop(clusterName)
|
||||
m.MultiClusterDiscovery.Remove(clusterName)
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -159,8 +165,12 @@ func (m *MetricsController) handleClusters() bool {
|
|||
}
|
||||
_ = m.InformerManager.ForCluster(clusterName, clusterDynamicClient.DynamicClientSet, 0)
|
||||
}
|
||||
err = m.MultiClusterDiscovery.Set(clusterName)
|
||||
if err != nil {
|
||||
klog.Warningf("failed to build discoveryClient for cluster(%s), Error: %+v", clusterName, err)
|
||||
return true
|
||||
}
|
||||
sci := m.InformerManager.GetSingleClusterManager(clusterName)
|
||||
|
||||
// Just trigger the informer to work
|
||||
_ = sci.Lister(provider.PodsGVR)
|
||||
_ = sci.Lister(provider.NodesGVR)
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
package multiclient
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/discovery"
|
||||
"k8s.io/client-go/informers"
|
||||
listcorev1 "k8s.io/client-go/listers/core/v1"
|
||||
|
||||
clusterV1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
|
||||
"github.com/karmada-io/karmada/pkg/util"
|
||||
)
|
||||
|
||||
// MultiClusterDiscoveryInterface provides DiscoveryClient for multiple clusters.
|
||||
type MultiClusterDiscoveryInterface interface {
|
||||
Get(clusterName string) *discovery.DiscoveryClient
|
||||
Set(clusterName string) error
|
||||
Remove(clusterName string)
|
||||
}
|
||||
|
||||
// MultiClusterDiscovery provides DiscoveryClient for multiple clusters.
|
||||
type MultiClusterDiscovery struct {
|
||||
sync.RWMutex
|
||||
clients map[string]*discovery.DiscoveryClient
|
||||
secretLister listcorev1.SecretLister
|
||||
clusterLister clusterlister.ClusterLister
|
||||
}
|
||||
|
||||
// NewMultiClusterDiscoveryClient returns a new MultiClusterDiscovery
|
||||
func NewMultiClusterDiscoveryClient(clusterLister clusterlister.ClusterLister, KubeFactory informers.SharedInformerFactory) MultiClusterDiscoveryInterface {
|
||||
return &MultiClusterDiscovery{
|
||||
clusterLister: clusterLister,
|
||||
secretLister: KubeFactory.Core().V1().Secrets().Lister(),
|
||||
clients: map[string]*discovery.DiscoveryClient{},
|
||||
}
|
||||
}
|
||||
|
||||
// Get returns a DiscoveryClient for the provided clusterName.
|
||||
func (m *MultiClusterDiscovery) Get(clusterName string) *discovery.DiscoveryClient {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
return m.clients[clusterName]
|
||||
}
|
||||
|
||||
// Set a DiscoveryClient for the provided clusterName.
|
||||
func (m *MultiClusterDiscovery) Set(clusterName string) error {
|
||||
clusterGetter := func(cluster string) (*clusterV1alpha1.Cluster, error) {
|
||||
return m.clusterLister.Get(cluster)
|
||||
}
|
||||
secretGetter := func(namespace string, name string) (*corev1.Secret, error) {
|
||||
return m.secretLister.Secrets(namespace).Get(name)
|
||||
}
|
||||
clusterConfig, err := util.BuildClusterConfig(clusterName, clusterGetter, secretGetter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
m.clients[clusterName] = discovery.NewDiscoveryClientForConfigOrDie(clusterConfig)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove a DiscoveryClient for the provided clusterName.
|
||||
func (m *MultiClusterDiscovery) Remove(clusterName string) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
delete(m.clients, clusterName)
|
||||
}
|
|
@ -3,33 +3,355 @@ package provider
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/discovery"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/metrics/pkg/apis/custom_metrics"
|
||||
custommetricsv1beta2 "k8s.io/metrics/pkg/apis/custom_metrics/v1beta2"
|
||||
custommetricsclient "k8s.io/metrics/pkg/client/custom_metrics"
|
||||
custommetricsschema "k8s.io/metrics/pkg/client/custom_metrics/scheme"
|
||||
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"
|
||||
|
||||
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
|
||||
"github.com/karmada-io/karmada/pkg/metricsadapter/multiclient"
|
||||
)
|
||||
|
||||
var (
|
||||
versionConverter = custommetricsclient.NewMetricConverter()
|
||||
)
|
||||
|
||||
// CustomMetricsProvider is a custom metrics provider
|
||||
type CustomMetricsProvider struct {
|
||||
// multiClusterDiscovery returns a discovery client for member cluster apiserver
|
||||
multiClusterDiscovery multiclient.MultiClusterDiscoveryInterface
|
||||
clusterLister clusterlister.ClusterLister
|
||||
}
|
||||
|
||||
// MakeCustomMetricsProvider creates a new custom metrics provider
|
||||
func MakeCustomMetricsProvider() *CustomMetricsProvider {
|
||||
return &CustomMetricsProvider{}
|
||||
func MakeCustomMetricsProvider(clusterLister clusterlister.ClusterLister, multiClusterDiscovery multiclient.MultiClusterDiscoveryInterface) *CustomMetricsProvider {
|
||||
return &CustomMetricsProvider{
|
||||
clusterLister: clusterLister,
|
||||
multiClusterDiscovery: multiClusterDiscovery,
|
||||
}
|
||||
}
|
||||
|
||||
// GetMetricByName will query metrics by name from member clusters and return the result
|
||||
func (c *CustomMetricsProvider) GetMetricByName(ctx context.Context, name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
|
||||
return nil, fmt.Errorf("karmada-metrics-adapter still not implement it")
|
||||
clusters, err := c.clusterLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to list clusters: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
metricValueList := &custom_metrics.MetricValueList{}
|
||||
metricsChanel := make(chan *custom_metrics.MetricValueList)
|
||||
wg := sync.WaitGroup{}
|
||||
for _, cluster := range clusters {
|
||||
wg.Add(1)
|
||||
go func(clusterName string) {
|
||||
defer wg.Done()
|
||||
metrics, err := c.getMetricByName(ctx, clusterName, name, info, metricSelector)
|
||||
if err != nil {
|
||||
klog.Warningf("query %s's %s metric from cluster %s failed, err: %+v", info.GroupResource.String(), info.Metric, clusterName, err)
|
||||
return
|
||||
}
|
||||
metricsChanel <- metrics
|
||||
}(cluster.Name)
|
||||
}
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(metricsChanel)
|
||||
}()
|
||||
for {
|
||||
metrics, ok := <-metricsChanel
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
metricValueList.Items = append(metricValueList.Items, metrics.Items...)
|
||||
}
|
||||
var metrics *custom_metrics.MetricValue
|
||||
// TODO(chaunceyjiang) The MetricValue items need to be sorted.
|
||||
for i := range metricValueList.Items {
|
||||
if metrics == nil {
|
||||
metrics = &metricValueList.Items[i]
|
||||
continue
|
||||
}
|
||||
// metrics is unique in one cluster, but it may exist in multiple clusters.
|
||||
// for this situation, we need to add the value of all clusters.
|
||||
metrics.Value.Add(metricValueList.Items[i].Value)
|
||||
}
|
||||
if metrics == nil {
|
||||
return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric)
|
||||
}
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
// GetMetricBySelector will query metrics by selector from member clusters and return the result
|
||||
func (c *CustomMetricsProvider) GetMetricBySelector(ctx context.Context, namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
|
||||
return nil, fmt.Errorf("karmada-metrics-adapter still not implement it")
|
||||
clusters, err := c.clusterLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to list clusters: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
metricValueList := &custom_metrics.MetricValueList{}
|
||||
wg := sync.WaitGroup{}
|
||||
metricsChanel := make(chan *custom_metrics.MetricValueList)
|
||||
for _, cluster := range clusters {
|
||||
wg.Add(1)
|
||||
go func(clusterName string) {
|
||||
defer wg.Done()
|
||||
metrics, err := c.getMetricBySelector(ctx, clusterName, namespace, selector, info, metricSelector)
|
||||
if err != nil {
|
||||
klog.Warningf("query %s's %s metric from cluster %s failed", info.GroupResource.String(), info.Metric, clusterName)
|
||||
return
|
||||
}
|
||||
metricsChanel <- metrics
|
||||
}(cluster.Name)
|
||||
}
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(metricsChanel)
|
||||
}()
|
||||
sameMetrics := make(map[string]custom_metrics.MetricValue)
|
||||
for {
|
||||
metrics, ok := <-metricsChanel
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
// TODO(chaunceyjiang) The MetricValue items need to be sorted.
|
||||
for _, metric := range metrics.Items {
|
||||
// metrics is unique in one cluster, but it may exist in multiple clusters.
|
||||
// for this situation, we need to add the value of all clusters.
|
||||
if metricValue, same := sameMetrics[metric.DescribedObject.Name]; same {
|
||||
metric.Value.Add(metricValue.Value)
|
||||
}
|
||||
sameMetrics[metric.DescribedObject.Name] = metric
|
||||
}
|
||||
}
|
||||
for _, metric := range sameMetrics {
|
||||
metricValueList.Items = append(metricValueList.Items, metric)
|
||||
}
|
||||
if len(metricValueList.Items) == 0 {
|
||||
return nil, provider.NewMetricNotFoundError(info.GroupResource, info.Metric)
|
||||
}
|
||||
return metricValueList, nil
|
||||
}
|
||||
|
||||
func (c *CustomMetricsProvider) getPreferredVersion(discoveryClient *discovery.DiscoveryClient) (schema.GroupVersion, error) {
|
||||
apiGroups, err := discoveryClient.ServerGroups()
|
||||
if err != nil {
|
||||
return schema.GroupVersion{}, err
|
||||
}
|
||||
if gv, support := supportedMetricsAPIVersionAvailable(apiGroups); support {
|
||||
return gv, nil
|
||||
}
|
||||
return schema.GroupVersion{}, fmt.Errorf("custom.metrics.k8s.io not found")
|
||||
}
|
||||
|
||||
func (c *CustomMetricsProvider) getMetricByName(ctx context.Context, clusterName string, name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
|
||||
// handle namespace separately
|
||||
if info.GroupResource.Resource == "namespaces" && info.GroupResource.Group == "" {
|
||||
return c.getForNamespace(ctx, clusterName, name.Name, info.Metric, metricSelector)
|
||||
}
|
||||
discoveryClient := c.multiClusterDiscovery.Get(clusterName)
|
||||
if discoveryClient == nil {
|
||||
err := fmt.Errorf("failed to get MultiClusterDiscovery for cluster(%s)", clusterName)
|
||||
klog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
version, err := c.getPreferredVersion(discoveryClient)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get custom.metrics.k8s.io preferred version for cluster(%s),Error: %v", clusterName, err)
|
||||
return nil, err
|
||||
}
|
||||
params, err := versionConverter.ConvertListOptionsToVersion(&custom_metrics.MetricListOptions{
|
||||
MetricLabelSelector: metricSelector.String(),
|
||||
}, version)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to convert ListOptions to Version for cluster(%s),Error: %v", clusterName, err)
|
||||
return nil, err
|
||||
}
|
||||
req := discoveryClient.RESTClient().Get().Prefix("/apis/" + version.String()).Resource(info.GroupResource.String())
|
||||
if info.Namespaced {
|
||||
req = req.Namespace(name.Namespace)
|
||||
}
|
||||
result := req.Name(name.Name).SubResource(info.Metric).SpecificallyVersionedParams(params, custommetricsschema.ParameterCodec, version).Do(ctx)
|
||||
metricObj, err := versionConverter.ConvertResultToVersion(result, custommetricsv1beta2.SchemeGroupVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return metricsConvertCustomMetricsV1beta2ToInternalCustomMetrics(metricObj)
|
||||
}
|
||||
|
||||
func (c *CustomMetricsProvider) getMetricBySelector(ctx context.Context, clusterName, namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
|
||||
// handle namespace separately
|
||||
if info.GroupResource.Resource == "namespaces" && info.GroupResource.Group == "" {
|
||||
return c.getForNamespace(ctx, clusterName, custommetricsv1beta2.AllObjects, info.Metric, metricSelector)
|
||||
}
|
||||
discoveryClient := c.multiClusterDiscovery.Get(clusterName)
|
||||
if discoveryClient == nil {
|
||||
err := fmt.Errorf("failed to get MultiClusterDiscovery for cluster(%s)", clusterName)
|
||||
klog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
version, err := c.getPreferredVersion(discoveryClient)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get custom.metrics.k8s.io preferred version for cluster(%s),Error: %v", clusterName, err)
|
||||
return nil, err
|
||||
}
|
||||
params, err := versionConverter.ConvertListOptionsToVersion(&custom_metrics.MetricListOptions{
|
||||
MetricLabelSelector: metricSelector.String(),
|
||||
LabelSelector: selector.String(),
|
||||
}, version)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to convert ListOptions to Version for cluster(%s),Error: %v", clusterName, err)
|
||||
return nil, err
|
||||
}
|
||||
req := discoveryClient.RESTClient().Get().Prefix("/apis/" + version.String()).Resource(info.GroupResource.String())
|
||||
if info.Namespaced {
|
||||
req = req.Namespace(namespace)
|
||||
}
|
||||
result := req.Name(custommetricsv1beta2.AllObjects).SubResource(info.Metric).
|
||||
SpecificallyVersionedParams(params, custommetricsschema.ParameterCodec, version).
|
||||
Do(ctx)
|
||||
metricObj, err := versionConverter.ConvertResultToVersion(result, custommetricsv1beta2.SchemeGroupVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return metricsConvertCustomMetricsV1beta2ToInternalCustomMetrics(metricObj)
|
||||
}
|
||||
|
||||
func (c *CustomMetricsProvider) getForNamespace(ctx context.Context, clusterName, namespace string, metricName string, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
|
||||
discoveryClient := c.multiClusterDiscovery.Get(clusterName)
|
||||
if discoveryClient == nil {
|
||||
err := fmt.Errorf("failed to get MultiClusterDiscovery for cluster(%s)", clusterName)
|
||||
klog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
version, err := c.getPreferredVersion(discoveryClient)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get custom.metrics.k8s.io preferred version for cluster(%s),Error: %v", clusterName, err)
|
||||
return nil, err
|
||||
}
|
||||
params, err := versionConverter.ConvertListOptionsToVersion(&custom_metrics.MetricListOptions{
|
||||
MetricLabelSelector: metricSelector.String(),
|
||||
}, version)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := discoveryClient.RESTClient().Get().Prefix("/apis/"+version.String()).
|
||||
Resource("metrics").
|
||||
Namespace(namespace).
|
||||
Name(metricName).
|
||||
SpecificallyVersionedParams(params, custommetricsschema.ParameterCodec, version).
|
||||
Do(ctx)
|
||||
metricObj, err := versionConverter.ConvertResultToVersion(result, custommetricsv1beta2.SchemeGroupVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return metricsConvertCustomMetricsV1beta2ToInternalCustomMetrics(metricObj)
|
||||
}
|
||||
|
||||
// ListAllMetrics returns all metrics in all member clusters
|
||||
func (c *CustomMetricsProvider) ListAllMetrics() []provider.CustomMetricInfo {
|
||||
return []provider.CustomMetricInfo{}
|
||||
clusters, err := c.clusterLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to list clusters: %v", err)
|
||||
return []provider.CustomMetricInfo{}
|
||||
}
|
||||
var customMetricInfos []provider.CustomMetricInfo
|
||||
metricInfoChan := make(chan provider.CustomMetricInfo)
|
||||
wg := sync.WaitGroup{}
|
||||
for _, cluster := range clusters {
|
||||
wg.Add(1)
|
||||
go func(clusterName string) {
|
||||
defer wg.Done()
|
||||
discoveryClient := c.multiClusterDiscovery.Get(clusterName)
|
||||
if discoveryClient == nil {
|
||||
err := fmt.Errorf("failed to get MultiClusterDiscovery for cluster(%s)", clusterName)
|
||||
klog.Error(err)
|
||||
return
|
||||
}
|
||||
apiGroups, err := discoveryClient.ServerGroups()
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to query resource in cluster(%s): %+v", clusterName, err)
|
||||
return
|
||||
}
|
||||
supportGroupVersion, support := supportedMetricsAPIVersionAvailable(apiGroups)
|
||||
if !support {
|
||||
klog.Warningf("custom.metrics.k8s.io not found in cluster(%s)", clusterName)
|
||||
return
|
||||
}
|
||||
resources, err := discoveryClient.ServerResourcesForGroupVersion(supportGroupVersion.String())
|
||||
if err != nil {
|
||||
klog.Warningf("Failed to query %s resource in cluster(%s): %+v", supportGroupVersion.String(), clusterName, err)
|
||||
return
|
||||
}
|
||||
for _, resource := range resources.APIResources {
|
||||
// The name of APIResource is composed of Metric name and GroupResource string, e.g. "jobs.batch/promhttp_metric_handler_requests", "pods/process_cpu_seconds".
|
||||
// Refer to: vendor/sigs.k8s.io/custom-metrics-apiserver/pkg/provider/resource_lister.go:L45
|
||||
groupResourceAndMetricName := strings.SplitN(resource.Name, "/", 2)
|
||||
if len(groupResourceAndMetricName) != 2 {
|
||||
klog.Warningf("Failed to query %s resource in cluster(%s): %+v", supportGroupVersion.String(), clusterName, err)
|
||||
continue
|
||||
}
|
||||
metricInfoChan <- provider.CustomMetricInfo{
|
||||
GroupResource: schema.ParseGroupResource(groupResourceAndMetricName[0]),
|
||||
Namespaced: resource.Namespaced,
|
||||
Metric: groupResourceAndMetricName[1],
|
||||
}
|
||||
}
|
||||
}(cluster.Name)
|
||||
}
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(metricInfoChan)
|
||||
}()
|
||||
for {
|
||||
metricsInfo, ok := <-metricInfoChan
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
customMetricInfos = append(customMetricInfos, metricsInfo)
|
||||
}
|
||||
return customMetricInfos
|
||||
}
|
||||
|
||||
func supportedMetricsAPIVersionAvailable(discoveredAPIGroups *metav1.APIGroupList) (schema.GroupVersion, bool) {
|
||||
supportedVersionSet := sets.New[string]()
|
||||
for _, discoveredAPIGroup := range discoveredAPIGroups.Groups {
|
||||
if discoveredAPIGroup.Name != custom_metrics.GroupName {
|
||||
continue
|
||||
}
|
||||
for _, version := range discoveredAPIGroup.Versions {
|
||||
supportedVersionSet.Insert(version.Version)
|
||||
}
|
||||
}
|
||||
for _, supportedVersion := range custommetricsclient.MetricVersions {
|
||||
if supportedVersionSet.Has(supportedVersion.Version) {
|
||||
return supportedVersion, true
|
||||
}
|
||||
}
|
||||
return schema.GroupVersion{}, false
|
||||
}
|
||||
|
||||
func metricsConvertCustomMetricsV1beta2ToInternalCustomMetrics(obj runtime.Object) (*custom_metrics.MetricValueList, error) {
|
||||
var tmp *custommetricsv1beta2.MetricValueList
|
||||
var ok bool
|
||||
if tmp, ok = obj.(*custommetricsv1beta2.MetricValueList); !ok {
|
||||
return nil, fmt.Errorf("the custom metrics API server didn't return MetricValueList, the type is %v", reflect.TypeOf(obj))
|
||||
}
|
||||
res := &custom_metrics.MetricValueList{}
|
||||
if err := custommetricsv1beta2.Convert_v1beta2_MetricValueList_To_custom_metrics_MetricValueList(tmp, res, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue