karmada/pkg/controllers/status/cluster_status_controller.go

package status
import (
"context"
"fmt"
"net/http"
"sort"
"strings"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
clientset "k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/component-helpers/apimachinery/lease"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/metrics"
"github.com/karmada-io/karmada/pkg/modeling"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/typedmanager"
"github.com/karmada-io/karmada/pkg/util/helper"
)
const (
// ControllerName is the controller name that will be used when reporting events.
ControllerName = "cluster-status-controller"
clusterReady = "ClusterReady"
clusterHealthy = "cluster is healthy and ready to accept workloads"
clusterNotReady = "ClusterNotReady"
clusterUnhealthy = "cluster is reachable but health endpoint responded without ok"
clusterNotReachableReason = "ClusterNotReachable"
clusterNotReachableMsg = "cluster is not reachable"
statusCollectionFailed = "StatusCollectionFailed"
)
var (
nodeGVR = corev1.SchemeGroupVersion.WithResource("nodes")
podGVR = corev1.SchemeGroupVersion.WithResource("pods")
)
// ClusterStatusController is responsible for syncing the status of member clusters.
type ClusterStatusController struct {
client.Client // used to operate Cluster resources.
KubeClient clientset.Interface
EventRecorder record.EventRecorder
PredicateFunc predicate.Predicate
TypedInformerManager typedmanager.MultiClusterInformerManager
GenericInformerManager genericmanager.MultiClusterInformerManager
StopChan <-chan struct{}
ClusterClientSetFunc func(string, client.Client, *util.ClientOption) (*util.ClusterClient, error)
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
// ClusterClientOption holds the attributes that should be injected to a Kubernetes client.
ClusterClientOption *util.ClientOption
// ClusterStatusUpdateFrequency is the frequency at which the controller computes and reports cluster status.
ClusterStatusUpdateFrequency metav1.Duration
// ClusterLeaseDuration is a duration that candidates for a lease need to wait to force acquire it.
// This is measured against the time of the last observed lease RenewTime.
ClusterLeaseDuration metav1.Duration
// ClusterLeaseRenewIntervalFraction is a fraction, coordinated with ClusterLeaseDuration, that
// determines how often the current holder of a lease renews it.
ClusterLeaseRenewIntervalFraction float64
// ClusterLeaseControllers stores the context cancel function for each lease controller.
// Each lease controller is started with a separate context.
// key: the name of the cluster that the lease controller serves.
// value: the context cancel function used to stop the controller after the cluster is unregistered.
ClusterLeaseControllers sync.Map
// ClusterSuccessThreshold is the duration of successes for the cluster to be considered healthy after recovery.
ClusterSuccessThreshold metav1.Duration
// ClusterFailureThreshold is the duration of failure for the cluster to be considered unhealthy.
ClusterFailureThreshold metav1.Duration
// clusterConditionCache stores the condition status of each cluster.
clusterConditionCache clusterConditionStore
ClusterCacheSyncTimeout metav1.Duration
RateLimiterOptions ratelimiterflag.Options
// EnableClusterResourceModeling indicates whether to enable cluster resource modeling.
// The resource modeling might be used by the scheduler to make scheduling decisions
// in scenarios of dynamic replica assignment based on cluster free resources.
// Disable it for better performance if it does not fit your cases.
EnableClusterResourceModeling bool
}
// Reconcile syncs status of the given member cluster.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will requeue the reconcile key after the duration.
func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Syncing cluster status: %s", req.NamespacedName.Name)
cluster := &clusterv1alpha1.Cluster{}
if err := c.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
// The resource may no longer exist, in which case we stop the informer.
if apierrors.IsNotFound(err) {
c.GenericInformerManager.Stop(req.NamespacedName.Name)
c.TypedInformerManager.Stop(req.NamespacedName.Name)
c.clusterConditionCache.delete(req.NamespacedName.Name)
// stop the lease controller after the cluster is gone.
// only used for clusters in Pull mode because there is no need to set up lease syncing for Push clusters.
canceller, exists := c.ClusterLeaseControllers.LoadAndDelete(req.NamespacedName.Name)
if exists {
if cf, ok := canceller.(context.CancelFunc); ok {
cf()
}
}
return controllerruntime.Result{}, nil
}
return controllerruntime.Result{Requeue: true}, err
}
// start syncing status only when the finalizer is present on the given Cluster to
// avoid conflicts with the cluster controller.
if !controllerutil.ContainsFinalizer(cluster, util.ClusterControllerFinalizer) {
klog.V(2).Infof("Waiting finalizer present for member cluster: %s", cluster.Name)
return controllerruntime.Result{Requeue: true}, nil
}
return c.syncClusterStatus(cluster)
}
// SetupWithManager creates a controller and registers it with the controller manager.
func (c *ClusterStatusController) SetupWithManager(mgr controllerruntime.Manager) error {
c.clusterConditionCache = clusterConditionStore{
successThreshold: c.ClusterSuccessThreshold.Duration,
failureThreshold: c.ClusterFailureThreshold.Duration,
}
return controllerruntime.NewControllerManagedBy(mgr).
For(&clusterv1alpha1.Cluster{}, builder.WithPredicates(c.PredicateFunc)).
WithOptions(controller.Options{
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions),
}).Complete(c)
}
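// syncClusterStatus collects the latest status of the member cluster, such as health state, Kubernetes version,
// API enablements and resource summary, and updates the Cluster object accordingly.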
func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Cluster) (controllerruntime.Result, error) {
start := time.Now()
defer func() {
metrics.RecordClusterStatus(cluster)
metrics.RecordClusterSyncStatusDuration(cluster, start)
}()
currentClusterStatus := *cluster.Status.DeepCopy()
// create a ClusterClient for the given member cluster
clusterClient, err := c.ClusterClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to create a ClusterClient for the given member cluster: %v, err is : %v", cluster.Name, err)
return c.setStatusCollectionFailedCondition(cluster, currentClusterStatus, fmt.Sprintf("failed to create a ClusterClient: %v", err))
}
online, healthy := getClusterHealthStatus(clusterClient)
observedReadyCondition := generateReadyCondition(online, healthy)
readyCondition := c.clusterConditionCache.thresholdAdjustedReadyCondition(cluster, &observedReadyCondition)
// cluster is offline after retry timeout, update cluster status immediately and return.
if !online && readyCondition.Status != metav1.ConditionTrue {
klog.V(2).Infof("Cluster(%s) still offline after %s, ensuring offline is set.",
cluster.Name, c.ClusterFailureThreshold.Duration)
meta.SetStatusCondition(&currentClusterStatus.Conditions, *readyCondition)
return c.updateStatusIfNeeded(cluster, currentClusterStatus)
}
// collect cluster status only when the cluster is online, healthy and its Ready condition is True
if online && healthy && readyCondition.Status == metav1.ConditionTrue {
if cluster.Spec.SyncMode == clusterv1alpha1.Pull {
// init the lease controller for pull mode clusters
c.initLeaseController(cluster)
}
// The generic informer manager is actually used by 'execution-controller' and 'work-status-controller'.
// TODO(@RainbowMango): We should follow the principle that whoever uses it takes the responsibility to initialize it.
// We should move this logic to both `execution-controller` and `work-status-controller`.
// After that, the 'initializeGenericInformerManagerForCluster' function as well as 'c.GenericInformerManager'
// can be safely removed from the current controller.
c.initializeGenericInformerManagerForCluster(clusterClient)
err := c.setCurrentClusterStatus(clusterClient, cluster, &currentClusterStatus)
if err != nil {
return controllerruntime.Result{Requeue: true}, err
}
}
meta.SetStatusCondition(&currentClusterStatus.Conditions, *readyCondition)
return c.updateStatusIfNeeded(cluster, currentClusterStatus)
}
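// setCurrentClusterStatus fills the given ClusterStatus with the Kubernetes version, the API enablements and,
// if cluster resource modeling is enabled, the node and resource summaries collected from the member cluster.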
func (c *ClusterStatusController) setCurrentClusterStatus(clusterClient *util.ClusterClient, cluster *clusterv1alpha1.Cluster, currentClusterStatus *clusterv1alpha1.ClusterStatus) error {
clusterVersion, err := getKubernetesVersion(clusterClient)
if err != nil {
klog.Errorf("Failed to get Kubernetes version for Cluster %s. Error: %v.", cluster.GetName(), err)
}
currentClusterStatus.KubernetesVersion = clusterVersion
// get the list of APIs installed in the member cluster
apiEnables, err := getAPIEnablements(clusterClient)
if len(apiEnables) == 0 {
klog.Errorf("Failed to get any APIs installed in Cluster %s. Error: %v.", cluster.GetName(), err)
} else if err != nil {
klog.Warningf("Maybe get partial(%d) APIs installed in Cluster %s. Error: %v.", len(apiEnables), cluster.GetName(), err)
}
currentClusterStatus.APIEnablements = apiEnables
if c.EnableClusterResourceModeling {
// get or create informer for pods and nodes in member cluster
clusterInformerManager, err := c.buildInformerForCluster(clusterClient)
if err != nil {
klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
// in large-scale clusters, the cache sync may time out.
// if the informer manager fails to be built, the error should be returned; otherwise, it may cause a nil pointer dereference later.
return err
}
nodes, err := listNodes(clusterInformerManager)
if err != nil {
klog.Errorf("Failed to list nodes for Cluster %s. Error: %v.", cluster.GetName(), err)
}
pods, err := listPods(clusterInformerManager)
if err != nil {
klog.Errorf("Failed to list pods for Cluster %s. Error: %v.", cluster.GetName(), err)
}
currentClusterStatus.NodeSummary = getNodeSummary(nodes)
currentClusterStatus.ResourceSummary = getResourceSummary(nodes, pods)
if features.FeatureGate.Enabled(features.CustomizedClusterResourceModeling) {
currentClusterStatus.ResourceSummary.AllocatableModelings = getAllocatableModelings(cluster, nodes, pods)
}
}
return nil
}
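// setStatusCollectionFailedCondition sets the Ready condition to False with the StatusCollectionFailed reason and updates the cluster status.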
func (c *ClusterStatusController) setStatusCollectionFailedCondition(cluster *clusterv1alpha1.Cluster, currentClusterStatus clusterv1alpha1.ClusterStatus, message string) (controllerruntime.Result, error) {
readyCondition := util.NewCondition(clusterv1alpha1.ClusterConditionReady, statusCollectionFailed, message, metav1.ConditionFalse)
meta.SetStatusCondition(&currentClusterStatus.Conditions, readyCondition)
return c.updateStatusIfNeeded(cluster, currentClusterStatus)
}
// updateStatusIfNeeded updates the status of the member cluster only if it differs from the existing status.
func (c *ClusterStatusController) updateStatusIfNeeded(cluster *clusterv1alpha1.Cluster, currentClusterStatus clusterv1alpha1.ClusterStatus) (controllerruntime.Result, error) {
if !equality.Semantic.DeepEqual(cluster.Status, currentClusterStatus) {
klog.V(4).Infof("Start to update cluster status: %s", cluster.Name)
err := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
cluster.Status = currentClusterStatus
updateErr := c.Status().Update(context.TODO(), cluster)
if updateErr == nil {
return nil
}
updated := &clusterv1alpha1.Cluster{}
if err = c.Get(context.TODO(), client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Name}, updated); err == nil {
// make a copy, so we don't mutate the shared cache
cluster = updated.DeepCopy()
} else {
klog.Errorf("Failed to get updated cluster %s: %v", cluster.Name, err)
}
return updateErr
})
if err != nil {
klog.Errorf("Failed to update health status of the member cluster: %v, err is : %v", cluster.Name, err)
return controllerruntime.Result{Requeue: true}, err
}
}
return controllerruntime.Result{RequeueAfter: c.ClusterStatusUpdateFrequency.Duration}, nil
}
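// initializeGenericInformerManagerForCluster creates a generic (dynamic) informer manager for the cluster if one does not exist yet.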
func (c *ClusterStatusController) initializeGenericInformerManagerForCluster(clusterClient *util.ClusterClient) {
if c.GenericInformerManager.IsManagerExist(clusterClient.ClusterName) {
return
}
dynamicClient, err := c.ClusterDynamicClientSetFunc(clusterClient.ClusterName, c.Client)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterClient.ClusterName)
return
}
c.GenericInformerManager.ForCluster(clusterClient.ClusterName, dynamicClient.DynamicClientSet, 0)
}
// buildInformerForCluster builds an informer manager for the cluster if one doesn't exist, then constructs informers for nodes
// and pods and starts them. If the informer manager already exists and is synced, it is returned directly.
func (c *ClusterStatusController) buildInformerForCluster(clusterClient *util.ClusterClient) (typedmanager.SingleClusterInformerManager, error) {
singleClusterInformerManager := c.TypedInformerManager.GetSingleClusterManager(clusterClient.ClusterName)
if singleClusterInformerManager == nil {
singleClusterInformerManager = c.TypedInformerManager.ForCluster(clusterClient.ClusterName, clusterClient.KubeClient, 0)
}
gvrs := []schema.GroupVersionResource{nodeGVR, podGVR}
// create the informer for pods and nodes
allSynced := true
for _, gvr := range gvrs {
if !singleClusterInformerManager.IsInformerSynced(gvr) {
allSynced = false
if _, err := singleClusterInformerManager.Lister(gvr); err != nil {
klog.Errorf("Failed to get the lister for gvr %s: %v", gvr.String(), err)
}
}
}
if allSynced {
return singleClusterInformerManager, nil
}
c.TypedInformerManager.Start(clusterClient.ClusterName)
c.GenericInformerManager.Start(clusterClient.ClusterName)
if err := func() error {
synced := c.TypedInformerManager.WaitForCacheSyncWithTimeout(clusterClient.ClusterName, c.ClusterCacheSyncTimeout.Duration)
if synced == nil {
return fmt.Errorf("no informerFactory for cluster %s exist", clusterClient.ClusterName)
}
for _, gvr := range gvrs {
if !synced[gvr] {
return fmt.Errorf("informer for %s hasn't synced", gvr)
}
}
return nil
}(); err != nil {
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", clusterClient.ClusterName, err)
c.TypedInformerManager.Stop(clusterClient.ClusterName)
return nil, err
}
return singleClusterInformerManager, nil
}
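// initLeaseController starts a lease controller that periodically renews the lease of the given cluster in the karmada control plane.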
func (c *ClusterStatusController) initLeaseController(cluster *clusterv1alpha1.Cluster) {
// If the lease controller has already been registered, skip it.
if _, exists := c.ClusterLeaseControllers.Load(cluster.Name); exists {
return
}
// renewInterval is how often the lease renew time is updated.
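// For example, with a ClusterLeaseDuration of 40s and a ClusterLeaseRenewIntervalFraction of 0.25
// (illustrative values only), the lease would be renewed every 10s.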
renewInterval := time.Duration(float64(c.ClusterLeaseDuration.Nanoseconds()) * c.ClusterLeaseRenewIntervalFraction)
clusterLeaseController := lease.NewController(
clock.RealClock{},
c.KubeClient,
cluster.Name,
int32(c.ClusterLeaseDuration.Seconds()),
nil,
renewInterval,
cluster.Name,
util.NamespaceClusterLease,
util.SetLeaseOwnerFunc(c.Client, cluster.Name))
ctx, cancelFunc := context.WithCancel(context.TODO())
c.ClusterLeaseControllers.Store(cluster.Name, cancelFunc)
// start syncing lease
go func() {
klog.Infof("Starting syncing lease for cluster: %s", cluster.Name)
// the lease controller will keep running until the stop channel is closed (i.e. the context is canceled)
clusterLeaseController.Run(ctx.Done())
klog.Infof("Stop syncing lease for cluster: %s", cluster.Name)
c.ClusterLeaseControllers.Delete(cluster.Name) // ensure the cache is clean
}()
}
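// getClusterHealthStatus checks the health endpoint of the member cluster and reports whether it is online (reachable) and healthy.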
func getClusterHealthStatus(clusterClient *util.ClusterClient) (online, healthy bool) {
healthStatus, err := healthEndpointCheck(clusterClient.KubeClient, "/readyz")
if err != nil && healthStatus == http.StatusNotFound {
// fall back to the healthz endpoint if the readyz endpoint is not installed in the member cluster
healthStatus, err = healthEndpointCheck(clusterClient.KubeClient, "/healthz")
}
if err != nil {
klog.Errorf("Failed to do cluster health check for cluster %v, err is : %v ", clusterClient.ClusterName, err)
return false, false
}
if healthStatus != http.StatusOK {
klog.Infof("Member cluster %v isn't healthy", clusterClient.ClusterName)
return true, false
}
return true, true
}
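// healthEndpointCheck performs a GET request against the given path of the member cluster's API server and returns the HTTP status code.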
func healthEndpointCheck(client *clientset.Clientset, path string) (int, error) {
var healthStatus int
resp := client.DiscoveryClient.RESTClient().Get().AbsPath(path).Do(context.TODO()).StatusCode(&healthStatus)
return healthStatus, resp.Error()
}
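// generateReadyCondition builds the Ready condition of the cluster from the observed online and healthy states.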
func generateReadyCondition(online, healthy bool) metav1.Condition {
if !online {
return util.NewCondition(clusterv1alpha1.ClusterConditionReady, clusterNotReachableReason, clusterNotReachableMsg, metav1.ConditionFalse)
}
if !healthy {
return util.NewCondition(clusterv1alpha1.ClusterConditionReady, clusterNotReady, clusterUnhealthy, metav1.ConditionFalse)
}
return util.NewCondition(clusterv1alpha1.ClusterConditionReady, clusterReady, clusterHealthy, metav1.ConditionTrue)
}
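// getKubernetesVersion returns the Kubernetes version (GitVersion) reported by the member cluster's API server.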
func getKubernetesVersion(clusterClient *util.ClusterClient) (string, error) {
clusterVersion, err := clusterClient.KubeClient.Discovery().ServerVersion()
if err != nil {
return "", err
}
return clusterVersion.GitVersion, nil
}
// getAPIEnablements returns the list of API enablements (supported groups and resources).
// The returned list might be non-empty with partial results even in the case of a non-nil error.
func getAPIEnablements(clusterClient *util.ClusterClient) ([]clusterv1alpha1.APIEnablement, error) {
_, apiResourceList, err := clusterClient.KubeClient.Discovery().ServerGroupsAndResources()
if len(apiResourceList) == 0 {
return nil, err
}
var apiEnablements []clusterv1alpha1.APIEnablement
for _, list := range apiResourceList {
var apiResources []clusterv1alpha1.APIResource
for _, resource := range list.APIResources {
// skip subresources such as "/status" and "/scale" because these are not real APIResources that we care about.
if strings.Contains(resource.Name, "/") {
continue
}
apiResource := clusterv1alpha1.APIResource{
Name: resource.Name,
Kind: resource.Kind,
}
apiResources = append(apiResources, apiResource)
}
sort.SliceStable(apiResources, func(i, j int) bool {
return apiResources[i].Name < apiResources[j].Name
})
apiEnablements = append(apiEnablements, clusterv1alpha1.APIEnablement{GroupVersion: list.GroupVersion, Resources: apiResources})
}
sort.SliceStable(apiEnablements, func(i, j int) bool {
return apiEnablements[i].GroupVersion < apiEnablements[j].GroupVersion
})
return apiEnablements, err
}
// listPods returns the Pod list from the informerManager cache.
func listPods(informerManager typedmanager.SingleClusterInformerManager) ([]*corev1.Pod, error) {
podInterface, err := informerManager.Lister(podGVR)
if err != nil {
return nil, err
}
podLister, ok := podInterface.(v1.PodLister)
if !ok {
return nil, fmt.Errorf("failed to convert interface to PodLister")
}
return podLister.List(labels.Everything())
}
// listNodes returns the Node list from the informerManager cache.
func listNodes(informerManager typedmanager.SingleClusterInformerManager) ([]*corev1.Node, error) {
nodeInterface, err := informerManager.Lister(nodeGVR)
if err != nil {
return nil, err
}
nodeLister, ok := nodeInterface.(v1.NodeLister)
if !ok {
return nil, fmt.Errorf("failed to convert interface to NodeLister")
}
return nodeLister.List(labels.Everything())
}
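// getNodeSummary returns the total and ready node numbers of the member cluster.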
func getNodeSummary(nodes []*corev1.Node) *clusterv1alpha1.NodeSummary {
totalNum := len(nodes)
readyNum := 0
for _, node := range nodes {
if helper.NodeReady(node) {
readyNum++
}
}
nodeSummary := &clusterv1alpha1.NodeSummary{}
nodeSummary.TotalNum = int32(totalNum)
nodeSummary.ReadyNum = int32(readyNum)
return nodeSummary
}
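// getResourceSummary aggregates the allocatable, allocating and allocated resources of the member cluster.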
func getResourceSummary(nodes []*corev1.Node, pods []*corev1.Pod) *clusterv1alpha1.ResourceSummary {
allocatable := getClusterAllocatable(nodes)
allocating := getAllocatingResource(pods)
allocated := getAllocatedResource(pods)
resourceSummary := &clusterv1alpha1.ResourceSummary{}
resourceSummary.Allocatable = allocatable
resourceSummary.Allocating = allocating
resourceSummary.Allocated = allocated
return resourceSummary
}
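// getClusterAllocatable sums the allocatable resources of all nodes in the member cluster.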
func getClusterAllocatable(nodeList []*corev1.Node) (allocatable corev1.ResourceList) {
allocatable = make(corev1.ResourceList)
for _, node := range nodeList {
for key, val := range node.Status.Allocatable {
tmpCap, ok := allocatable[key]
if ok {
tmpCap.Add(val)
} else {
tmpCap = val
}
allocatable[key] = tmpCap
}
}
return allocatable
}
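// getAllocatingResource sums the resource requests of pods that have not been scheduled to a node yet.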
func getAllocatingResource(podList []*corev1.Pod) corev1.ResourceList {
allocating := util.EmptyResource()
podNum := int64(0)
for _, pod := range podList {
if len(pod.Spec.NodeName) == 0 {
allocating.AddPodRequest(&pod.Spec)
podNum++
}
}
allocating.AddResourcePods(podNum)
return allocating.ResourceList()
}
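// getAllocatedResource sums the resource requests of pods that are scheduled to a node and not in a terminal phase.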
func getAllocatedResource(podList []*corev1.Pod) corev1.ResourceList {
allocated := util.EmptyResource()
podNum := int64(0)
for _, pod := range podList {
// When the phase of a pod is Succeeded or Failed, kube-scheduler would not consider its resource occupation.
if len(pod.Spec.NodeName) != 0 && pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed {
allocated.AddPodRequest(&pod.Spec)
podNum++
}
}
allocated.AddResourcePods(podNum)
return allocated.ResourceList()
}
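// getNodeAvailable subtracts the resources requested by pods on the node from its allocatable resources;
// it returns nil when the node has no remaining pod capacity.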
func getNodeAvailable(allocatable corev1.ResourceList, podResources *util.Resource) corev1.ResourceList {
if podResources == nil {
return allocatable
}
allocatedResourceList := podResources.ResourceList()
if allocatedResourceList == nil {
return allocatable
}
allowedPodNumber := allocatable.Pods().Value() - allocatedResourceList.Pods().Value()
// When too many pods have been created, scheduling will fail, so the number of allocating pods may be huge.
// If allowedPodNumber is less than or equal to 0, we don't allow more pods to be created.
if allowedPodNumber <= 0 {
klog.Warningf("The number of schedulable Pods on the node is less than or equal to 0, we won't add the node to cluster resource models.")
return nil
}
for allocatedName, allocatedQuantity := range allocatedResourceList {
if allocatableQuantity, ok := allocatable[allocatedName]; ok {
allocatableQuantity.Sub(allocatedQuantity)
allocatable[allocatedName] = allocatableQuantity
}
}
return allocatable
}
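// getAllocatableModelings groups nodes by the resource models defined in the cluster spec, based on each node's
// remaining available resources, and counts how many nodes fall into each grade.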
func getAllocatableModelings(cluster *clusterv1alpha1.Cluster, nodes []*corev1.Node, pods []*corev1.Pod) []clusterv1alpha1.AllocatableModeling {
if len(cluster.Spec.ResourceModels) == 0 {
return nil
}
modelingSummary, err := modeling.InitSummary(cluster.Spec.ResourceModels)
if err != nil {
klog.Errorf("Failed to init cluster summary from cluster resource models for Cluster %s. Error: %v.", cluster.GetName(), err)
return nil
}
nodePodResourcesMap := make(map[string]*util.Resource)
for _, pod := range pods {
// When the phase of a pod is Succeeded or Failed, kube-scheduler would not consider its resource occupation.
if len(pod.Spec.NodeName) != 0 && pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed {
if nodePodResourcesMap[pod.Spec.NodeName] == nil {
nodePodResourcesMap[pod.Spec.NodeName] = util.EmptyResource()
}
nodePodResourcesMap[pod.Spec.NodeName].AddPodRequest(&pod.Spec)
nodePodResourcesMap[pod.Spec.NodeName].AddResourcePods(1)
}
}
for _, node := range nodes {
nodeAvailable := getNodeAvailable(node.Status.Allocatable.DeepCopy(), nodePodResourcesMap[node.Name])
if nodeAvailable == nil {
break
}
modelingSummary.AddToResourceSummary(modeling.NewClusterResourceNode(nodeAvailable))
}
m := make([]clusterv1alpha1.AllocatableModeling, len(modelingSummary))
for index, resourceModel := range modelingSummary {
m[index].Grade = cluster.Spec.ResourceModels[index].Grade
m[index].Count = resourceModel.Quantity
}
return m
}