Include pods with NominatedNodeName in scheduledPods list used for scale-up considerations

Change-Id: Ie4c095b30bf0cd1f160f1ac4b8c1fcb8c0524096
Łukasz Osipiuk 2019-04-15 11:31:18 +02:00
parent db4c6f1133
commit c9811e87b4
3 changed files with 32 additions and 36 deletions
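In short: a pod whose Status.NominatedNodeName is set has already been assigned a node by the scheduler and is only waiting for lower-priority pods to be preempted there, so the autoscaler now counts it as scheduled rather than as a reason to scale up. A self-contained sketch of that idea (the helper and main below are illustrative, not autoscaler code; apiv1 and metav1 are the usual k8s.io API packages):

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// splitByNominatedNode is a hypothetical stand-in for the split the autoscaler does:
// pods with a nominated node only wait for preemption on an existing node.
func splitByNominatedNode(pods []*apiv1.Pod) (needCapacity, waitingForPreemption []*apiv1.Pod) {
	for _, pod := range pods {
		if pod.Status.NominatedNodeName != "" {
			waitingForPreemption = append(waitingForPreemption, pod)
		} else {
			needCapacity = append(needCapacity, pod)
		}
	}
	return needCapacity, waitingForPreemption
}

func main() {
	nominated := &apiv1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "waiting-for-preemption"}}
	nominated.Status.NominatedNodeName = "node-1"
	pending := &apiv1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "needs-new-node"}}

	scheduledPods := []*apiv1.Pod{}
	needCapacity, waitingForPreemption := splitByNominatedNode([]*apiv1.Pod{nominated, pending})

	// The key move of this commit: fold the nominated pods into the scheduled list,
	// so they neither trigger a scale-up nor let their nominated node be scaled down.
	scheduledPods = append(scheduledPods, waitingForPreemption...)
	fmt.Printf("%d pod(s) may need a scale-up, %d pod(s) treated as scheduled\n",
		len(needCapacity), len(scheduledPods))
}

In the diff below, the real split is done by filterOutExpendableAndSplit in RunOnce, and the augmented scheduled list is what the scale-up filtering and UpdateUnneededNodes later see.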


@@ -60,18 +60,15 @@ func (filterOutSchedulablePodListProcessor) Process(
 // With the check enabled the last point won't happen because CA will ignore a pod
 // which is supposed to schedule on an existing node.
-// Some unschedulable pods can be waiting for lower priority pods preemption so they have nominated node to run.
-// Such pods don't require scale up but should be considered during scale down.
-unschedulablePods, unschedulableWaitingForLowerPriorityPreemption := filterOutExpendableAndSplit(unschedulablePods, context.ExpendablePodsPriorityCutoff)
 klog.V(4).Infof("Filtering out schedulables")
 filterOutSchedulableStart := time.Now()
 var unschedulablePodsToHelp []*apiv1.Pod
 if context.FilterOutSchedulablePodsUsesPacking {
 unschedulablePodsToHelp = filterOutSchedulableByPacking(unschedulablePods, readyNodes, allScheduledPods,
-unschedulableWaitingForLowerPriorityPreemption, context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
+context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
 } else {
 unschedulablePodsToHelp = filterOutSchedulableSimple(unschedulablePods, readyNodes, allScheduledPods,
-unschedulableWaitingForLowerPriorityPreemption, context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
+context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
 }
 metrics.UpdateDurationFromStart(metrics.FilterOutSchedulable, filterOutSchedulableStart)
@@ -91,11 +88,11 @@ func (filterOutSchedulablePodListProcessor) CleanUp() {
 // filterOutSchedulableByPacking checks whether pods from <unschedulableCandidates> marked as unschedulable
 // can be scheduled on free capacity on existing nodes by trying to pack the pods. It tries to pack the higher priority
 // pods first. It takes into account pods that are bound to node and will be scheduled after lower priority pod preemption.
-func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod, podsWaitingForLowerPriorityPreemption []*apiv1.Pod,
+func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod,
 predicateChecker *simulator.PredicateChecker, expendablePodsPriorityCutoff int) []*apiv1.Pod {
 var unschedulablePods []*apiv1.Pod
 nonExpendableScheduled := filterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff)
-nodeNameToNodeInfo := schedulerutil.CreateNodeNameToInfoMap(append(nonExpendableScheduled, podsWaitingForLowerPriorityPreemption...), nodes)
+nodeNameToNodeInfo := schedulerutil.CreateNodeNameToInfoMap(nonExpendableScheduled, nodes)
 loggingQuota := glogx.PodsLoggingQuota()
 sort.Slice(unschedulableCandidates, func(i, j int) bool {
@@ -119,11 +116,11 @@ func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes [
 // filterOutSchedulableSimple checks whether pods from <unschedulableCandidates> marked as unschedulable
 // by Scheduler actually can't be scheduled on any node and filter out the ones that can.
 // It takes into account pods that are bound to node and will be scheduled after lower priority pod preemption.
-func filterOutSchedulableSimple(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod, podsWaitingForLowerPriorityPreemption []*apiv1.Pod,
+func filterOutSchedulableSimple(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod,
 predicateChecker *simulator.PredicateChecker, expendablePodsPriorityCutoff int) []*apiv1.Pod {
 var unschedulablePods []*apiv1.Pod
 nonExpendableScheduled := filterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff)
-nodeNameToNodeInfo := schedulerutil.CreateNodeNameToInfoMap(append(nonExpendableScheduled, podsWaitingForLowerPriorityPreemption...), nodes)
+nodeNameToNodeInfo := schedulerutil.CreateNodeNameToInfoMap(nonExpendableScheduled, nodes)
 podSchedulable := make(podSchedulableMap)
 loggingQuota := glogx.PodsLoggingQuota()
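Both helpers above lose the separate podsWaitingForLowerPriorityPreemption parameter; after this change the caller is expected to have folded those pods into allScheduled already, so the node infos built from nonExpendableScheduled account for them. As the doc comments say, the packing variant tries to fit higher-priority pods into free capacity first; a toy, self-contained sketch of that idea (plain millicore counters stand in for the real PredicateChecker and scheduler NodeInfos):

package main

import (
	"fmt"
	"sort"
)

// toyPod is an illustrative stand-in for *apiv1.Pod in this sketch.
type toyPod struct {
	name     string
	cpuMilli int64
	priority int32
}

// filterOutSchedulableToy mimics the shape of filterOutSchedulableByPacking:
// pack higher-priority pods into remaining free capacity first and return only
// the pods that still do not fit (only those may justify a scale-up).
func filterOutSchedulableToy(candidates []toyPod, freeCPUMilli map[string]int64) []toyPod {
	sort.Slice(candidates, func(i, j int) bool { return candidates[i].priority > candidates[j].priority })
	var unschedulable []toyPod
	for _, p := range candidates {
		placed := false
		for node, free := range freeCPUMilli {
			if free >= p.cpuMilli {
				freeCPUMilli[node] -= p.cpuMilli // reserve capacity for the packed pod
				placed = true
				break
			}
		}
		if !placed {
			unschedulable = append(unschedulable, p)
		}
	}
	return unschedulable
}

func main() {
	free := map[string]int64{"node-1": 1000}
	pods := []toyPod{{"low-prio", 600, 1}, {"high-prio", 600, 100}}
	// Only "low-prio" remains: "high-prio" was packed onto node-1 first.
	fmt.Println(filterOutSchedulableToy(pods, free))
}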


@@ -82,27 +82,20 @@ func TestFilterOutSchedulableByPacking(t *testing.T) {
 predicateChecker := simulator.NewTestPredicateChecker()
-res := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
+res := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, predicateChecker, 10)
 assert.Equal(t, 3, len(res))
 assert.Equal(t, p2_1, res[0])
 assert.Equal(t, p2_2, res[1])
 assert.Equal(t, p3_2, res[2])
-res2 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
+res2 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, predicateChecker, 10)
 assert.Equal(t, 4, len(res2))
 assert.Equal(t, p1, res2[0])
 assert.Equal(t, p2_1, res2[1])
 assert.Equal(t, p2_2, res2[2])
 assert.Equal(t, p3_2, res2[3])
-res3 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{podWaitingForPreemption}, predicateChecker, 10)
-assert.Equal(t, 4, len(res3))
-assert.Equal(t, p1, res3[0])
-assert.Equal(t, p2_1, res3[1])
-assert.Equal(t, p2_2, res3[2])
-assert.Equal(t, p3_2, res3[3])
-res4 := filterOutSchedulableByPacking(append(unschedulablePods, p4), []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
+res4 := filterOutSchedulableByPacking(append(unschedulablePods, p4), []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, predicateChecker, 10)
 assert.Equal(t, 5, len(res4))
 assert.Equal(t, p1, res4[0])
 assert.Equal(t, p2_1, res4[1])
@@ -160,20 +153,15 @@ func TestFilterOutSchedulableSimple(t *testing.T) {
 predicateChecker := simulator.NewTestPredicateChecker()
-res := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
+res := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, predicateChecker, 10)
 assert.Equal(t, 2, len(res))
 assert.Equal(t, p2_1, res[0])
 assert.Equal(t, p2_2, res[1])
-res2 := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
+res2 := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, predicateChecker, 10)
 assert.Equal(t, 3, len(res2))
 assert.Equal(t, p1, res2[0])
 assert.Equal(t, p2_1, res2[1])
 assert.Equal(t, p2_2, res2[2])
-res3 := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{podWaitingForPreemption}, predicateChecker, 10)
-assert.Equal(t, 3, len(res3))
-assert.Equal(t, p1, res3[0])
-assert.Equal(t, p2_1, res3[1])
-assert.Equal(t, p2_2, res3[2])
 }
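The removed res3 cases exercised pods waiting for preemption, which used to be passed as a separate argument. Such a pod is recognized by its Status.NominatedNodeName being set; a sketch of how one might be built in a test, assuming the usual BuildTestPod helper from utils/test (hypothetical fixture, not taken from this diff):

// Hypothetical test fixture, for illustration only:
priority := int32(100)
podWaitingForPreemption := BuildTestPod("w1", 500, 1000)
podWaitingForPreemption.Spec.Priority = &priority
podWaitingForPreemption.Status.NominatedNodeName = "node1"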


@@ -269,26 +269,35 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 metrics.UpdateLastTime(metrics.Autoscaling, time.Now())
-allUnschedulablePods, err := unschedulablePodLister.List()
+unschedulablePods, err := unschedulablePodLister.List()
 if err != nil {
 klog.Errorf("Failed to list unscheduled pods: %v", err)
 return errors.ToAutoscalerError(errors.ApiCallError, err)
 }
-metrics.UpdateUnschedulablePodsCount(len(allUnschedulablePods))
+metrics.UpdateUnschedulablePodsCount(len(unschedulablePods))
-allScheduledPods, err := scheduledPodLister.List()
+originalScheduledPods, err := scheduledPodLister.List()
 if err != nil {
 klog.Errorf("Failed to list scheduled pods: %v", err)
 return errors.ToAutoscalerError(errors.ApiCallError, err)
 }
-ConfigurePredicateCheckerForLoop(allUnschedulablePods, allScheduledPods, a.PredicateChecker)
+// scheduledPods will be mutated in this method. We keep the original list of pods in originalScheduledPods.
+scheduledPods := append([]*apiv1.Pod{}, originalScheduledPods...)
-unschedulablePodsWithoutTPUs := tpu.ClearTPURequests(allUnschedulablePods)
+ConfigurePredicateCheckerForLoop(unschedulablePods, scheduledPods, a.PredicateChecker)
-// todo: this is also computed in filterOutSchedulablePodListProcessor; avoid that.
-_, unschedulableWaitingForLowerPriorityPreemption := filterOutExpendableAndSplit(unschedulablePodsWithoutTPUs, a.ExpendablePodsPriorityCutoff)
-unschedulablePodsToHelp, allScheduledPods, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePodsWithoutTPUs, allScheduledPods, allNodes, readyNodes)
+unschedulablePods = tpu.ClearTPURequests(unschedulablePods)
+// todo: move the split and append below to a separate PodListProcessor
+// Some unschedulable pods can be waiting for lower priority pods preemption so they have nominated node to run.
+// Such pods don't require scale up but should be considered during scale down.
+unschedulablePods, unschedulableWaitingForLowerPriorityPreemption := filterOutExpendableAndSplit(unschedulablePods, a.ExpendablePodsPriorityCutoff)
+// we treat pods with a nominated node name as scheduled for the sake of scale-up considerations
+scheduledPods = append(scheduledPods, unschedulableWaitingForLowerPriorityPreemption...)
+unschedulablePodsToHelp, scheduledPods, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods, scheduledPods, allNodes, readyNodes)
 // finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
 unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
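The explicit copy of originalScheduledPods at the top of this block keeps the original list intact for the later TryToScaleDown call while scheduledPods gets augmented; it also avoids the Go slice-aliasing pitfall where append can write into capacity shared with the source slice. A small standalone illustration of that pitfall (plain strings instead of pods):

package main

import "fmt"

func main() {
	// A listed slice may have spare capacity behind its visible elements.
	original := make([]string, 2, 4)
	original[0], original[1] = "pod-a", "pod-b"

	aliased := original[:2]                   // no copy: shares the backing array
	copied := append([]string{}, original...) // independent copy, as in the diff

	aliased = append(aliased, "nominated-pod") // writes into original's spare capacity
	copied = append(copied, "nominated-pod")   // leaves original untouched

	// The aliased append is visible through original's backing array; the copied one is not.
	fmt.Println(original[:cap(original)][2]) // "nominated-pod"
	fmt.Println(len(copied), len(original))  // 3 2
}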
@@ -347,7 +356,9 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 scaleDown.CleanUp(currentTime)
 potentiallyUnneeded := getPotentiallyUnneededNodes(autoscalingContext, allNodes)
-typedErr := scaleDown.UpdateUnneededNodes(allNodes, potentiallyUnneeded, append(allScheduledPods, unschedulableWaitingForLowerPriorityPreemption...), currentTime, pdbs)
+// We use scheduledPods (not originalScheduledPods) here, so artificial scheduled pods introduced by processors
+// (e.g. unscheduled pods with a nominated node name) can block scale-down of the given node.
+typedErr := scaleDown.UpdateUnneededNodes(allNodes, potentiallyUnneeded, scheduledPods, currentTime, pdbs)
 if typedErr != nil {
 scaleDownStatus.Result = status.ScaleDownError
 klog.Errorf("Failed to scale down: %v", typedErr)
@@ -387,7 +398,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 scaleDownStart := time.Now()
 metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
-scaleDownStatus, typedErr := scaleDown.TryToScaleDown(allNodes, allScheduledPods, pdbs, currentTime)
+scaleDownStatus, typedErr := scaleDown.TryToScaleDown(allNodes, originalScheduledPods, pdbs, currentTime)
 metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
 if scaleDownStatus.Result == status.ScaleDownNodeDeleted {