Merge pull request #365 from Garrybest/cluster-status-controller

using cached lister to get node summary
This commit is contained in:
karmada-bot 2021-06-03 14:51:41 +08:00 committed by GitHub
commit 2233ab438c
3 changed files with 124 additions and 19 deletions
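The gist of the change: getNodeSummary no longer issues List calls against the member cluster's API server on every status sync. The cluster status controller now builds a per-cluster informer manager, registers node and pod informers, waits for their caches to sync, and then reads from cached listers, converting the unstructured objects into typed ones. A minimal sketch of that pattern for nodes follows; the helper name listNodesFromCache is illustrative rather than part of the PR, and the dynamic client is assumed to satisfy dynamic.Interface (matching the DynamicClientSet used in the diff).

// Sketch only: list nodes from a per-cluster informer cache instead of the
// member cluster's API server. Calls mirror the informermanager usage in the
// diff below; error handling and the surrounding controller wiring are trimmed.
package clusterstatus

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/dynamic"

	"github.com/karmada-io/karmada/pkg/util/informermanager"
)

func listNodesFromCache(mgr informermanager.MultiClusterInformerManager, clusterName string,
	dynamicClient dynamic.Interface, stopCh <-chan struct{}) ([]*corev1.Node, error) {
	nodeGVR := corev1.SchemeGroupVersion.WithResource("nodes")

	// Reuse the informer manager for this cluster if one was already built.
	single := mgr.GetSingleClusterManager(clusterName)
	if single == nil {
		single = mgr.ForCluster(clusterName, dynamicClient, 0)
		single.Lister(nodeGVR) // register the node informer before starting
		mgr.Start(clusterName, stopCh)
		synced := mgr.WaitForCacheSync(clusterName, stopCh)
		if synced == nil || !synced[nodeGVR] {
			return nil, fmt.Errorf("node informer for cluster %s has not synced", clusterName)
		}
	}

	// Read from the local cache; no round trip to the member cluster.
	objs, err := single.Lister(nodeGVR).List(labels.Everything())
	if err != nil {
		return nil, err
	}

	// The dynamic lister returns unstructured objects; convert them to typed nodes.
	nodes := make([]*corev1.Node, 0, len(objs))
	for _, obj := range objs {
		node := &corev1.Node{}
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(
			obj.(*unstructured.Unstructured).UnstructuredContent(), node); err != nil {
			return nil, err
		}
		nodes = append(nodes, node)
	}
	return nodes, nil
}

The same conversion step is what the new convertObjectsToNodes and convertObjectsToPods helpers in the diff perform for the real controller.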

View File

@ -106,7 +106,10 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(status.ControllerName),
PredicateFunc: predicateFun,
InformerManager: informermanager.NewMultiClusterInformerManager(),
StopChan: stopChan,
ClusterClientSetFunc: util.NewClusterClientSetForAgent,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
}
if err := clusterStatusController.SetupWithManager(mgr); err != nil {

View File

@ -146,7 +146,10 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(status.ControllerName),
PredicateFunc: clusterPredicateFunc,
InformerManager: informermanager.NewMultiClusterInformerManager(),
StopChan: stopChan,
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
}
if err := clusterStatusController.SetupWithManager(mgr); err != nil {

View File

@ -2,6 +2,7 @@ package status
import (
"context"
"fmt"
"net/http"
"time"
@ -10,6 +11,10 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
@ -21,6 +26,7 @@ import (
"github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/informermanager"
)
const (
@ -38,12 +44,20 @@ const (
clusterStatusRetryTimeout = 2 * time.Second
)
var (
nodeGVR = corev1.SchemeGroupVersion.WithResource("nodes")
podGVR = corev1.SchemeGroupVersion.WithResource("pods")
)
// ClusterStatusController is to sync status of Cluster.
type ClusterStatusController struct {
client.Client // used to operate Cluster resources.
EventRecorder record.EventRecorder
PredicateFunc predicate.Predicate
ClusterClientSetFunc func(c *v1alpha1.Cluster, client client.Client) (*util.ClusterClient, error)
client.Client // used to operate Cluster resources.
EventRecorder record.EventRecorder
PredicateFunc predicate.Predicate
InformerManager informermanager.MultiClusterInformerManager
StopChan <-chan struct{}
ClusterClientSetFunc func(c *v1alpha1.Cluster, client client.Client) (*util.ClusterClient, error)
ClusterDynamicClientSetFunc func(c *v1alpha1.Cluster, client client.Client) (*util.DynamicClusterClient, error)
// ClusterStatusUpdateFrequency is the frequency that controller computes cluster status.
// If cluster lease feature is not enabled, it is also the frequency that controller posts cluster status
@ -59,8 +73,9 @@ func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerr
cluster := &v1alpha1.Cluster{}
if err := c.Client.Get(context.TODO(), req.NamespacedName, cluster); err != nil {
// The resource may no longer exist, in which case we stop processing.
// The resource may no longer exist, in which case we stop the informer.
if errors.IsNotFound(err) {
// TODO(Garrybest): stop the informer and delete the cluster manager
return controllerruntime.Result{}, nil
}
@ -94,6 +109,13 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *v1alpha1.Cluster) (
return controllerruntime.Result{Requeue: true}, err
}
// get or create listers for pods and nodes in the member cluster
clusterListerManager, err := c.buildInformerForCluster(cluster)
if err != nil {
klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
return controllerruntime.Result{Requeue: true}, err
}
var currentClusterStatus = v1alpha1.ClusterStatus{}
// get the health status of member cluster
@ -132,7 +154,7 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *v1alpha1.Cluster) (
}
// get the summary of nodes status in the member cluster
nodeSummary, err := getNodeSummary(clusterClient)
nodeSummary, err := getNodeSummary(clusterListerManager)
if err != nil {
klog.Errorf("Failed to get summary of nodes status in the member cluster %v, err is: %v", cluster.Name, err)
return controllerruntime.Result{Requeue: true}, err
@ -162,6 +184,46 @@ func (c *ClusterStatusController) updateStatusIfNeeded(cluster *v1alpha1.Cluster
return controllerruntime.Result{RequeueAfter: c.ClusterStatusUpdateFrequency.Duration}, nil
}
// buildInformerForCluster builds an informer manager for the cluster if one doesn't exist yet, constructs informers
// for nodes and pods, and starts them. If the informer manager already exists, it is returned directly.
func (c *ClusterStatusController) buildInformerForCluster(cluster *v1alpha1.Cluster) (informermanager.SingleClusterInformerManager, error) {
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager != nil {
return singleClusterInformerManager, nil
}
clusterClient, err := c.ClusterDynamicClientSetFunc(cluster, c.Client)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return nil, err
}
singleClusterInformerManager = c.InformerManager.ForCluster(clusterClient.ClusterName, clusterClient.DynamicClientSet, 0)
gvrs := []schema.GroupVersionResource{
nodeGVR,
podGVR,
}
// create the informer for pods and nodes
for _, gvr := range gvrs {
singleClusterInformerManager.Lister(gvr)
}
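// start the informers registered above; WaitForCacheSync below blocks until their caches are populated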
c.InformerManager.Start(cluster.Name, c.StopChan)
synced := c.InformerManager.WaitForCacheSync(cluster.Name, c.StopChan)
if synced == nil {
klog.Errorf("The informer factory for cluster(%s) does not exist.", cluster.Name)
return nil, fmt.Errorf("informer factory for cluster(%s) does not exist", cluster.Name)
}
for _, gvr := range gvrs {
if !synced[gvr] {
klog.Errorf("Informer for %s hasn't synced.", gvr)
return nil, fmt.Errorf("informer for %s hasn't synced", gvr)
}
}
return singleClusterInformerManager, nil
}
func getClusterHealthStatus(clusterClient *util.ClusterClient) (online, healthy bool) {
healthStatus, err := healthEndpointCheck(clusterClient.KubeClient, "/readyz")
if err != nil && healthStatus == http.StatusNotFound {
@ -268,30 +330,41 @@ func getAPIEnablements(clusterClient *util.ClusterClient) ([]v1alpha1.APIEnablem
return apiEnablements, nil
}
func getNodeSummary(clusterClient *util.ClusterClient) (v1alpha1.NodeSummary, error) {
func getNodeSummary(informerManager informermanager.SingleClusterInformerManager) (v1alpha1.NodeSummary, error) {
var nodeSummary = v1alpha1.NodeSummary{}
nodeList, err := clusterClient.KubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
nodeLister, podLister := informerManager.Lister(nodeGVR), informerManager.Lister(podGVR)
nodeList, err := nodeLister.List(labels.Everything())
if err != nil {
return nodeSummary, err
}
totalNum := len(nodeList.Items)
nodes, err := convertObjectsToNodes(nodeList)
if err != nil {
return nodeSummary, err
}
totalNum := len(nodes)
readyNum := 0
for _, node := range nodeList.Items {
for _, node := range nodes {
if getReadyStatusForNode(node.Status) {
readyNum++
}
}
allocatable := getClusterAllocatable(nodes)
allocatable := getClusterAllocatable(nodeList)
podList, err := clusterClient.KubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
podList, err := podLister.List(labels.Everything())
if err != nil {
return nodeSummary, err
}
usedResource := getUsedResource(podList)
pods, err := convertObjectsToPods(podList)
if err != nil {
return nodeSummary, err
}
usedResource := getUsedResource(pods)
nodeSummary.TotalNum = totalNum
nodeSummary.ReadyNum = readyNum
@ -301,6 +374,32 @@ func getNodeSummary(clusterClient *util.ClusterClient) (v1alpha1.NodeSummary, er
return nodeSummary, nil
}
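// convertObjectsToNodes converts the unstructured objects returned by the node lister into typed corev1.Node objects.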
func convertObjectsToNodes(nodeList []runtime.Object) ([]*corev1.Node, error) {
nodes := make([]*corev1.Node, 0, len(nodeList))
for _, obj := range nodeList {
unstructObj := obj.(*unstructured.Unstructured)
node := &corev1.Node{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), node); err != nil {
return nil, fmt.Errorf("failed to convert unstructured to typed object: %v", err)
}
nodes = append(nodes, node)
}
return nodes, nil
}
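// convertObjectsToPods converts the unstructured objects returned by the pod lister into typed corev1.Pod objects.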
func convertObjectsToPods(podList []runtime.Object) ([]*corev1.Pod, error) {
pods := make([]*corev1.Pod, 0, len(podList))
for _, obj := range podList {
unstructObj := obj.(*unstructured.Unstructured)
pod := &corev1.Pod{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), pod); err != nil {
return nil, fmt.Errorf("failed to convert unstructured to typed object: %v", err)
}
pods = append(pods, pod)
}
return pods, nil
}
func getReadyStatusForNode(nodeStatus corev1.NodeStatus) bool {
for _, condition := range nodeStatus.Conditions {
if condition.Type == "Ready" {
@ -312,9 +411,9 @@ func getReadyStatusForNode(nodeStatus corev1.NodeStatus) bool {
return false
}
func getClusterAllocatable(nodeList *corev1.NodeList) (allocatable corev1.ResourceList) {
func getClusterAllocatable(nodeList []*corev1.Node) (allocatable corev1.ResourceList) {
allocatable = make(corev1.ResourceList)
for _, node := range nodeList.Items {
for _, node := range nodeList {
for key, val := range node.Status.Allocatable {
tmpCap, ok := allocatable[key]
if ok {
@ -329,13 +428,13 @@ func getClusterAllocatable(nodeList *corev1.NodeList) (allocatable corev1.Resour
return allocatable
}
func getUsedResource(podList *corev1.PodList) corev1.ResourceList {
func getUsedResource(podList []*corev1.Pod) corev1.ResourceList {
var requestCPU, requestMem int64
for podIndex, pod := range podList.Items {
for _, pod := range podList {
if pod.Status.Phase == "Running" {
for _, c := range pod.Status.Conditions {
if c.Type == "Ready" && c.Status == "True" {
podRes := addPodRequestResource(&podList.Items[podIndex])
podRes := addPodRequestResource(pod)
requestCPU += podRes.MilliCPU
requestMem += podRes.Memory
}