Merge pull request https://github.com/kubernetes/contrib/pull/1026 from piosz/pending-check

[Cluster autoscaler] Added protection against adding unnecessary nodes
Marcin Wielgus 2016-05-23 13:47:38 +02:00
commit 64a470a989
3 changed files with 68 additions and 9 deletions

@@ -34,6 +34,9 @@ import (
var (
migConfigFlag config.MigConfigFlag
kubernetes = flag.String("kubernetes", "", "Kuberentes master location. Leave blank for default")
verifyUnschedulablePods = flag.Bool("verify-unschedulable-pods", true,
"If enabled CA will ensure that each pod marked by Scheduler as unschedulable actually can't be scheduled on any node."+
"This prevents from adding unnecessary nodes in situation when CA and Scheduler have different configuration.")
scaleDownEnabled = flag.Bool("experimental-scale-down-enabled", false, "Should CA scale down the cluster")
scaleDownDelay = flag.Duration("scale-down-delay", 10*time.Minute,
"Duration from the last scale up to the time when CA starts to check scale down options")
@@ -111,12 +114,36 @@ func main() {
continue
}
allScheduled, err := scheduledPodLister.List()
if err != nil {
glog.Errorf("Failed to list scheduled pods: %v", err)
continue
}
// We need to reset all pods that have been marked as unschedulable not after
// the newest node became available for the scheduler.
allNodesAvailableTime := GetAllNodesAvailableTime(nodes)
podsToReset, unschedulablePodsToHelp := SlicePodsByPodScheduledTime(allUnschedulablePods, allNodesAvailableTime)
ResetPodScheduledCondition(kubeClient, podsToReset)
// We need to check whether pods marked as unschedulable are actually unschedulable.
// This should prevent adding unnecessary nodes. Example of such a situation:
// - CA and Scheduler have slightly different configurations
// - Scheduler can't schedule a pod and marks it as unschedulable
// - CA adds a node which should help the pod
// - Scheduler doesn't schedule the pod on the new node
// because according to its logic the pod doesn't fit there
// - CA sees the pod is still unschedulable, so it adds another node to help it
//
// With the check enabled, the last step won't happen because CA will ignore a pod
// that can already be scheduled on an existing node.
//
// Without the check below, the cluster might be unnecessarily scaled up to the
// maximum allowed size in the situation described above.
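// (Illustrative example of such a configuration difference, assumed rather than
// taken from this change: the Scheduler might evaluate a predicate, e.g. a taint
// or node-affinity check, that CA's simplified predicate set does not cover, so
// the two disagree about whether the pod fits.)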
if *verifyUnschedulablePods {
unschedulablePodsToHelp = FilterOutSchedulable(unschedulablePodsToHelp, nodes, allScheduled, predicateChecker)
}
if len(unschedulablePodsToHelp) == 0 {
glog.V(1).Info("No unschedulable pods")
} else {
@@ -134,16 +161,10 @@ func main() {
}
if *scaleDownEnabled {
// In dry run only utilization is updated
calculateUtilizationOnly := lastScaleUpTime.Add(*scaleDownDelay).After(time.Now()) ||
lastScaleDownFailedTrial.Add(*scaleDownTrialFrequency).After(time.Now())
allScheduled, err := scheduledPodLister.List()
if err != nil {
glog.Errorf("Failed to list scheduled pods: %v", err)
continue
}
underutilizedNodes = CalculateUnderutilizedNodes(
nodes,
underutilizedNodes,

@@ -34,7 +34,17 @@ func NewPredicateChecker() *PredicateChecker {
return &PredicateChecker{}
}
// CheckPredicates Checks if the given pod can be placed on the given node.
// FitsAny checks if the given pod can be placed on any of the given nodes.
func (p *PredicateChecker) FitsAny(pod *kube_api.Pod, nodeInfos map[string]*schedulercache.NodeInfo) (string, error) {
for name, nodeInfo := range nodeInfos {
if err := p.CheckPredicates(pod, nodeInfo); err == nil {
return name, nil
}
}
return "", fmt.Errorf("cannot put pod %s on any node", pod.Name)
}
// CheckPredicates checks if the given pod can be placed on the given node.
func (p *PredicateChecker) CheckPredicates(pod *kube_api.Pod, nodeInfo *schedulercache.NodeInfo) error {
// TODO(fgrzadkowski): Use full list of predicates.
match, err := predicates.GeneralPredicates(pod, nodeInfo)
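
A brief usage sketch for FitsAny (hypothetical caller; building the nodeInfos map from scheduled pods and nodes is assumed here, as done by createNodeNameToInfoMap in the next file):

checker := NewPredicateChecker()
// nodeInfos is assumed to map node name -> *schedulercache.NodeInfo populated
// with the node object and its scheduled pods.
if nodeName, err := checker.FitsAny(pod, nodeInfos); err == nil {
    glog.V(2).Infof("Pod %s would fit on existing node %s", pod.Name, nodeName)
} else {
    glog.V(2).Infof("Pod %s does not fit on any existing node: %v", pod.Name, err)
}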

@@ -187,6 +187,34 @@ func resetPodScheduledConditionForPod(kubeClient *kube_client.Client, pod *kube_
return fmt.Errorf("Expected condition PodScheduled")
}
// FilterOutSchedulable checks whether pods from <unschedulableCandidates> marked as unschedulable
// by the Scheduler actually can't be scheduled on any node and filters out the ones that can.
func FilterOutSchedulable(unschedulableCandidates []*kube_api.Pod, nodes []*kube_api.Node, allPods []*kube_api.Pod, predicateChecker *simulator.PredicateChecker) []*kube_api.Pod {
unschedulablePods := []*kube_api.Pod{}
nodeNameToNodeInfo := createNodeNameToInfoMap(allPods, nodes)
for _, pod := range unschedulableCandidates {
if nodeName, err := predicateChecker.FitsAny(pod, nodeNameToNodeInfo); err == nil {
glog.Warningf("Pod %s marked as unschedulable can be scheduled on %s. Ignoring in scale up.", pod.Name, nodeName)
} else {
unschedulablePods = append(unschedulablePods, pod)
}
}
return unschedulablePods
}
// TODO: move this function to scheduler utils.
func createNodeNameToInfoMap(pods []*kube_api.Pod, nodes []*kube_api.Node) map[string]*schedulercache.NodeInfo {
nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods)
for _, node := range nodes {
if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {
nodeInfo.SetNode(node)
}
}
return nodeNameToNodeInfo
}
// CheckMigsAndNodes checks if all migs have all required nodes.
func CheckMigsAndNodes(nodes []*kube_api.Node, gceManager *gce.GceManager) error {
migCount := make(map[string]int)
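
As a closing illustration, here is a minimal, self-contained sketch of the filtering pattern this change introduces. The types are toy stand-ins (not the real kube_api structs) and only a single CPU dimension is modeled, so this is a sketch of the idea rather than the actual implementation:

package main

import "fmt"

// Toy stand-ins for pods and nodes; only a single CPU dimension is modeled.
type pod struct {
    name string
    cpu  int
}

type node struct {
    name    string
    freeCPU int
}

// fitsAny mirrors the idea of PredicateChecker.FitsAny: return the first node with room.
func fitsAny(p pod, nodes []node) (string, error) {
    for _, n := range nodes {
        if p.cpu <= n.freeCPU {
            return n.name, nil
        }
    }
    return "", fmt.Errorf("cannot put pod %s on any node", p.name)
}

// filterOutSchedulable mirrors the idea of FilterOutSchedulable: keep only the pods
// that really do not fit on any existing node; the rest are ignored for scale-up.
func filterOutSchedulable(candidates []pod, nodes []node) []pod {
    stillUnschedulable := []pod{}
    for _, p := range candidates {
        if nodeName, err := fitsAny(p, nodes); err == nil {
            fmt.Printf("pod %s fits on %s, ignoring in scale-up\n", p.name, nodeName)
        } else {
            stillUnschedulable = append(stillUnschedulable, p)
        }
    }
    return stillUnschedulable
}

func main() {
    nodes := []node{{name: "n1", freeCPU: 2}, {name: "n2", freeCPU: 1}}
    candidates := []pod{{name: "small", cpu: 1}, {name: "huge", cpu: 4}}
    // Only "huge" should survive the filter and justify adding a node.
    fmt.Println(filterOutSchedulable(candidates, nodes))
}

Running it reports that "small" fits on an existing node and is ignored, while only "huge" survives the filter and would justify a scale-up.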