Merge pull request #2402 from Poor12/2367

Make changes to cluster-status-controller to adopt cluster resource models
karmada-bot 2022-08-25 10:18:52 +08:00 committed by GitHub
commit bceea1635c
1 changed file with 108 additions and 35 deletions


@@ -31,6 +31,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/modeling"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
@@ -182,21 +184,6 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
c.initLeaseController(cluster)
}
clusterVersion, err := getKubernetesVersion(clusterClient)
if err != nil {
klog.Errorf("Failed to get Kubernetes version for Cluster %s. Error: %v.", cluster.GetName(), err)
}
currentClusterStatus.KubernetesVersion = clusterVersion
// get the list of APIs installed in the member cluster
apiEnables, err := getAPIEnablements(clusterClient)
if len(apiEnables) == 0 {
klog.Errorf("Failed to get any APIs installed in Cluster %s. Error: %v.", cluster.GetName(), err)
} else if err != nil {
klog.Warningf("Maybe get partial(%d) APIs installed in Cluster %s. Error: %v.", len(apiEnables), cluster.GetName(), err)
}
currentClusterStatus.APIEnablements = apiEnables
// The generic informer manager is actually used by 'execution-controller' and 'work-status-controller'.
// TODO(@RainbowMango): We should follow the principle that whoever uses it takes responsibility for initializing it.
// That means moving this logic into both `execution-controller` and `work-status-controller`.
@@ -204,26 +191,9 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
// can be safely removed from current controller.
c.initializeGenericInformerManagerForCluster(clusterClient)
if c.EnableClusterResourceModeling {
// get or create informer for pods and nodes in member cluster
clusterInformerManager, err := c.buildInformerForCluster(clusterClient)
if err != nil {
klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
// In large-scale clusters, building the informer may time out.
// If the informer manager cannot be built, return the error; continuing could cause a nil pointer dereference below.
return controllerruntime.Result{Requeue: true}, err
}
nodes, err := listNodes(clusterInformerManager)
if err != nil {
klog.Errorf("Failed to list nodes for Cluster %s. Error: %v.", cluster.GetName(), err)
}
pods, err := listPods(clusterInformerManager)
if err != nil {
klog.Errorf("Failed to list pods for Cluster %s. Error: %v.", cluster.GetName(), err)
}
currentClusterStatus.NodeSummary = getNodeSummary(nodes)
currentClusterStatus.ResourceSummary = getResourceSummary(nodes, pods)
err := c.setCurrentClusterStatus(clusterClient, cluster, &currentClusterStatus)
if err != nil {
return controllerruntime.Result{Requeue: true}, err
}
}
@@ -233,6 +203,50 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
return c.updateStatusIfNeeded(cluster, currentClusterStatus)
}
func (c *ClusterStatusController) setCurrentClusterStatus(clusterClient *util.ClusterClient, cluster *clusterv1alpha1.Cluster, currentClusterStatus *clusterv1alpha1.ClusterStatus) error {
clusterVersion, err := getKubernetesVersion(clusterClient)
if err != nil {
klog.Errorf("Failed to get Kubernetes version for Cluster %s. Error: %v.", cluster.GetName(), err)
}
currentClusterStatus.KubernetesVersion = clusterVersion
// get the list of APIs installed in the member cluster
apiEnables, err := getAPIEnablements(clusterClient)
if len(apiEnables) == 0 {
klog.Errorf("Failed to get any APIs installed in Cluster %s. Error: %v.", cluster.GetName(), err)
} else if err != nil {
klog.Warningf("Maybe get partial(%d) APIs installed in Cluster %s. Error: %v.", len(apiEnables), cluster.GetName(), err)
}
currentClusterStatus.APIEnablements = apiEnables
if c.EnableClusterResourceModeling {
// get or create informer for pods and nodes in member cluster
clusterInformerManager, err := c.buildInformerForCluster(clusterClient)
if err != nil {
klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
// In large-scale clusters, building the informer may time out.
// If the informer manager cannot be built, return the error; continuing could cause a nil pointer dereference below.
return err
}
nodes, err := listNodes(clusterInformerManager)
if err != nil {
klog.Errorf("Failed to list nodes for Cluster %s. Error: %v.", cluster.GetName(), err)
}
pods, err := listPods(clusterInformerManager)
if err != nil {
klog.Errorf("Failed to list pods for Cluster %s. Error: %v.", cluster.GetName(), err)
}
currentClusterStatus.NodeSummary = getNodeSummary(nodes)
currentClusterStatus.ResourceSummary = getResourceSummary(nodes, pods)
if features.FeatureGate.Enabled(features.CustomizedClusterResourceModeling) {
currentClusterStatus.ResourceSummary.AllocatableModelings = getAllocatableModelings(cluster, nodes, pods)
}
}
return nil
}
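
For readers skimming the diff: the fields populated by setCurrentClusterStatus are gated at two levels, the controller's EnableClusterResourceModeling switch and the CustomizedClusterResourceModeling feature gate. The standalone sketch below only summarizes that decision; the helper name and the string labels are illustrative and not part of this change.

package main

import "fmt"

// modeledStatusFields is illustrative only: it lists which ClusterStatus fields
// setCurrentClusterStatus fills in under each switch. Not part of the PR.
func modeledStatusFields(enableClusterResourceModeling, customizedModelingGate bool) []string {
	fields := []string{"KubernetesVersion", "APIEnablements"} // always collected
	if enableClusterResourceModeling {
		fields = append(fields, "NodeSummary", "ResourceSummary")
		if customizedModelingGate {
			// Also requires spec.resourceModels to be declared on the Cluster.
			fields = append(fields, "ResourceSummary.AllocatableModelings")
		}
	}
	return fields
}

func main() {
	fmt.Println(modeledStatusFields(true, true))
}
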
func (c *ClusterStatusController) setStatusCollectionFailedCondition(cluster *clusterv1alpha1.Cluster, currentClusterStatus clusterv1alpha1.ClusterStatus, message string) (controllerruntime.Result, error) {
readyCondition := util.NewCondition(clusterv1alpha1.ClusterConditionReady, statusCollectionFailed, message, metav1.ConditionFalse)
setTransitionTime(cluster.Status.Conditions, &readyCondition)
@@ -554,3 +568,62 @@ func getAllocatedResource(podList []*corev1.Pod) corev1.ResourceList {
allocated.AddResourcePods(podNum)
return allocated.ResourceList()
}
func getNodeAvailable(allocatable corev1.ResourceList, podResources *util.Resource) corev1.ResourceList {
allocatedResourceList := podResources.ResourceList()
allowedPodNumber := allocatable.Pods().Value() - allocatedResourceList.Pods().Value()
// When too many pods have been created, scheduling fails and the number of pods occupying the node's capacity may be huge.
// If allowedPodNumber is less than or equal to 0, we don't allow more pods to be created.
if allowedPodNumber <= 0 {
klog.Warningf("The number of schedulable Pods on the node is less than or equal to 0, we won't add the node to cluster resource models.")
return nil
}
for allocatedName, allocatedQuantity := range allocatedResourceList {
if allocatableQuantity, ok := allocatable[allocatedName]; ok {
allocatableQuantity.Sub(allocatedQuantity)
allocatable[allocatedName] = allocatableQuantity
}
}
return allocatable
}
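
To make the arithmetic in getNodeAvailable concrete, here is a self-contained sketch of the same "available = allocatable minus requested" computation on a ResourceList. The quantities are illustrative, and the snippet mirrors, rather than calls, the function above.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Illustrative node allocatable and aggregated pod requests.
	allocatable := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("4"),
		corev1.ResourceMemory: resource.MustParse("8Gi"),
		corev1.ResourcePods:   resource.MustParse("110"),
	}
	requested := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("1500m"),
		corev1.ResourceMemory: resource.MustParse("3Gi"),
		corev1.ResourcePods:   resource.MustParse("30"),
	}
	// Same guard as getNodeAvailable: a node with no free pod slots is skipped.
	if allocatable.Pods().Value()-requested.Pods().Value() <= 0 {
		fmt.Println("no schedulable pod slots left on this node")
		return
	}
	// Subtract each requested quantity from the matching allocatable quantity.
	for name, req := range requested {
		if avail, ok := allocatable[name]; ok {
			avail.Sub(req)
			allocatable[name] = avail
		}
	}
	// Prints: cpu=2500m memory=5Gi pods=80
	fmt.Printf("cpu=%s memory=%s pods=%s\n", allocatable.Cpu(), allocatable.Memory(), allocatable.Pods())
}
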
func getAllocatableModelings(cluster *clusterv1alpha1.Cluster, nodes []*corev1.Node, pods []*corev1.Pod) []clusterv1alpha1.AllocatableModeling {
if len(cluster.Spec.ResourceModels) == 0 {
return nil
}
modelingSummary, err := modeling.InitSummary(cluster.Spec.ResourceModels)
if err != nil {
klog.Errorf("Failed to init cluster summary from cluster resource models for Cluster %s. Error: %v.", cluster.GetName(), err)
return nil
}
nodePodResourcesMap := make(map[string]*util.Resource)
for _, pod := range pods {
// When a pod's phase is Succeeded or Failed, kube-scheduler does not count its resource usage.
if len(pod.Spec.NodeName) != 0 && pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed {
if nodePodResourcesMap[pod.Spec.NodeName] == nil {
nodePodResourcesMap[pod.Spec.NodeName] = util.EmptyResource()
}
nodePodResourcesMap[pod.Spec.NodeName].AddPodRequest(&pod.Spec)
nodePodResourcesMap[pod.Spec.NodeName].AddResourcePods(1)
}
}
for _, node := range nodes {
nodeAvailable := getNodeAvailable(node.Status.Allocatable.DeepCopy(), nodePodResourcesMap[node.Name])
if nodeAvailable == nil {
// Skip only this node; breaking here would drop all remaining nodes from the summary.
continue
}
modelingSummary.AddToResourceSummary(modeling.NewClusterResourceNode(nodeAvailable))
}
m := make([]clusterv1alpha1.AllocatableModeling, len(modelingSummary))
for index, resourceModel := range modelingSummary {
m[index].Grade = cluster.Spec.ResourceModels[index].Grade
m[index].Count = resourceModel.Quantity
}
return m
}
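
As a rough illustration of what getAllocatableModelings produces: each node's remaining allocatable resources are matched against the grade boundaries declared in the Cluster's spec.resourceModels, yielding one count per grade. The sketch below is a simplified, single-resource stand-in for the pkg/modeling summary; the gradeBoundary type and the CPU-only matching are assumptions for demonstration, not the package's real algorithm.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// gradeBoundary is a hypothetical, single-resource stand-in for one resource model grade.
type gradeBoundary struct {
	grade  uint
	minCPU resource.Quantity // inclusive lower bound
	maxCPU resource.Quantity // exclusive upper bound
}

func main() {
	grades := []gradeBoundary{
		{grade: 0, minCPU: resource.MustParse("0"), maxCPU: resource.MustParse("2")},
		{grade: 1, minCPU: resource.MustParse("2"), maxCPU: resource.MustParse("4")},
		{grade: 2, minCPU: resource.MustParse("4"), maxCPU: resource.MustParse("8")},
	}
	// Remaining allocatable CPU per node, e.g. as computed by getNodeAvailable.
	nodeAvailableCPU := []resource.Quantity{
		resource.MustParse("2500m"),
		resource.MustParse("6"),
		resource.MustParse("1"),
	}
	counts := make(map[uint]int64)
	for _, cpu := range nodeAvailableCPU {
		for _, g := range grades {
			if cpu.Cmp(g.minCPU) >= 0 && cpu.Cmp(g.maxCPU) < 0 {
				counts[g.grade]++
				break
			}
		}
	}
	// Corresponds to the AllocatableModeling{Grade, Count} entries in the cluster status.
	for _, g := range grades {
		fmt.Printf("grade %d: %d node(s)\n", g.grade, counts[g.grade])
	}
}
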