From fd6b360e28acab03a6ba782350416eefc672dd27 Mon Sep 17 00:00:00 2001
From: Piotr Szczesniak
Date: Fri, 20 May 2016 18:34:26 +0200
Subject: [PATCH] Added protection against adding unnecessary nodes

---
 cluster-autoscaler/cluster_autoscaler.go   | 37 +++++++++++++++++-----
 cluster-autoscaler/simulator/predicates.go | 12 ++++++-
 cluster-autoscaler/utils.go                | 28 ++++++++++++++++
 3 files changed, 68 insertions(+), 9 deletions(-)

diff --git a/cluster-autoscaler/cluster_autoscaler.go b/cluster-autoscaler/cluster_autoscaler.go
index 4d5c6a5cae..822b6265d9 100644
--- a/cluster-autoscaler/cluster_autoscaler.go
+++ b/cluster-autoscaler/cluster_autoscaler.go
@@ -32,8 +32,11 @@ import (
 )
 
 var (
-	migConfigFlag config.MigConfigFlag
-	kubernetes    = flag.String("kubernetes", "", "Kuberentes master location. Leave blank for default")
+	migConfigFlag           config.MigConfigFlag
+	kubernetes              = flag.String("kubernetes", "", "Kuberentes master location. Leave blank for default")
+	verifyUnschedulablePods = flag.Bool("verify-unschedulable-pods", true,
+		"If enabled, CA will ensure that each pod marked by Scheduler as unschedulable actually can't be scheduled on any node. "+
+			"This prevents adding unnecessary nodes in situations when CA and Scheduler have different configurations.")
 	scaleDownEnabled = flag.Bool("experimental-scale-down-enabled", false, "Should CA scale down the cluster")
 	scaleDownDelay   = flag.Duration("scale-down-delay", 10*time.Minute,
 		"Duration from the last scale up to the time when CA starts to check scale down options")
@@ -111,12 +114,36 @@ func main() {
 				continue
 			}
 
+			allScheduled, err := scheduledPodLister.List()
+			if err != nil {
+				glog.Errorf("Failed to list scheduled pods: %v", err)
+				continue
+			}
+
 			// We need to reset all pods that have been marked as unschedulable not after
 			// the newest node became available for the scheduler.
 			allNodesAvailableTime := GetAllNodesAvailableTime(nodes)
 			podsToReset, unschedulablePodsToHelp := SlicePodsByPodScheduledTime(allUnschedulablePods, allNodesAvailableTime)
 			ResetPodScheduledCondition(kubeClient, podsToReset)
 
+			// We need to check whether pods marked as unschedulable are actually unschedulable.
+			// This should prevent adding unnecessary nodes. Example of such a situation:
+			// - CA and Scheduler have slightly different configurations
+			// - Scheduler can't schedule a pod and marks it as unschedulable
+			// - CA adds a node which should help the pod
+			// - Scheduler doesn't schedule the pod on the new node
+			//   because according to its logic it doesn't fit there
+			// - CA sees the pod is still unschedulable, so it adds another node to help it
+			//
+			// With the check enabled the last point won't happen because CA will ignore a pod
+			// which is supposed to be scheduled on an existing node.
+			//
+			// Without the check below the cluster might be unnecessarily scaled up to the max
+			// allowed size in the described situation.
+			if *verifyUnschedulablePods {
+				unschedulablePodsToHelp = FilterOutSchedulable(unschedulablePodsToHelp, nodes, allScheduled, predicateChecker)
+			}
+
 			if len(unschedulablePodsToHelp) == 0 {
 				glog.V(1).Info("No unschedulable pods")
 			} else {
@@ -134,16 +161,10 @@ func main() {
 			}
 
 			if *scaleDownEnabled {
-				// In dry run only utilization is updated
 				calculateUtilizationOnly := lastScaleUpTime.Add(*scaleDownDelay).After(time.Now()) ||
 					lastScaleDownFailedTrial.Add(*scaleDownTrialFrequency).After(time.Now())
-				allScheduled, err := scheduledPodLister.List()
-				if err != nil {
-					glog.Errorf("Failed to list scheduled pods: %v", err)
-					continue
-				}
 				underutilizedNodes = CalculateUnderutilizedNodes(
 					nodes,
 					underutilizedNodes,
diff --git a/cluster-autoscaler/simulator/predicates.go b/cluster-autoscaler/simulator/predicates.go
index 8116778f09..00c989932f 100644
--- a/cluster-autoscaler/simulator/predicates.go
+++ b/cluster-autoscaler/simulator/predicates.go
@@ -34,7 +34,17 @@ func NewPredicateChecker() *PredicateChecker {
 	return &PredicateChecker{}
 }
 
-// CheckPredicates Checks if the given pod can be placed on the given node.
+// FitsAny checks if the given pod can be placed on any of the given nodes.
+func (p *PredicateChecker) FitsAny(pod *kube_api.Pod, nodeInfos map[string]*schedulercache.NodeInfo) (string, error) {
+	for name, nodeInfo := range nodeInfos {
+		if err := p.CheckPredicates(pod, nodeInfo); err == nil {
+			return name, nil
+		}
+	}
+	return "", fmt.Errorf("cannot put pod %s on any node", pod.Name)
+}
+
+// CheckPredicates checks if the given pod can be placed on the given node.
 func (p *PredicateChecker) CheckPredicates(pod *kube_api.Pod, nodeInfo *schedulercache.NodeInfo) error {
 	// TODO(fgrzadkowski): Use full list of predicates.
 	match, err := predicates.GeneralPredicates(pod, nodeInfo)
diff --git a/cluster-autoscaler/utils.go b/cluster-autoscaler/utils.go
index 311a6badd2..0a98bb5c8a 100644
--- a/cluster-autoscaler/utils.go
+++ b/cluster-autoscaler/utils.go
@@ -187,6 +187,34 @@ func resetPodScheduledConditionForPod(kubeClient *kube_client.Client, pod *kube_
 	return fmt.Errorf("Expected condition PodScheduled")
 }
 
+// FilterOutSchedulable checks whether pods marked as unschedulable by the Scheduler
+// actually can't be scheduled on any node and filters out the ones that can.
+func FilterOutSchedulable(unschedulableCandidates []*kube_api.Pod, nodes []*kube_api.Node, allPods []*kube_api.Pod, predicateChecker *simulator.PredicateChecker) []*kube_api.Pod {
+	unschedulablePods := []*kube_api.Pod{}
+	nodeNameToNodeInfo := createNodeNameToInfoMap(allPods, nodes)
+
+	for _, pod := range unschedulableCandidates {
+		if nodeName, err := predicateChecker.FitsAny(pod, nodeNameToNodeInfo); err == nil {
+			glog.Warningf("Pod %s marked as unschedulable can be scheduled on %s. Ignoring in scale up.", pod.Name, nodeName)
+		} else {
+			unschedulablePods = append(unschedulablePods, pod)
+		}
+	}
+
+	return unschedulablePods
+}
+
+// TODO: move this function to scheduler utils.
+func createNodeNameToInfoMap(pods []*kube_api.Pod, nodes []*kube_api.Node) map[string]*schedulercache.NodeInfo {
+	nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods)
+	for _, node := range nodes {
+		if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {
+			nodeInfo.SetNode(node)
+		}
+	}
+	return nodeNameToNodeInfo
+}
+
 // CheckMigsAndNodes checks if all migs have all required nodes.
 func CheckMigsAndNodes(nodes []*kube_api.Node, gceManager *gce.GceManager) error {
 	migCount := make(map[string]int)
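
Note for reviewers: since the new check is spread across three files, the standalone sketch below restates the filtering step in isolation. It is only an illustration under simplified assumptions: the podSpec and nodeInfo types and the CPU/memory-only fit check are invented for this example, whereas the patch itself works on kube_api.Pod and schedulercache.NodeInfo and delegates the fit decision to the scheduler's GeneralPredicates via PredicateChecker.CheckPredicates.

    // Simplified sketch of the verify-unschedulable-pods flow (not the real CA types).
    package main

    import "fmt"

    // podSpec is a stand-in for *kube_api.Pod: just a name and resource requests.
    type podSpec struct {
    	name     string
    	cpuMilli int64
    	memMB    int64
    }

    // nodeInfo is a stand-in for *schedulercache.NodeInfo: the capacity a node
    // still has free after the pods already scheduled on it.
    type nodeInfo struct {
    	freeCPUMilli int64
    	freeMemMB    int64
    }

    // fitsAny mirrors PredicateChecker.FitsAny: return the first node on which
    // the pod passes the (here: resource-only) predicate.
    func fitsAny(pod podSpec, nodes map[string]nodeInfo) (string, error) {
    	for name, n := range nodes {
    		if pod.cpuMilli <= n.freeCPUMilli && pod.memMB <= n.freeMemMB {
    			return name, nil
    		}
    	}
    	return "", fmt.Errorf("cannot put pod %s on any node", pod.name)
    }

    // filterOutSchedulable mirrors FilterOutSchedulable: pods that fit on an
    // existing node are dropped from the scale-up candidates.
    func filterOutSchedulable(candidates []podSpec, nodes map[string]nodeInfo) []podSpec {
    	unschedulable := []podSpec{}
    	for _, pod := range candidates {
    		if nodeName, err := fitsAny(pod, nodes); err == nil {
    			fmt.Printf("pod %s fits on %s; ignoring in scale up\n", pod.name, nodeName)
    			continue
    		}
    		unschedulable = append(unschedulable, pod)
    	}
    	return unschedulable
    }

    func main() {
    	nodes := map[string]nodeInfo{
    		"node-a": {freeCPUMilli: 500, freeMemMB: 1024},
    	}
    	candidates := []podSpec{
    		{name: "small", cpuMilli: 200, memMB: 256},   // fits on node-a, filtered out
    		{name: "large", cpuMilli: 4000, memMB: 8192}, // genuinely unschedulable
    	}
    	fmt.Println(filterOutSchedulable(candidates, nodes)) // only "large" remains
    }

As in FilterOutSchedulable above, only the pods that fit nowhere survive the filter, which is what stops the repeated scale ups described in the comment block; the real implementation builds the NodeInfo map from allScheduled so the simulation accounts for capacity already taken by running pods.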