diff --git a/cluster-autoscaler/core/filter_out_schedulable.go b/cluster-autoscaler/core/filter_out_schedulable.go
index 7617babefc..aa6dbc4e5d 100644
--- a/cluster-autoscaler/core/filter_out_schedulable.go
+++ b/cluster-autoscaler/core/filter_out_schedulable.go
@@ -60,18 +60,15 @@ func (filterOutSchedulablePodListProcessor) Process(
 	// With the check enabled the last point won't happen because CA will ignore a pod
 	// which is supposed to schedule on an existing node.
 
-	// Some unschedulable pods can be waiting for lower priority pods preemption so they have nominated node to run.
-	// Such pods don't require scale up but should be considered during scale down.
-	unschedulablePods, unschedulableWaitingForLowerPriorityPreemption := filterOutExpendableAndSplit(unschedulablePods, context.ExpendablePodsPriorityCutoff)
 	klog.V(4).Infof("Filtering out schedulables")
 	filterOutSchedulableStart := time.Now()
 	var unschedulablePodsToHelp []*apiv1.Pod
 	if context.FilterOutSchedulablePodsUsesPacking {
 		unschedulablePodsToHelp = filterOutSchedulableByPacking(unschedulablePods, readyNodes, allScheduledPods,
-			unschedulableWaitingForLowerPriorityPreemption, context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
+			context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
 	} else {
 		unschedulablePodsToHelp = filterOutSchedulableSimple(unschedulablePods, readyNodes, allScheduledPods,
-			unschedulableWaitingForLowerPriorityPreemption, context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
+			context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
 	}
 
 	metrics.UpdateDurationFromStart(metrics.FilterOutSchedulable, filterOutSchedulableStart)
@@ -91,11 +88,11 @@ func (filterOutSchedulablePodListProcessor) CleanUp() {
 // filterOutSchedulableByPacking checks whether pods from <unschedulableCandidates> marked as unschedulable
 // can be scheduled on free capacity on existing nodes by trying to pack the pods. It tries to pack the higher priority
 // pods first. It takes into account pods that are bound to node and will be scheduled after lower priority pod preemption.
-func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod, podsWaitingForLowerPriorityPreemption []*apiv1.Pod,
+func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod,
 	predicateChecker *simulator.PredicateChecker, expendablePodsPriorityCutoff int) []*apiv1.Pod {
 	var unschedulablePods []*apiv1.Pod
 	nonExpendableScheduled := filterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff)
-	nodeNameToNodeInfo := schedulerutil.CreateNodeNameToInfoMap(append(nonExpendableScheduled, podsWaitingForLowerPriorityPreemption...), nodes)
+	nodeNameToNodeInfo := schedulerutil.CreateNodeNameToInfoMap(nonExpendableScheduled, nodes)
 	loggingQuota := glogx.PodsLoggingQuota()
 
 	sort.Slice(unschedulableCandidates, func(i, j int) bool {
@@ -119,11 +116,11 @@ func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes [
 // filterOutSchedulableSimple checks whether pods from <unschedulableCandidates> marked as unschedulable
 // by Scheduler actually can't be scheduled on any node and filters out the ones that can.
 // It takes into account pods that are bound to node and will be scheduled after lower priority pod preemption.
-func filterOutSchedulableSimple(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod, podsWaitingForLowerPriorityPreemption []*apiv1.Pod,
+func filterOutSchedulableSimple(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod,
 	predicateChecker *simulator.PredicateChecker, expendablePodsPriorityCutoff int) []*apiv1.Pod {
 	var unschedulablePods []*apiv1.Pod
 	nonExpendableScheduled := filterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff)
-	nodeNameToNodeInfo := schedulerutil.CreateNodeNameToInfoMap(append(nonExpendableScheduled, podsWaitingForLowerPriorityPreemption...), nodes)
+	nodeNameToNodeInfo := schedulerutil.CreateNodeNameToInfoMap(nonExpendableScheduled, nodes)
 	podSchedulable := make(podSchedulableMap)
 	loggingQuota := glogx.PodsLoggingQuota()
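Note on the signature change above: with the podsWaitingForLowerPriorityPreemption parameter gone, both filters consume a single scheduled-pod list, and callers are expected to fold preemption-waiting pods into allScheduled before the call (the static_autoscaler.go changes below do exactly that). A minimal sketch of the new call shape, with hypothetical function and variable names, assuming a caller that still holds the two lists separately:

```go
// Illustration only: a caller that still holds preemption-waiting pods
// separately now merges them into the scheduled list before filtering.
func filterForScaleUp(context *context.AutoscalingContext, unschedulable, scheduled,
	waitingForPreemption []*apiv1.Pod, readyNodes []*apiv1.Node) []*apiv1.Pod {
	// Copy before appending so the caller's slice is left untouched.
	combined := append(append([]*apiv1.Pod{}, scheduled...), waitingForPreemption...)
	return filterOutSchedulableByPacking(unschedulable, readyNodes, combined,
		context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
}
```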
diff --git a/cluster-autoscaler/core/filter_out_schedulable_test.go b/cluster-autoscaler/core/filter_out_schedulable_test.go
index 468bff8f20..b1ef1c6624 100644
--- a/cluster-autoscaler/core/filter_out_schedulable_test.go
+++ b/cluster-autoscaler/core/filter_out_schedulable_test.go
@@ -82,27 +82,20 @@ func TestFilterOutSchedulableByPacking(t *testing.T) {
 	predicateChecker := simulator.NewTestPredicateChecker()
 
-	res := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
+	res := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, predicateChecker, 10)
 	assert.Equal(t, 3, len(res))
 	assert.Equal(t, p2_1, res[0])
 	assert.Equal(t, p2_2, res[1])
 	assert.Equal(t, p3_2, res[2])
 
-	res2 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
+	res2 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, predicateChecker, 10)
 	assert.Equal(t, 4, len(res2))
 	assert.Equal(t, p1, res2[0])
 	assert.Equal(t, p2_1, res2[1])
 	assert.Equal(t, p2_2, res2[2])
 	assert.Equal(t, p3_2, res2[3])
 
-	res3 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{podWaitingForPreemption}, predicateChecker, 10)
-	assert.Equal(t, 4, len(res3))
-	assert.Equal(t, p1, res3[0])
-	assert.Equal(t, p2_1, res3[1])
-	assert.Equal(t, p2_2, res3[2])
-	assert.Equal(t, p3_2, res3[3])
-
-	res4 := filterOutSchedulableByPacking(append(unschedulablePods, p4), []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
+	res4 := filterOutSchedulableByPacking(append(unschedulablePods, p4), []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, predicateChecker, 10)
 	assert.Equal(t, 5, len(res4))
 	assert.Equal(t, p1, res4[0])
 	assert.Equal(t, p2_1, res4[1])
@@ -160,20 +153,15 @@ func TestFilterOutSchedulableSimple(t *testing.T) {
 	predicateChecker := simulator.NewTestPredicateChecker()
 
-	res := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
+	res := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, predicateChecker, 10)
 	assert.Equal(t, 2, len(res))
 	assert.Equal(t, p2_1, res[0])
 	assert.Equal(t, p2_2, res[1])
 
-	res2 := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
+	res2 := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, predicateChecker, 10)
 	assert.Equal(t, 3, len(res2))
 	assert.Equal(t, p1, res2[0])
 	assert.Equal(t, p2_1, res2[1])
 	assert.Equal(t, p2_2, res2[2])
 
-	res3 := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{podWaitingForPreemption}, predicateChecker, 10)
-	assert.Equal(t, 3, len(res3))
-	assert.Equal(t, p1, res3[0])
-	assert.Equal(t, p2_1, res3[1])
-	assert.Equal(t, p2_2, res3[2])
 }
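The removed res3 cases covered capacity consumed by a pod awaiting preemption. Under the new signature that scenario would presumably be expressed by folding the pod into the scheduled list, roughly as sketched below; the expected result lengths are deliberately not asserted, since they depend on how CreateNodeNameToInfoMap accounts for pods that are not yet bound to a node:

```go
// Hypothetical reconstruction of the dropped res3 scenario, not a drop-in
// replacement: podWaitingForPreemption now travels inside the scheduled list.
res3 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node},
	append([]*apiv1.Pod{scheduledPod1, scheduledPod3}, podWaitingForPreemption),
	predicateChecker, 10)
t.Logf("res3 contains %d pods", len(res3))
```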
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index 1638b1b856..49af04ad4d 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -269,26 +269,35 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 	metrics.UpdateLastTime(metrics.Autoscaling, time.Now())
 
-	allUnschedulablePods, err := unschedulablePodLister.List()
+	unschedulablePods, err := unschedulablePodLister.List()
 	if err != nil {
 		klog.Errorf("Failed to list unscheduled pods: %v", err)
 		return errors.ToAutoscalerError(errors.ApiCallError, err)
 	}
-	metrics.UpdateUnschedulablePodsCount(len(allUnschedulablePods))
+	metrics.UpdateUnschedulablePodsCount(len(unschedulablePods))
 
-	allScheduledPods, err := scheduledPodLister.List()
+	originalScheduledPods, err := scheduledPodLister.List()
 	if err != nil {
 		klog.Errorf("Failed to list scheduled pods: %v", err)
 		return errors.ToAutoscalerError(errors.ApiCallError, err)
 	}
 
-	ConfigurePredicateCheckerForLoop(allUnschedulablePods, allScheduledPods, a.PredicateChecker)
+	// scheduledPods will be mutated over the course of this method. We keep the original list of pods in originalScheduledPods.
+	scheduledPods := append([]*apiv1.Pod{}, originalScheduledPods...)
 
-	unschedulablePodsWithoutTPUs := tpu.ClearTPURequests(allUnschedulablePods)
+	ConfigurePredicateCheckerForLoop(unschedulablePods, scheduledPods, a.PredicateChecker)
 
-	// todo: this is also computed in filterOutSchedulablePodListProcessor; avoid that.
-	_, unschedulableWaitingForLowerPriorityPreemption := filterOutExpendableAndSplit(unschedulablePodsWithoutTPUs, a.ExpendablePodsPriorityCutoff)
-	unschedulablePodsToHelp, allScheduledPods, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePodsWithoutTPUs, allScheduledPods, allNodes, readyNodes)
+	unschedulablePods = tpu.ClearTPURequests(unschedulablePods)
+
+	// TODO: move the split and append below to a separate PodListProcessor.
+	// Some unschedulable pods can be waiting for lower-priority pod preemption, so they have a nominated node to run on.
+	// Such pods don't require a scale-up, but should be considered during scale-down.
+	unschedulablePods, unschedulableWaitingForLowerPriorityPreemption := filterOutExpendableAndSplit(unschedulablePods, a.ExpendablePodsPriorityCutoff)
+
+	// We treat pods with a nominated node name as scheduled for the sake of scale-up considerations.
+	scheduledPods = append(scheduledPods, unschedulableWaitingForLowerPriorityPreemption...)
+
+	unschedulablePodsToHelp, scheduledPods, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods, scheduledPods, allNodes, readyNodes)
 
 	// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
 	unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
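For context, the split performed by filterOutExpendableAndSplit keys off two pod fields: priority (against the expendable cutoff) and the node name nominated by the scheduler during preemption. A rough sketch of the behaviour described by the comments above, not the actual implementation:

```go
// Sketch of the split: expendable pods (below the priority cutoff) are dropped,
// pods nominated onto a node pending preemption are separated out, and the
// rest remain scale-up candidates.
func splitForIllustration(pods []*apiv1.Pod, cutoff int) (unschedulable, waitingForPreemption []*apiv1.Pod) {
	for _, pod := range pods {
		if pod.Spec.Priority != nil && int(*pod.Spec.Priority) < cutoff {
			continue // expendable: not worth scaling up for
		}
		if pod.Status.NominatedNodeName != "" {
			// The scheduler already picked a node and is waiting for preemption:
			// no scale-up needed, but the pod should count during scale-down.
			waitingForPreemption = append(waitingForPreemption, pod)
		} else {
			unschedulable = append(unschedulable, pod)
		}
	}
	return unschedulable, waitingForPreemption
}
```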
@@ -347,7 +356,9 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 	scaleDown.CleanUp(currentTime)
 	potentiallyUnneeded := getPotentiallyUnneededNodes(autoscalingContext, allNodes)
 
-	typedErr := scaleDown.UpdateUnneededNodes(allNodes, potentiallyUnneeded, append(allScheduledPods, unschedulableWaitingForLowerPriorityPreemption...), currentTime, pdbs)
+	// We use scheduledPods (not originalScheduledPods) here, so that artificial scheduled pods introduced by processors
+	// (e.g. unschedulable pods with a nominated node name) can block scale-down of a given node.
+	typedErr := scaleDown.UpdateUnneededNodes(allNodes, potentiallyUnneeded, scheduledPods, currentTime, pdbs)
 	if typedErr != nil {
 		scaleDownStatus.Result = status.ScaleDownError
 		klog.Errorf("Failed to scale down: %v", typedErr)
@@ -387,7 +398,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 	scaleDownStart := time.Now()
 	metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
 
-	scaleDownStatus, typedErr := scaleDown.TryToScaleDown(allNodes, allScheduledPods, pdbs, currentTime)
+	scaleDownStatus, typedErr := scaleDown.TryToScaleDown(allNodes, originalScheduledPods, pdbs, currentTime)
 	metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
 
 	if scaleDownStatus.Result == status.ScaleDownNodeDeleted {
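The two closing hunks are where keeping two lists pays off: UpdateUnneededNodes receives scheduledPods, so the artificial entries appended earlier can protect their nominated nodes from scale-down, while TryToScaleDown receives originalScheduledPods, i.e. only pods that genuinely run somewhere. The copy taken at the top of RunOnce is what makes this split safe; a minimal illustration of that idiom (helper name is hypothetical):

```go
// Why RunOnce copies before mutating: appending to a slice you do not own may
// overwrite its backing array if spare capacity exists. Copying into a fresh
// slice first guarantees the original list is preserved.
func copyThenAppend(original []*apiv1.Pod, extra ...*apiv1.Pod) []*apiv1.Pod {
	scheduled := append([]*apiv1.Pod{}, original...) // fresh backing array
	return append(scheduled, extra...)               // original stays intact
}
```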