From 1186588d1d81f4b32dff319aecaa7f2853bf2e8f Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Wed, 3 Apr 2024 10:35:55 +0800 Subject: [PATCH] karmada-metrics-adapter: reduce memory usage When there is a large amount of pod usage in the member cluster, metrics-adapter will consume a lot of memory. The reason is that it caches all the information of all pods in the cluster. However, we don't need all this information, so we trim some of the information to reduce memory usage. Signed-off-by: chaunceyjiang --- pkg/metricsadapter/adapter.go | 2 +- pkg/metricsadapter/controller.go | 88 +++++++++++- .../provider/resourcemetrics.go | 132 ++++++++++-------- 3 files changed, 160 insertions(+), 62 deletions(-) diff --git a/pkg/metricsadapter/adapter.go b/pkg/metricsadapter/adapter.go index 6b65e0c98..0785f8bd9 100755 --- a/pkg/metricsadapter/adapter.go +++ b/pkg/metricsadapter/adapter.go @@ -34,7 +34,7 @@ type MetricsAdapter struct { func NewMetricsAdapter(controller *MetricsController, customMetricsAdapterServerOptions *options.CustomMetricsAdapterServerOptions) *MetricsAdapter { adapter := &MetricsAdapter{} adapter.CustomMetricsAdapterServerOptions = customMetricsAdapterServerOptions - adapter.ResourceMetricsProvider = provider.NewResourceMetricsProvider(controller.ClusterLister, controller.InformerManager) + adapter.ResourceMetricsProvider = provider.NewResourceMetricsProvider(controller.ClusterLister, controller.TypedInformerManager, controller.InformerManager) customProvider := provider.MakeCustomMetricsProvider(controller.ClusterLister, controller.MultiClusterDiscovery) externalProvider := provider.MakeExternalMetricsProvider() adapter.WithCustomMetrics(customProvider) diff --git a/pkg/metricsadapter/controller.go b/pkg/metricsadapter/controller.go index 6a0e46257..b4f520d92 100755 --- a/pkg/metricsadapter/controller.go +++ b/pkg/metricsadapter/controller.go @@ -18,11 +18,14 @@ package metricsadapter import ( "context" + "fmt" + "reflect" "time" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/rest" @@ -37,6 +40,7 @@ import ( "github.com/karmada-io/karmada/pkg/metricsadapter/provider" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" + "github.com/karmada-io/karmada/pkg/util/fedinformer/typedmanager" "github.com/karmada-io/karmada/pkg/util/gclient" ) @@ -50,6 +54,7 @@ type MetricsController struct { InformerFactory informerfactory.SharedInformerFactory ClusterLister clusterlister.ClusterLister InformerManager genericmanager.MultiClusterInformerManager + TypedInformerManager typedmanager.MultiClusterInformerManager MultiClusterDiscovery multiclient.MultiClusterDiscoveryInterface queue workqueue.RateLimitingInterface restConfig *rest.Config @@ -63,6 +68,7 @@ func NewMetricsController(restConfig *rest.Config, factory informerfactory.Share ClusterLister: clusterLister, MultiClusterDiscovery: multiclient.NewMultiClusterDiscoveryClient(clusterLister, kubeFactory), InformerManager: genericmanager.GetInstance(), + TypedInformerManager: newInstance(), restConfig: restConfig, queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{ Name: "metrics-adapter", @@ -73,6 +79,66 @@ func NewMetricsController(restConfig *rest.Config, factory informerfactory.Share return controller } +func nodeTransformFunc(obj interface{}) (interface{}, error) { + var node *corev1.Node + switch t := obj.(type) { + case *corev1.Node: + node = t + case cache.DeletedFinalStateUnknown: + var ok bool + node, ok = t.Obj.(*corev1.Node) + if !ok { + return obj, fmt.Errorf("expect resource Node but got %v", reflect.TypeOf(t.Obj)) + } + default: + return obj, fmt.Errorf("expect resource Node but got %v", reflect.TypeOf(obj)) + } + + aggregatedNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node.Name, + Namespace: node.Namespace, + Labels: node.Labels, + DeletionTimestamp: node.DeletionTimestamp, + }, + } + return aggregatedNode, nil +} + +func podTransformFunc(obj interface{}) (interface{}, error) { + var pod *corev1.Pod + switch t := obj.(type) { + case *corev1.Pod: + pod = t + case cache.DeletedFinalStateUnknown: + var ok bool + pod, ok = t.Obj.(*corev1.Pod) + if !ok { + return obj, fmt.Errorf("expect resource Pod but got %v", reflect.TypeOf(t.Obj)) + } + default: + return obj, fmt.Errorf("expect resource Pod but got %v", reflect.TypeOf(obj)) + } + + aggregatedPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + Labels: pod.Labels, + DeletionTimestamp: pod.DeletionTimestamp, + }, + } + return aggregatedPod, nil +} + +func newInstance() typedmanager.MultiClusterInformerManager { + transforms := map[schema.GroupVersionResource]cache.TransformFunc{ + provider.NodesGVR: cache.TransformFunc(nodeTransformFunc), + provider.PodsGVR: cache.TransformFunc(podTransformFunc), + } + return typedmanager.NewMultiClusterInformerManager(context.TODO().Done(), transforms) +} + // addEventHandler adds event handler for cluster func (m *MetricsController) addEventHandler() { clusterInformer := m.InformerFactory.Cluster().V1alpha1().Clusters().Informer() @@ -147,6 +213,7 @@ func (m *MetricsController) handleClusters() bool { if err != nil { if apierrors.IsNotFound(err) { klog.Infof("try to stop cluster informer %s", clusterName) + m.TypedInformerManager.Stop(clusterName) m.InformerManager.Stop(clusterName) m.MultiClusterDiscovery.Remove(clusterName) return true @@ -156,6 +223,7 @@ func (m *MetricsController) handleClusters() bool { if !cls.DeletionTimestamp.IsZero() { klog.Infof("try to stop cluster informer %s", clusterName) + m.TypedInformerManager.Stop(clusterName) m.InformerManager.Stop(clusterName) m.MultiClusterDiscovery.Remove(clusterName) return true @@ -163,14 +231,19 @@ func (m *MetricsController) handleClusters() bool { if !util.IsClusterReady(&cls.Status) { klog.Warningf("cluster %s is notReady try to stop this cluster informer", clusterName) + m.TypedInformerManager.Stop(clusterName) m.InformerManager.Stop(clusterName) m.MultiClusterDiscovery.Remove(clusterName) return false } - if !m.InformerManager.IsManagerExist(clusterName) { + if !m.TypedInformerManager.IsManagerExist(clusterName) { klog.Info("Try to build informer manager for cluster ", clusterName) controlPlaneClient := gclient.NewForConfigOrDie(m.restConfig) + clusterClient, err := util.NewClusterClientSet(clusterName, controlPlaneClient, nil) + if err != nil { + return false + } clusterDynamicClient, err := util.NewClusterDynamicClientSet(clusterName, controlPlaneClient) if err != nil { return false @@ -181,6 +254,7 @@ func (m *MetricsController) handleClusters() bool { klog.Warningf("unable to access cluster %s, Error: %+v", clusterName, err) return true } + _ = m.TypedInformerManager.ForCluster(clusterName, clusterClient.KubeClient, 0) _ = m.InformerManager.ForCluster(clusterName, clusterDynamicClient.DynamicClientSet, 0) } err = m.MultiClusterDiscovery.Set(clusterName) @@ -188,11 +262,15 @@ func (m *MetricsController) handleClusters() bool { klog.Warningf("failed to build discoveryClient for cluster(%s), Error: %+v", clusterName, err) return true } - sci := m.InformerManager.GetSingleClusterManager(clusterName) - // Just trigger the informer to work - _ = sci.Lister(provider.PodsGVR) - _ = sci.Lister(provider.NodesGVR) + typedSci := m.TypedInformerManager.GetSingleClusterManager(clusterName) + // Just trigger the informer to work + _, _ = typedSci.Lister(provider.PodsGVR) + _, _ = typedSci.Lister(provider.NodesGVR) + + typedSci.Start() + _ = typedSci.WaitForCacheSync() + sci := m.InformerManager.GetSingleClusterManager(clusterName) sci.Start() _ = sci.WaitForCacheSync() diff --git a/pkg/metricsadapter/provider/resourcemetrics.go b/pkg/metricsadapter/provider/resourcemetrics.go index 080ca20ba..89a547c1a 100755 --- a/pkg/metricsadapter/provider/resourcemetrics.go +++ b/pkg/metricsadapter/provider/resourcemetrics.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + listv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "k8s.io/metrics/pkg/apis/metrics" @@ -36,6 +37,7 @@ import ( clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" + "github.com/karmada-io/karmada/pkg/util/fedinformer/typedmanager" "github.com/karmada-io/karmada/pkg/util/helper" ) @@ -59,7 +61,7 @@ var ( NodesGVR = corev1.SchemeGroupVersion.WithResource("nodes") ) -type queryResourceFromClustersFunc func(sci genericmanager.SingleClusterInformerManager, clusterName string) error +type queryResourceFromClustersFunc func(sci typedmanager.SingleClusterInformerManager, clusterName string) error type queryMetricsFromClustersFunc func(sci genericmanager.SingleClusterInformerManager, clusterName string) (interface{}, error) // ResourceMetricsProvider is a resource metrics provider, to provide cpu/memory metrics @@ -67,17 +69,19 @@ type ResourceMetricsProvider struct { PodLister *PodLister NodeLister *NodeLister - clusterLister clusterlister.ClusterLister - informerManager genericmanager.MultiClusterInformerManager + clusterLister clusterlister.ClusterLister + informerManager genericmanager.MultiClusterInformerManager + typedInformerManager typedmanager.MultiClusterInformerManager } // NewResourceMetricsProvider creates a new resource metrics provider -func NewResourceMetricsProvider(clusterLister clusterlister.ClusterLister, informerManager genericmanager.MultiClusterInformerManager) *ResourceMetricsProvider { +func NewResourceMetricsProvider(clusterLister clusterlister.ClusterLister, typedInformerManager typedmanager.MultiClusterInformerManager, informerManager genericmanager.MultiClusterInformerManager) *ResourceMetricsProvider { return &ResourceMetricsProvider{ - clusterLister: clusterLister, - informerManager: informerManager, - PodLister: NewPodLister(clusterLister, informerManager), - NodeLister: NewNodeLister(clusterLister, informerManager), + clusterLister: clusterLister, + informerManager: informerManager, + typedInformerManager: typedInformerManager, + PodLister: NewPodLister(clusterLister, typedInformerManager), + NodeLister: NewNodeLister(clusterLister, typedInformerManager), } } @@ -93,7 +97,7 @@ func (r *ResourceMetricsProvider) getMetricsParallel(resourceFunc queryResourceF // step 1. Find out the target clusters in lister cache var targetClusters []string for _, cluster := range clusters { - sci := r.informerManager.GetSingleClusterManager(cluster.Name) + sci := r.typedInformerManager.GetSingleClusterManager(cluster.Name) if sci == nil { klog.Errorf("Failed to get cluster(%s) manager", cluster.Name) continue @@ -161,8 +165,13 @@ func (r *ResourceMetricsProvider) getMetricsParallel(resourceFunc queryResourceF // queryPodMetricsByName queries metrics by pod name from target clusters func (r *ResourceMetricsProvider) queryPodMetricsByName(name, namespace string) ([]metrics.PodMetrics, error) { - resourceQueryFunc := func(sci genericmanager.SingleClusterInformerManager, _ string) error { - _, err := sci.Lister(PodsGVR).ByNamespace(namespace).Get(name) + resourceQueryFunc := func(sci typedmanager.SingleClusterInformerManager, _ string) error { + podInterface, err := sci.Lister(PodsGVR) + if err != nil { + return err + } + lister := podInterface.(listv1.PodLister) + _, err = lister.Pods(namespace).Get(name) return err } metricsQueryFunc := func(sci genericmanager.SingleClusterInformerManager, clusterName string) (interface{}, error) { @@ -199,9 +208,13 @@ func (r *ResourceMetricsProvider) queryPodMetricsBySelector(selector, namespace klog.Errorf("Failed to parse label selector: %v", err) return nil, err } - - resourceQueryFunc := func(sci genericmanager.SingleClusterInformerManager, clusterName string) error { - pods, err := sci.Lister(PodsGVR).ByNamespace(namespace).List(labelSelector) + resourceQueryFunc := func(sci typedmanager.SingleClusterInformerManager, clusterName string) error { + podInterface, err := sci.Lister(PodsGVR) + if err != nil { + return err + } + lister := podInterface.(listv1.PodLister) + pods, err := lister.Pods(namespace).List(labelSelector) if err != nil { klog.Errorf("Failed to list pods in cluster(%s): %v", clusterName, err) return err @@ -245,8 +258,13 @@ func (r *ResourceMetricsProvider) queryPodMetricsBySelector(selector, namespace // queryNodeMetricsByName queries metrics by node name from target clusters func (r *ResourceMetricsProvider) queryNodeMetricsByName(name string) ([]metrics.NodeMetrics, error) { - resourceQueryFunc := func(sci genericmanager.SingleClusterInformerManager, _ string) error { - _, err := sci.Lister(NodesGVR).Get(name) + resourceQueryFunc := func(sci typedmanager.SingleClusterInformerManager, _ string) error { + nodeInterface, err := sci.Lister(PodsGVR) + if err != nil { + return err + } + lister := nodeInterface.(listv1.NodeLister) + _, err = lister.Get(name) return err } metricsQueryFunc := func(sci genericmanager.SingleClusterInformerManager, clusterName string) (interface{}, error) { @@ -282,9 +300,13 @@ func (r *ResourceMetricsProvider) queryNodeMetricsBySelector(selector string) ([ klog.Errorf("Failed to parse label selector: %v", err) return nil, err } - - resourceQueryFunc := func(sci genericmanager.SingleClusterInformerManager, clusterName string) error { - nodes, err := sci.Lister(NodesGVR).List(labelSelector) + resourceQueryFunc := func(sci typedmanager.SingleClusterInformerManager, clusterName string) error { + nodeInterface, err := sci.Lister(NodesGVR) + if err != nil { + return err + } + lister := nodeInterface.(listv1.NodeLister) + nodes, err := lister.List(labelSelector) if err != nil { klog.Errorf("Failed to list pods in cluster(%s): %v", clusterName, err) return err @@ -370,11 +392,11 @@ func (r *ResourceMetricsProvider) GetNodeMetrics(nodes ...*corev1.Node) ([]metri type PodLister struct { namespaceSpecified string clusterLister clusterlister.ClusterLister - informerManager genericmanager.MultiClusterInformerManager + informerManager typedmanager.MultiClusterInformerManager } // NewPodLister creates an internal new PodLister -func NewPodLister(clusterLister clusterlister.ClusterLister, informerManager genericmanager.MultiClusterInformerManager) *PodLister { +func NewPodLister(clusterLister clusterlister.ClusterLister, informerManager typedmanager.MultiClusterInformerManager) *PodLister { return &PodLister{ clusterLister: clusterLister, informerManager: informerManager, @@ -396,19 +418,19 @@ func (p *PodLister) List(selector labels.Selector) (ret []runtime.Object, err er klog.Errorf("Failed to get SingleClusterInformerManager for cluster(%s)", cluster.Name) continue } - pods, err := sci.Lister(PodsGVR).ByNamespace(p.namespaceSpecified).List(selector) + lister, err := sci.Lister(PodsGVR) + if err != nil { + klog.Errorf("Failed to get lister for cluster(%s): %v", cluster.Name, err) + continue + } + podLister := lister.(listv1.PodLister) + pods, err := podLister.Pods(p.namespaceSpecified).List(selector) if err != nil { klog.Errorf("Failed to list pods from cluster(%s) in namespace(%s): %v", cluster.Name, p.namespaceSpecified, err) return nil, err } - for _, pod := range pods { - podTyped := &corev1.Pod{} - err = helper.ConvertToTypedObject(pod, podTyped) - if err != nil { - klog.Errorf("Failed to convert to typed object: %v", err) - return nil, err - } - podPartial := p.convertToPodPartialData(podTyped, selector.String(), true) + for i := range pods { + podPartial := p.convertToPodPartialData(pods[i], selector.String(), true) ret = append(ret, podPartial) } } @@ -454,7 +476,13 @@ func (p *PodLister) Get(name string) (runtime.Object, error) { klog.Errorf("Failed to get SingleClusterInformerManager for cluster(%s)", cluster.Name) continue } - pod, err := sci.Lister(PodsGVR).ByNamespace(p.namespaceSpecified).Get(name) + sciLister, err := sci.Lister(PodsGVR) + if err != nil { + klog.Errorf("Failed to get lister for cluster(%s): %v", cluster.Name, err) + continue + } + podLister := sciLister.(listv1.PodLister) + pod, err := podLister.Pods(p.namespaceSpecified).Get(name) if err != nil { if !errors.IsNotFound(err) { klog.Errorf("Failed to get pod from clsuster(%s) in namespace(%s): %v", cluster.Name, p.namespaceSpecified, err) @@ -466,13 +494,7 @@ func (p *PodLister) Get(name string) (runtime.Object, error) { err := fmt.Errorf("the pod(%s) found in more than one clusters", name) return nil, errors.NewConflict(PodsGVR.GroupResource(), name, err) } - podTyped := &corev1.Pod{} - err = helper.ConvertToTypedObject(pod, podTyped) - if err != nil { - klog.Errorf("Failed to convert to typed object: %v", err) - return nil, err - } - podPartial = p.convertToPodPartialData(podTyped, "", false) + podPartial = p.convertToPodPartialData(pod, "", false) } if podPartial != nil { @@ -497,11 +519,11 @@ func (p *PodLister) ByNamespace(namespace string) cache.GenericNamespaceLister { // NodeLister is an internal lister for nodes type NodeLister struct { clusterLister clusterlister.ClusterLister - informerManager genericmanager.MultiClusterInformerManager + informerManager typedmanager.MultiClusterInformerManager } // NewNodeLister creates an internal new NodeLister -func NewNodeLister(clusterLister clusterlister.ClusterLister, informerManager genericmanager.MultiClusterInformerManager) *NodeLister { +func NewNodeLister(clusterLister clusterlister.ClusterLister, informerManager typedmanager.MultiClusterInformerManager) *NodeLister { return &NodeLister{ clusterLister: clusterLister, informerManager: informerManager, @@ -523,22 +545,21 @@ func (n *NodeLister) List(selector labels.Selector) (ret []*corev1.Node, err err klog.Errorf("Failed to get SingleClusterInformerManager for cluster(%s)", cluster.Name) continue } - nodes, err := sci.Lister(NodesGVR).List(selector) + nodeInterface, err := sci.Lister(NodesGVR) + if err != nil { + klog.Errorf("Failed to get lister for cluster(%s): %v", cluster.Name, err) + continue + } + nodes, err := nodeInterface.(listv1.NodeLister).List(selector) if err != nil { klog.Errorf("Failed to list nodes from cluster(%s): %v", cluster.Name, err) return nil, err } for index := range nodes { - nodeTyped := &corev1.Node{} - err = helper.ConvertToTypedObject(nodes[index], nodeTyped) - if err != nil { - klog.Errorf("Failed to convert to typed object: %v", err) - return nil, err - } + nodeTyped := nodes[index] if nodeTyped.Annotations == nil { nodeTyped.Annotations = map[string]string{} } - // If user sets this annotation, we need to reset it. nodeTyped.Annotations[labelSelectorAnnotationInternal] = selector.String() ret = append(ret, nodeTyped) @@ -564,7 +585,12 @@ func (n *NodeLister) Get(name string) (*corev1.Node, error) { klog.Errorf("Failed to get SingleClusterInformerManager for cluster(%s)", cluster.Name) continue } - node, err := sci.Lister(NodesGVR).Get(name) + sciLister, err := sci.Lister(NodesGVR) + if err != nil { + klog.Errorf("Failed to get lister for cluster(%s): %v", cluster.Name, err) + continue + } + node, err := sciLister.(listv1.NodeLister).Get(name) if err != nil { if !errors.IsNotFound(err) { klog.Errorf("Failed to get node from cluster(%s):%v", cluster.Name, err) @@ -577,16 +603,10 @@ func (n *NodeLister) Get(name string) (*corev1.Node, error) { return nil, errors.NewConflict(NodesGVR.GroupResource(), name, err) } - nodeTyped = &corev1.Node{} - err = helper.ConvertToTypedObject(node, nodeTyped) - if err != nil { - klog.Errorf("Failed to convert to typed object: %v", err) - return nil, err - } + nodeTyped = node if nodeTyped.Annotations == nil { nodeTyped.Annotations = map[string]string{} } - // If user sets this annotation, we need to remove it to avoid parsing wrong next. delete(nodeTyped.Annotations, labelSelectorAnnotationInternal) }