diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go
index 86e30c52e..cd9043818 100644
--- a/cmd/agent/app/agent.go
+++ b/cmd/agent/app/agent.go
@@ -106,7 +106,10 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
 		Client:                       mgr.GetClient(),
 		EventRecorder:                mgr.GetEventRecorderFor(status.ControllerName),
 		PredicateFunc:                predicateFun,
+		InformerManager:              informermanager.NewMultiClusterInformerManager(),
+		StopChan:                     stopChan,
 		ClusterClientSetFunc:         util.NewClusterClientSetForAgent,
+		ClusterDynamicClientSetFunc:  util.NewClusterDynamicClientSetForAgent,
 		ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
 	}
 	if err := clusterStatusController.SetupWithManager(mgr); err != nil {
diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go
index 6290727ce..6da1747ba 100644
--- a/cmd/controller-manager/app/controllermanager.go
+++ b/cmd/controller-manager/app/controllermanager.go
@@ -146,7 +146,10 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
 		Client:                       mgr.GetClient(),
 		EventRecorder:                mgr.GetEventRecorderFor(status.ControllerName),
 		PredicateFunc:                clusterPredicateFunc,
+		InformerManager:              informermanager.NewMultiClusterInformerManager(),
+		StopChan:                     stopChan,
 		ClusterClientSetFunc:         util.NewClusterClientSet,
+		ClusterDynamicClientSetFunc:  util.NewClusterDynamicClientSet,
 		ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
 	}
 	if err := clusterStatusController.SetupWithManager(mgr); err != nil {
diff --git a/pkg/controllers/status/cluster_status_controller.go b/pkg/controllers/status/cluster_status_controller.go
index b318f911a..d43b2bab8 100644
--- a/pkg/controllers/status/cluster_status_controller.go
+++ b/pkg/controllers/status/cluster_status_controller.go
@@ -2,6 +2,7 @@ package status
 
 import (
 	"context"
+	"fmt"
 	"net/http"
 	"time"
 
@@ -10,6 +11,10 @@
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/record"
@@ -21,6 +26,7 @@
 
 	"github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 	"github.com/karmada-io/karmada/pkg/util"
+	"github.com/karmada-io/karmada/pkg/util/informermanager"
 )
 
 const (
@@ -38,12 +44,20 @@
 	clusterStatusRetryTimeout = 2 * time.Second
 )
 
+var (
+	nodeGVR = corev1.SchemeGroupVersion.WithResource("nodes")
+	podGVR  = corev1.SchemeGroupVersion.WithResource("pods")
+)
+
 // ClusterStatusController is to sync status of Cluster.
 type ClusterStatusController struct {
-	client.Client        // used to operate Cluster resources.
-	EventRecorder        record.EventRecorder
-	PredicateFunc        predicate.Predicate
-	ClusterClientSetFunc func(c *v1alpha1.Cluster, client client.Client) (*util.ClusterClient, error)
+	client.Client               // used to operate Cluster resources.
+	EventRecorder               record.EventRecorder
+	PredicateFunc               predicate.Predicate
+	InformerManager             informermanager.MultiClusterInformerManager
+	StopChan                    <-chan struct{}
+	ClusterClientSetFunc        func(c *v1alpha1.Cluster, client client.Client) (*util.ClusterClient, error)
+	ClusterDynamicClientSetFunc func(c *v1alpha1.Cluster, client client.Client) (*util.DynamicClusterClient, error)
 
 	// ClusterStatusUpdateFrequency is the frequency that controller computes cluster status.
 	// If cluster lease feature is not enabled, it is also the frequency that controller posts cluster status
@@ -59,8 +73,9 @@ func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerr
 	cluster := &v1alpha1.Cluster{}
 	if err := c.Client.Get(context.TODO(), req.NamespacedName, cluster); err != nil {
-		// The resource may no longer exist, in which case we stop processing.
+		// The resource may no longer exist, in which case we stop the informer.
 		if errors.IsNotFound(err) {
+			// TODO(Garrybest): stop the informer and delete the cluster manager
 			return controllerruntime.Result{}, nil
 		}
 
@@ -94,6 +109,13 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *v1alpha1.Cluster) (
 		return controllerruntime.Result{Requeue: true}, err
 	}
 
+	// get or create listers for pods and nodes in the member cluster
+	clusterListerManager, err := c.buildInformerForCluster(cluster)
+	if err != nil {
+		klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
+		return controllerruntime.Result{Requeue: true}, err
+	}
+
 	var currentClusterStatus = v1alpha1.ClusterStatus{}
 
 	// get the health status of member cluster
@@ -132,7 +154,7 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *v1alpha1.Cluster) (
 	}
 
 	// get the summary of nodes status in the member cluster
-	nodeSummary, err := getNodeSummary(clusterClient)
+	nodeSummary, err := getNodeSummary(clusterListerManager)
 	if err != nil {
 		klog.Errorf("Failed to get summary of nodes status in the member cluster: %v, err is : %v", cluster.Name, err)
 		return controllerruntime.Result{Requeue: true}, err
@@ -162,6 +184,46 @@ func (c *ClusterStatusController) updateStatusIfNeeded(cluster *v1alpha1.Cluster
 	return controllerruntime.Result{RequeueAfter: c.ClusterStatusUpdateFrequency.Duration}, nil
 }
 
+// buildInformerForCluster builds an informer manager for the given cluster if one doesn't exist yet, then constructs
+// informers for node and pod and starts them. If the informer manager already exists, it is returned directly.
+func (c *ClusterStatusController) buildInformerForCluster(cluster *v1alpha1.Cluster) (informermanager.SingleClusterInformerManager, error) {
+	singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
+	if singleClusterInformerManager != nil {
+		return singleClusterInformerManager, nil
+	}
+
+	clusterClient, err := c.ClusterDynamicClientSetFunc(cluster, c.Client)
+	if err != nil {
+		klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
+		return nil, err
+	}
+	singleClusterInformerManager = c.InformerManager.ForCluster(clusterClient.ClusterName, clusterClient.DynamicClientSet, 0)
+
+	gvrs := []schema.GroupVersionResource{
+		nodeGVR,
+		podGVR,
+	}
+
+	// create the informer for pods and nodes
+	for _, gvr := range gvrs {
+		singleClusterInformerManager.Lister(gvr)
+	}
+
+	c.InformerManager.Start(cluster.Name, c.StopChan)
+	synced := c.InformerManager.WaitForCacheSync(cluster.Name, c.StopChan)
+	if synced == nil {
+		klog.Errorf("The informer factory for cluster(%s) does not exist.", cluster.Name)
+		return nil, fmt.Errorf("informer factory for cluster(%s) does not exist", cluster.Name)
+	}
+	for _, gvr := range gvrs {
+		if !synced[gvr] {
+			klog.Errorf("Informer for %s hasn't synced.", gvr)
+			return nil, fmt.Errorf("informer for %s hasn't synced", gvr)
+		}
+	}
+	return singleClusterInformerManager, nil
+}
+
 func getClusterHealthStatus(clusterClient *util.ClusterClient) (online, healthy bool) {
 	healthStatus, err := healthEndpointCheck(clusterClient.KubeClient, "/readyz")
 	if err != nil && healthStatus == http.StatusNotFound {
@@ -268,30 +330,41 @@ func getAPIEnablements(clusterClient *util.ClusterClient) ([]v1alpha1.APIEnablem
 	return apiEnablements, nil
 }
 
-func getNodeSummary(clusterClient *util.ClusterClient) (v1alpha1.NodeSummary, error) {
+func getNodeSummary(informerManager informermanager.SingleClusterInformerManager) (v1alpha1.NodeSummary, error) {
 	var nodeSummary = v1alpha1.NodeSummary{}
-	nodeList, err := clusterClient.KubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
+	nodeLister, podLister := informerManager.Lister(nodeGVR), informerManager.Lister(podGVR)
+
+	nodeList, err := nodeLister.List(labels.Everything())
 	if err != nil {
 		return nodeSummary, err
 	}
-	totalNum := len(nodeList.Items)
+	nodes, err := convertObjectsToNodes(nodeList)
+	if err != nil {
+		return nodeSummary, err
+	}
+
+	totalNum := len(nodes)
 	readyNum := 0
 
-	for _, node := range nodeList.Items {
+	for _, node := range nodes {
 		if getReadyStatusForNode(node.Status) {
 			readyNum++
 		}
 	}
+	allocatable := getClusterAllocatable(nodes)
 
-	allocatable := getClusterAllocatable(nodeList)
-
-	podList, err := clusterClient.KubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
+	podList, err := podLister.List(labels.Everything())
 	if err != nil {
 		return nodeSummary, err
 	}
-	usedResource := getUsedResource(podList)
+	pods, err := convertObjectsToPods(podList)
+	if err != nil {
+		return nodeSummary, err
+	}
+
+	usedResource := getUsedResource(pods)
 
 	nodeSummary.TotalNum = totalNum
 	nodeSummary.ReadyNum = readyNum
@@ -301,6 +374,32 @@ func getNodeSummary(clusterClient *util.ClusterClient) (v1alpha1.NodeSummary, er
 	return nodeSummary, nil
 }
 
+func convertObjectsToNodes(nodeList []runtime.Object) ([]*corev1.Node, error) {
+	nodes := make([]*corev1.Node, 0, len(nodeList))
+	for _, obj := range nodeList {
+		unstructObj := obj.(*unstructured.Unstructured)
+		node := &corev1.Node{}
+		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), node); err != nil {
+			return nil, fmt.Errorf("failed to convert unstructured to typed object: %v", err)
+		}
+		nodes = append(nodes, node)
+	}
+	return nodes, nil
+}
+
+func convertObjectsToPods(podList []runtime.Object) ([]*corev1.Pod, error) {
+	pods := make([]*corev1.Pod, 0, len(podList))
+	for _, obj := range podList {
+		unstructObj := obj.(*unstructured.Unstructured)
+		pod := &corev1.Pod{}
+		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), pod); err != nil {
+			return nil, fmt.Errorf("failed to convert unstructured to typed object: %v", err)
+		}
+		pods = append(pods, pod)
+	}
+	return pods, nil
+}
+
 func getReadyStatusForNode(nodeStatus corev1.NodeStatus) bool {
 	for _, condition := range nodeStatus.Conditions {
 		if condition.Type == "Ready" {
@@ -312,9 +411,9 @@ func getReadyStatusForNode(nodeStatus corev1.NodeStatus) bool {
 	return false
 }
 
-func getClusterAllocatable(nodeList *corev1.NodeList) (allocatable corev1.ResourceList) {
+func getClusterAllocatable(nodeList []*corev1.Node) (allocatable corev1.ResourceList) {
 	allocatable = make(corev1.ResourceList)
-	for _, node := range nodeList.Items {
+	for _, node := range nodeList {
 		for key, val := range node.Status.Allocatable {
 			tmpCap, ok := allocatable[key]
 			if ok {
@@ -329,13 +428,13 @@ func getClusterAllocatable(nodeList *corev1.NodeList) (allocatable corev1.Resour
 	return allocatable
 }
 
-func getUsedResource(podList *corev1.PodList) corev1.ResourceList {
+func getUsedResource(podList []*corev1.Pod) corev1.ResourceList {
 	var requestCPU, requestMem int64
-	for podIndex, pod := range podList.Items {
+	for _, pod := range podList {
 		if pod.Status.Phase == "Running" {
 			for _, c := range pod.Status.Conditions {
 				if c.Type == "Ready" && c.Status == "True" {
-					podRes := addPodRequestResource(&podList.Items[podIndex])
+					podRes := addPodRequestResource(pod)
 					requestCPU += podRes.MilliCPU
 					requestMem += podRes.Memory
 				}
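
The informermanager package that this change wires in is not itself part of the diff. Assuming it is a thin wrapper over client-go's dynamicinformer factory (which exposes the same register-listers-then-Start-then-WaitForCacheSync flow that buildInformerForCluster follows), the underlying mechanics can be sketched standalone with plain client-go. The kubeconfig path below is a placeholder, and the GVR matches the patch's nodeGVR.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a dynamic client for one member cluster; the kubeconfig path is a placeholder.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/member-kubeconfig")
	if err != nil {
		panic(err)
	}
	client, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	stopCh := make(chan struct{})
	defer close(stopCh)

	nodeGVR := schema.GroupVersionResource{Version: "v1", Resource: "nodes"}

	// Resync period 0 matches the patch: the cache is maintained by watch events only.
	factory := dynamicinformer.NewDynamicSharedInformerFactory(client, 0)

	// Requesting the lister is what registers the informer; it must happen before
	// Start, otherwise the factory has nothing to run.
	nodeLister := factory.ForResource(nodeGVR).Lister()

	factory.Start(stopCh)
	for gvr, ok := range factory.WaitForCacheSync(stopCh) {
		if !ok {
			panic(fmt.Sprintf("informer for %s failed to sync", gvr))
		}
	}

	// From here on, reads are served from the local cache instead of issuing
	// a LIST against the member cluster's API server on every call.
	objs, err := nodeLister.List(labels.Everything())
	if err != nil {
		panic(err)
	}
	fmt.Printf("cached %d nodes\n", len(objs))
}

This ordering is also why syncClusterStatus pays the client construction and initial cache sync cost only on the first reconcile of each cluster: GetSingleClusterManager returns the cached manager on every later pass.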
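
convertObjectsToNodes and convertObjectsToPods exist because a dynamic lister returns []runtime.Object backed by *unstructured.Unstructured, while the summary math needs typed corev1 objects. The conversion call they use, runtime.DefaultUnstructuredConverter.FromUnstructured, can be exercised in isolation; the node below is hand-built with made-up values purely for illustration.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
)

func main() {
	// An unstructured node, shaped like what a dynamic lister would return.
	u := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "Node",
		"metadata":   map[string]interface{}{"name": "member-node-1"},
		"status": map[string]interface{}{
			"allocatable": map[string]interface{}{
				"cpu":    "4",
				"memory": "8Gi",
			},
		},
	}}

	node := &corev1.Node{}
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.UnstructuredContent(), node); err != nil {
		panic(err)
	}

	// Quantity fields are decoded from their string forms, so the allocatable
	// math in getClusterAllocatable operates on real resource.Quantity values.
	fmt.Println(node.Name, node.Status.Allocatable.Cpu().MilliValue()) // member-node-1 4000
}

One design note: the helpers use an unchecked type assertion, obj.(*unstructured.Unstructured), which would panic if anything other than an unstructured object ever appeared in the cache; a comma-ok assertion with an error return would be the defensive variant.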
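
getClusterAllocatable accumulates per-node allocatable with tmpCap.Add(val) because resource.Quantity cannot be combined with ordinary arithmetic operators, and nodes report quantities in mixed formats (whole cores, millicores, binary-suffixed memory) that Add normalizes. A minimal check of that behavior, with invented per-node values:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Per-node CPU allocatable values in the mixed formats nodes actually report.
	perNode := []string{"4", "8", "500m"}

	total := resource.MustParse("0")
	for _, s := range perNode {
		q := resource.MustParse(s)
		total.Add(q) // Add mutates the receiver; Quantity supports no + operator.
	}
	fmt.Println(total.String()) // 12500m
}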