diff --git a/cluster-autoscaler/core/filter_out_schedulable.go b/cluster-autoscaler/core/filter_out_schedulable.go
new file mode 100644
index 0000000000..7617babefc
--- /dev/null
+++ b/cluster-autoscaler/core/filter_out_schedulable.go
@@ -0,0 +1,158 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package core
+
+import (
+	"sort"
+	"time"
+
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/metrics"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/glogx"
+	schedulerutil "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
+
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/klog"
+	"k8s.io/kubernetes/pkg/scheduler/util"
+)
+
+type filterOutSchedulablePodListProcessor struct{}
+
+// NewFilterOutSchedulablePodListProcessor creates a PodListProcessor filtering out schedulable pods.
+func NewFilterOutSchedulablePodListProcessor() pods.PodListProcessor {
+	return &filterOutSchedulablePodListProcessor{}
+}
+
+// Process filters out schedulable pods from the list of unschedulable pods.
+func (filterOutSchedulablePodListProcessor) Process(
+	context *context.AutoscalingContext,
+	unschedulablePods []*apiv1.Pod, allScheduledPods []*apiv1.Pod,
+	allNodes []*apiv1.Node, readyNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) {
+	// We need to check whether pods marked as unschedulable are actually unschedulable.
+	// It's likely we added a new node and the scheduler just hasn't managed to put the
+	// pod on it yet. In this situation we don't want to trigger another scale-up.
+	//
+	// It's also important to prevent uncontrollable cluster growth if CA's simulated
+	// scheduler differs in opinion from the real scheduler. Example of such a situation:
+	// - CA and Scheduler have slightly different configurations
+	// - Scheduler can't schedule a pod and marks it as unschedulable
+	// - CA added a node which should help the pod
+	// - Scheduler doesn't schedule the pod on the new node
+	//   because according to its logic it doesn't fit there
+	// - CA sees the pod is still unschedulable, so it adds another node to help it
+	//
+	// With the check enabled the last point won't happen because CA will ignore a pod
+	// which is supposed to schedule on an existing node.
+
+	// Some unschedulable pods can be waiting for preemption of lower priority pods, so they have a nominated node to run on.
+	// Such pods don't require a scale-up but should be considered during scale-down.
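+	// filterOutExpendableAndSplit drops expendable pods (priority below context.ExpendablePodsPriorityCutoff)
+	// entirely and returns the pods waiting for lower priority preemption as a separate slice.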
+	unschedulablePods, unschedulableWaitingForLowerPriorityPreemption := filterOutExpendableAndSplit(unschedulablePods, context.ExpendablePodsPriorityCutoff)
+	klog.V(4).Infof("Filtering out schedulables")
+	filterOutSchedulableStart := time.Now()
+	var unschedulablePodsToHelp []*apiv1.Pod
+	if context.FilterOutSchedulablePodsUsesPacking {
+		unschedulablePodsToHelp = filterOutSchedulableByPacking(unschedulablePods, readyNodes, allScheduledPods,
+			unschedulableWaitingForLowerPriorityPreemption, context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
+	} else {
+		unschedulablePodsToHelp = filterOutSchedulableSimple(unschedulablePods, readyNodes, allScheduledPods,
+			unschedulableWaitingForLowerPriorityPreemption, context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
+	}
+
+	metrics.UpdateDurationFromStart(metrics.FilterOutSchedulable, filterOutSchedulableStart)
+
+	if len(unschedulablePodsToHelp) != len(unschedulablePods) {
+		klog.V(2).Info("Schedulable pods present")
+		context.ProcessorCallbacks.DisableScaleDownForLoop()
+	} else {
+		klog.V(4).Info("No schedulable pods")
+	}
+	return unschedulablePodsToHelp, allScheduledPods, nil
+}
+
+func (filterOutSchedulablePodListProcessor) CleanUp() {
+}
+
+// filterOutSchedulableByPacking checks whether pods marked as unschedulable
+// can be scheduled on free capacity on existing nodes by trying to pack the pods. It tries to pack the higher priority
+// pods first. It takes into account pods that are bound to a node and will be scheduled after lower priority pod preemption.
+func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod, podsWaitingForLowerPriorityPreemption []*apiv1.Pod,
+	predicateChecker *simulator.PredicateChecker, expendablePodsPriorityCutoff int) []*apiv1.Pod {
+	var unschedulablePods []*apiv1.Pod
+	nonExpendableScheduled := filterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff)
+	nodeNameToNodeInfo := schedulerutil.CreateNodeNameToInfoMap(append(nonExpendableScheduled, podsWaitingForLowerPriorityPreemption...), nodes)
+	loggingQuota := glogx.PodsLoggingQuota()
+
+	sort.Slice(unschedulableCandidates, func(i, j int) bool {
+		return util.GetPodPriority(unschedulableCandidates[i]) > util.GetPodPriority(unschedulableCandidates[j])
+	})
+
+	for _, pod := range unschedulableCandidates {
+		nodeName, err := predicateChecker.FitsAny(pod, nodeNameToNodeInfo)
+		if err != nil {
+			unschedulablePods = append(unschedulablePods, pod)
+		} else {
+			glogx.V(4).UpTo(loggingQuota).Infof("Pod %s marked as unschedulable can be scheduled on %s. Ignoring in scale up.", pod.Name, nodeName)
+			nodeNameToNodeInfo[nodeName] = schedulerutil.NodeWithPod(nodeNameToNodeInfo[nodeName], pod)
+		}
+	}
+
+	glogx.V(4).Over(loggingQuota).Infof("%v other pods marked as unschedulable can be scheduled.", -loggingQuota.Left())
+	return unschedulablePods
+}
+
+// filterOutSchedulableSimple checks whether pods marked as unschedulable
+// by the Scheduler actually can't be scheduled on any node and filters out the ones that can.
+// It takes into account pods that are bound to a node and will be scheduled after lower priority pod preemption.
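+// Unlike filterOutSchedulableByPacking it does not reserve node capacity for pods it finds schedulable;
+// instead it caches the predicate result per controller in a podSchedulableMap, so pods owned by the same
+// controller reuse a single simulation run.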
+func filterOutSchedulableSimple(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod, podsWaitingForLowerPriorityPreemption []*apiv1.Pod, + predicateChecker *simulator.PredicateChecker, expendablePodsPriorityCutoff int) []*apiv1.Pod { + var unschedulablePods []*apiv1.Pod + nonExpendableScheduled := filterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff) + nodeNameToNodeInfo := schedulerutil.CreateNodeNameToInfoMap(append(nonExpendableScheduled, podsWaitingForLowerPriorityPreemption...), nodes) + podSchedulable := make(podSchedulableMap) + loggingQuota := glogx.PodsLoggingQuota() + + for _, pod := range unschedulableCandidates { + cachedError, found := podSchedulable.get(pod) + // Try to get result from cache. + if found { + if cachedError != nil { + unschedulablePods = append(unschedulablePods, pod) + } else { + glogx.V(4).UpTo(loggingQuota).Infof("Pod %s marked as unschedulable can be scheduled (based on simulation run for other pod owned by the same controller). Ignoring in scale up.", pod.Name) + } + continue + } + + // Not found in cache, have to run the predicates. + nodeName, err := predicateChecker.FitsAny(pod, nodeNameToNodeInfo) + // err returned from FitsAny isn't a PredicateError. + // Hello, ugly hack. I wish you weren't here. + var predicateError *simulator.PredicateError + if err != nil { + predicateError = simulator.NewPredicateError("FitsAny", err, nil, nil) + unschedulablePods = append(unschedulablePods, pod) + } else { + glogx.V(4).UpTo(loggingQuota).Infof("Pod %s marked as unschedulable can be scheduled on %s. Ignoring in scale up.", pod.Name, nodeName) + } + podSchedulable.set(pod, predicateError) + } + + glogx.V(4).Over(loggingQuota).Infof("%v other pods marked as unschedulable can be scheduled.", -loggingQuota.Left()) + return unschedulablePods +} diff --git a/cluster-autoscaler/core/filter_out_schedulable_test.go b/cluster-autoscaler/core/filter_out_schedulable_test.go new file mode 100644 index 0000000000..468bff8f20 --- /dev/null +++ b/cluster-autoscaler/core/filter_out_schedulable_test.go @@ -0,0 +1,179 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import ( + "testing" + "time" + + "k8s.io/autoscaler/cluster-autoscaler/simulator" + . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" + + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/api/testapi" + + "github.com/stretchr/testify/assert" +) + +func TestFilterOutSchedulableByPacking(t *testing.T) { + rc1 := apiv1.ReplicationController{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rc1", + Namespace: "default", + SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"), + UID: "12345678-1234-1234-1234-123456789012", + }, + } + + rc2 := apiv1.ReplicationController{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rc2", + Namespace: "default", + SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"), + UID: "12345678-1234-1234-1234-12345678901a", + }, + } + + p1 := BuildTestPod("p1", 1500, 200000) + p2_1 := BuildTestPod("p2_2", 3000, 200000) + p2_1.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID) + p2_2 := BuildTestPod("p2_2", 3000, 200000) + p2_2.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID) + p3_1 := BuildTestPod("p3", 300, 200000) + p3_1.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID) + p3_2 := BuildTestPod("p3", 300, 200000) + p3_2.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID) + unschedulablePods := []*apiv1.Pod{p1, p2_1, p2_2, p3_1, p3_2} + + scheduledPod1 := BuildTestPod("s1", 100, 200000) + scheduledPod2 := BuildTestPod("s2", 1500, 200000) + scheduledPod3 := BuildTestPod("s3", 4000, 200000) + var priority1 int32 = 1 + scheduledPod3.Spec.Priority = &priority1 + scheduledPod1.Spec.NodeName = "node1" + scheduledPod2.Spec.NodeName = "node1" + scheduledPod2.Spec.NodeName = "node1" + + podWaitingForPreemption := BuildTestPod("w1", 1500, 200000) + var priority100 int32 = 100 + podWaitingForPreemption.Spec.Priority = &priority100 + podWaitingForPreemption.Status.NominatedNodeName = "node1" + + p4 := BuildTestPod("p4", 1800, 200000) + p4.Spec.Priority = &priority100 + + node := BuildTestNode("node1", 2000, 2000000) + SetNodeReadyState(node, true, time.Time{}) + + predicateChecker := simulator.NewTestPredicateChecker() + + res := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10) + assert.Equal(t, 3, len(res)) + assert.Equal(t, p2_1, res[0]) + assert.Equal(t, p2_2, res[1]) + assert.Equal(t, p3_2, res[2]) + + res2 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10) + assert.Equal(t, 4, len(res2)) + assert.Equal(t, p1, res2[0]) + assert.Equal(t, p2_1, res2[1]) + assert.Equal(t, p2_2, res2[2]) + assert.Equal(t, p3_2, res2[3]) + + res3 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{podWaitingForPreemption}, predicateChecker, 10) + assert.Equal(t, 4, len(res3)) + assert.Equal(t, p1, res3[0]) + assert.Equal(t, p2_1, res3[1]) + assert.Equal(t, p2_2, res3[2]) + assert.Equal(t, p3_2, res3[3]) + + res4 := filterOutSchedulableByPacking(append(unschedulablePods, p4), []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10) + assert.Equal(t, 5, len(res4)) + assert.Equal(t, p1, res4[0]) + assert.Equal(t, p2_1, res4[1]) + assert.Equal(t, p2_2, 
res4[2]) + assert.Equal(t, p3_1, res4[3]) + assert.Equal(t, p3_2, res4[4]) +} + +func TestFilterOutSchedulableSimple(t *testing.T) { + rc1 := apiv1.ReplicationController{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rc1", + Namespace: "default", + SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"), + UID: "12345678-1234-1234-1234-123456789012", + }, + } + + rc2 := apiv1.ReplicationController{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rc2", + Namespace: "default", + SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"), + UID: "12345678-1234-1234-1234-12345678901a", + }, + } + + p1 := BuildTestPod("p1", 1500, 200000) + p2_1 := BuildTestPod("p2_2", 3000, 200000) + p2_1.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID) + p2_2 := BuildTestPod("p2_2", 3000, 200000) + p2_2.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID) + p3_1 := BuildTestPod("p3", 100, 200000) + p3_1.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID) + p3_2 := BuildTestPod("p3", 100, 200000) + p3_2.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID) + unschedulablePods := []*apiv1.Pod{p1, p2_1, p2_2, p3_1, p3_2} + + scheduledPod1 := BuildTestPod("s1", 100, 200000) + scheduledPod2 := BuildTestPod("s2", 1500, 200000) + scheduledPod3 := BuildTestPod("s3", 4000, 200000) + var priority1 int32 = 1 + scheduledPod3.Spec.Priority = &priority1 + scheduledPod1.Spec.NodeName = "node1" + scheduledPod2.Spec.NodeName = "node1" + scheduledPod2.Spec.NodeName = "node1" + + podWaitingForPreemption := BuildTestPod("w1", 1500, 200000) + var priority100 int32 = 100 + podWaitingForPreemption.Spec.Priority = &priority100 + podWaitingForPreemption.Status.NominatedNodeName = "node1" + + node := BuildTestNode("node1", 2000, 2000000) + SetNodeReadyState(node, true, time.Time{}) + + predicateChecker := simulator.NewTestPredicateChecker() + + res := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10) + assert.Equal(t, 2, len(res)) + assert.Equal(t, p2_1, res[0]) + assert.Equal(t, p2_2, res[1]) + + res2 := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10) + assert.Equal(t, 3, len(res2)) + assert.Equal(t, p1, res2[0]) + assert.Equal(t, p2_1, res2[1]) + assert.Equal(t, p2_2, res2[2]) + + res3 := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{podWaitingForPreemption}, predicateChecker, 10) + assert.Equal(t, 3, len(res3)) + assert.Equal(t, p1, res3[0]) + assert.Equal(t, p2_1, res3[1]) + assert.Equal(t, p2_2, res3[2]) +} diff --git a/cluster-autoscaler/core/scale_up_test.go b/cluster-autoscaler/core/scale_up_test.go index 9033d693b9..fc430ddbc9 100644 --- a/cluster-autoscaler/core/scale_up_test.go +++ b/cluster-autoscaler/core/scale_up_test.go @@ -28,7 +28,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/estimator" - ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" "k8s.io/autoscaler/cluster-autoscaler/utils/units" @@ -423,7 +422,7 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) { extraPods[i] = pod } - processors := ca_processors.TestProcessors() + processors := NewTestProcessors() scaleUpStatus, err := ScaleUp(&context, processors, clusterState, extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos) processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus) @@ -511,7 +510,7 @@ func TestScaleUpNodeComingNoScale(t *testing.T) { p3 := BuildTestPod("p-new", 550, 0) - processors := ca_processors.TestProcessors() + processors := NewTestProcessors() scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos) assert.NoError(t, err) @@ -560,7 +559,7 @@ func TestScaleUpNodeComingHasScale(t *testing.T) { p3 := BuildTestPod("p-new", 550, 0) p4 := BuildTestPod("p-new", 550, 0) - processors := ca_processors.TestProcessors() + processors := NewTestProcessors() scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3, p4}, nodes, []*appsv1.DaemonSet{}, nodeInfos) assert.NoError(t, err) @@ -605,7 +604,7 @@ func TestScaleUpUnhealthy(t *testing.T) { clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) p3 := BuildTestPod("p-new", 550, 0) - processors := ca_processors.TestProcessors() + processors := NewTestProcessors() scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos) assert.NoError(t, err) @@ -644,7 +643,7 @@ func TestScaleUpNoHelp(t *testing.T) { clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) p3 := BuildTestPod("p-new", 500, 0) - processors := ca_processors.TestProcessors() + processors := NewTestProcessors() scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos) processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus) @@ -712,7 +711,7 @@ func TestScaleUpBalanceGroups(t *testing.T) { pods = append(pods, BuildTestPod(fmt.Sprintf("test-pod-%v", i), 80, 0)) } - processors := ca_processors.TestProcessors() + processors := NewTestProcessors() scaleUpStatus, typedErr := ScaleUp(&context, processors, clusterState, pods, nodes, []*appsv1.DaemonSet{}, nodeInfos) assert.NoError(t, typedErr) @@ -763,7 +762,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) { clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) - processors := ca_processors.TestProcessors() + processors := NewTestProcessors() processors.NodeGroupListProcessor = &mockAutoprovisioningNodeGroupListProcessor{t} processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t} diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 2521d240f3..1638b1b856 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -282,56 +282,13 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError return errors.ToAutoscalerError(errors.ApiCallError, err) } - allUnschedulablePods, allScheduledPods, err = a.processors.PodListProcessor.Process(a.AutoscalingContext, allUnschedulablePods, allScheduledPods, allNodes, readyNodes) - if err != nil { - klog.Errorf("Failed to process pod list: %v", err) - return errors.ToAutoscalerError(errors.InternalError, err) - } - 
ConfigurePredicateCheckerForLoop(allUnschedulablePods, allScheduledPods, a.PredicateChecker) - // We need to check whether pods marked as unschedulable are actually unschedulable. - // It's likely we added a new node and the scheduler just haven't managed to put the - // pod on in yet. In this situation we don't want to trigger another scale-up. - // - // It's also important to prevent uncontrollable cluster growth if CA's simulated - // scheduler differs in opinion with real scheduler. Example of such situation: - // - CA and Scheduler has slightly different configuration - // - Scheduler can't schedule a pod and marks it as unschedulable - // - CA added a node which should help the pod - // - Scheduler doesn't schedule the pod on the new node - // because according to it logic it doesn't fit there - // - CA see the pod is still unschedulable, so it adds another node to help it - // - // With the check enabled the last point won't happen because CA will ignore a pod - // which is supposed to schedule on an existing node. - scaleDownForbidden := false - unschedulablePodsWithoutTPUs := tpu.ClearTPURequests(allUnschedulablePods) - // Some unschedulable pods can be waiting for lower priority pods preemption so they have nominated node to run. - // Such pods don't require scale up but should be considered during scale down. - unschedulablePods, unschedulableWaitingForLowerPriorityPreemption := filterOutExpendableAndSplit(unschedulablePodsWithoutTPUs, a.ExpendablePodsPriorityCutoff) - - klog.V(4).Infof("Filtering out schedulables") - filterOutSchedulableStart := time.Now() - var unschedulablePodsToHelp []*apiv1.Pod - if a.FilterOutSchedulablePodsUsesPacking { - unschedulablePodsToHelp = filterOutSchedulableByPacking(unschedulablePods, readyNodes, allScheduledPods, - unschedulableWaitingForLowerPriorityPreemption, a.PredicateChecker, a.ExpendablePodsPriorityCutoff) - } else { - unschedulablePodsToHelp = filterOutSchedulableSimple(unschedulablePods, readyNodes, allScheduledPods, - unschedulableWaitingForLowerPriorityPreemption, a.PredicateChecker, a.ExpendablePodsPriorityCutoff) - } - - metrics.UpdateDurationFromStart(metrics.FilterOutSchedulable, filterOutSchedulableStart) - - if len(unschedulablePodsToHelp) != len(unschedulablePods) { - klog.V(2).Info("Schedulable pods present") - scaleDownForbidden = true - } else { - klog.V(4).Info("No schedulable pods") - } + // todo: this is also computed in filterOutSchedulablePodListProcessor; avoid that. + _, unschedulableWaitingForLowerPriorityPreemption := filterOutExpendableAndSplit(unschedulablePodsWithoutTPUs, a.ExpendablePodsPriorityCutoff) + unschedulablePodsToHelp, allScheduledPods, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePodsWithoutTPUs, allScheduledPods, allNodes, readyNodes) // finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable) unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime) @@ -347,7 +304,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError // is more pods to come. In theory we could check the newest pod time but then if pod were created // slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for long time. // We also want to skip a real scale down (just like if the pods were handled). 
- scaleDownForbidden = true + a.processorCallbacks.DisableScaleDownForLoop() scaleUpStatus.Result = status.ScaleUpInCooldown klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more") } else { @@ -405,8 +362,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError } } - scaleDownForbidden = scaleDownForbidden || a.processorCallbacks.disableScaleDownForLoop - scaleDownInCooldown := scaleDownForbidden || + scaleDownInCooldown := a.processorCallbacks.disableScaleDownForLoop || a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) || a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) || a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime) @@ -416,7 +372,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError klog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s "+ "lastScaleDownDeleteTime=%v lastScaleDownFailTime=%s scaleDownForbidden=%v isDeleteInProgress=%v", calculateUnneededOnly, a.lastScaleUpTime, a.lastScaleDownDeleteTime, a.lastScaleDownFailTime, - scaleDownForbidden, scaleDown.nodeDeleteStatus.IsDeleteInProgress()) + a.processorCallbacks.disableScaleDownForLoop, scaleDown.nodeDeleteStatus.IsDeleteInProgress()) if scaleDownInCooldown { scaleDownStatus.Result = status.ScaleDownInCooldown diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index c4c4ed6c2a..0385888da8 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -29,7 +29,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/estimator" - ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" @@ -208,7 +207,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { lastScaleUpTime: time.Now(), lastScaleDownFailTime: time.Now(), scaleDown: sd, - processors: ca_processors.TestProcessors(), + processors: NewTestProcessors(), processorCallbacks: processorCallbacks, initialized: true, } @@ -355,7 +354,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { provider.AddNode("ng1,", n1) assert.NotNil(t, provider) - processors := ca_processors.TestProcessors() + processors := NewTestProcessors() processors.NodeGroupManager = nodeGroupManager processors.NodeGroupListProcessor = nodeGroupListProcessor @@ -533,7 +532,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { lastScaleUpTime: time.Now(), lastScaleDownFailTime: time.Now(), scaleDown: sd, - processors: ca_processors.TestProcessors(), + processors: NewTestProcessors(), processorCallbacks: processorCallbacks, } @@ -659,7 +658,7 @@ func TestStaticAutoscalerRunOncePodsWithFilterOutSchedulablePodsUsesPackingFalse lastScaleUpTime: time.Now(), lastScaleDownFailTime: time.Now(), scaleDown: sd, - processors: ca_processors.TestProcessors(), + processors: NewTestProcessors(), processorCallbacks: processorCallbacks, } @@ -776,7 +775,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { lastScaleUpTime: time.Now(), lastScaleDownFailTime: time.Now(), scaleDown: sd, - processors: ca_processors.TestProcessors(), + processors: NewTestProcessors(), processorCallbacks: processorCallbacks, } diff --git a/cluster-autoscaler/core/test_utils.go b/cluster-autoscaler/core/test_utils.go new file mode 100644 index 0000000000..1c79817dc4 --- /dev/null +++ b/cluster-autoscaler/core/test_utils.go @@ -0,0 +1,38 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import ( + "k8s.io/autoscaler/cluster-autoscaler/processors" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" + "k8s.io/autoscaler/cluster-autoscaler/processors/status" +) + +// NewTestProcessors returns a set of simple processors for use in tests. 
+func NewTestProcessors() *processors.AutoscalingProcessors { + return &processors.AutoscalingProcessors{ + PodListProcessor: NewFilterOutSchedulablePodListProcessor(), + NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{}, + NodeGroupSetProcessor: &nodegroupset.BalancingNodeGroupSetProcessor{}, + // TODO(bskiba): change scale up test so that this can be a NoOpProcessor + ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{}, + ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{}, + AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{}, + NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(), + } +} diff --git a/cluster-autoscaler/core/utils.go b/cluster-autoscaler/core/utils.go index ae51bd8dcc..81b366dfd5 100644 --- a/cluster-autoscaler/core/utils.go +++ b/cluster-autoscaler/core/utils.go @@ -20,11 +20,11 @@ import ( "fmt" "math/rand" "reflect" - "sort" "time" - "k8s.io/kubernetes/pkg/scheduler/util" - + appsv1 "k8s.io/api/apps/v1" + apiv1 "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" @@ -38,11 +38,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/utils/glogx" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" - scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" - - appsv1 "k8s.io/api/apps/v1" - apiv1 "k8s.io/api/core/v1" - apiequality "k8s.io/apimachinery/pkg/api/equality" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" "k8s.io/klog" @@ -108,75 +103,6 @@ func (podMap podSchedulableMap) set(pod *apiv1.Pod, err *simulator.PredicateErro }) } -// filterOutSchedulableByPacking checks whether pods from marked as unschedulable -// can be scheduled on free capacity on existing nodes by trying to pack the pods. It tries to pack the higher priority -// pods first. It takes into account pods that are bound to node and will be scheduled after lower priority pod preemption. -func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod, podsWaitingForLowerPriorityPreemption []*apiv1.Pod, - predicateChecker *simulator.PredicateChecker, expendablePodsPriorityCutoff int) []*apiv1.Pod { - var unschedulablePods []*apiv1.Pod - nonExpendableScheduled := filterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff) - nodeNameToNodeInfo := scheduler_util.CreateNodeNameToInfoMap(append(nonExpendableScheduled, podsWaitingForLowerPriorityPreemption...), nodes) - loggingQuota := glogx.PodsLoggingQuota() - - sort.Slice(unschedulableCandidates, func(i, j int) bool { - return util.GetPodPriority(unschedulableCandidates[i]) > util.GetPodPriority(unschedulableCandidates[j]) - }) - - for _, pod := range unschedulableCandidates { - nodeName, err := predicateChecker.FitsAny(pod, nodeNameToNodeInfo) - if err != nil { - unschedulablePods = append(unschedulablePods, pod) - } else { - glogx.V(4).UpTo(loggingQuota).Infof("Pod %s marked as unschedulable can be scheduled on %s. 
Ignoring in scale up.", pod.Name, nodeName) - nodeNameToNodeInfo[nodeName] = scheduler_util.NodeWithPod(nodeNameToNodeInfo[nodeName], pod) - } - } - - glogx.V(4).Over(loggingQuota).Infof("%v other pods marked as unschedulable can be scheduled.", -loggingQuota.Left()) - return unschedulablePods -} - -// filterOutSchedulableSimple checks whether pods from marked as unschedulable -// by Scheduler actually can't be scheduled on any node and filter out the ones that can. -// It takes into account pods that are bound to node and will be scheduled after lower priority pod preemption. -func filterOutSchedulableSimple(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod, podsWaitingForLowerPriorityPreemption []*apiv1.Pod, - predicateChecker *simulator.PredicateChecker, expendablePodsPriorityCutoff int) []*apiv1.Pod { - var unschedulablePods []*apiv1.Pod - nonExpendableScheduled := filterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff) - nodeNameToNodeInfo := scheduler_util.CreateNodeNameToInfoMap(append(nonExpendableScheduled, podsWaitingForLowerPriorityPreemption...), nodes) - podSchedulable := make(podSchedulableMap) - loggingQuota := glogx.PodsLoggingQuota() - - for _, pod := range unschedulableCandidates { - cachedError, found := podSchedulable.get(pod) - // Try to get result from cache. - if found { - if cachedError != nil { - unschedulablePods = append(unschedulablePods, pod) - } else { - glogx.V(4).UpTo(loggingQuota).Infof("Pod %s marked as unschedulable can be scheduled (based on simulation run for other pod owned by the same controller). Ignoring in scale up.", pod.Name) - } - continue - } - - // Not found in cache, have to run the predicates. - nodeName, err := predicateChecker.FitsAny(pod, nodeNameToNodeInfo) - // err returned from FitsAny isn't a PredicateError. - // Hello, ugly hack. I wish you weren't here. - var predicateError *simulator.PredicateError - if err != nil { - predicateError = simulator.NewPredicateError("FitsAny", err, nil, nil) - unschedulablePods = append(unschedulablePods, pod) - } else { - glogx.V(4).UpTo(loggingQuota).Infof("Pod %s marked as unschedulable can be scheduled on %s. Ignoring in scale up.", pod.Name, nodeName) - } - podSchedulable.set(pod, predicateError) - } - - glogx.V(4).Over(loggingQuota).Infof("%v other pods marked as unschedulable can be scheduled.", -loggingQuota.Left()) - return unschedulablePods -} - // filterOutExpendableAndSplit filters out expendable pods and splits into: // - waiting for lower priority pods preemption // - other pods. 
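Extracting the filtering step into a PodListProcessor makes it pluggable. Below is a minimal sketch of an alternative processor, assuming the pods.PodListProcessor interface consists of exactly the Process and CleanUp methods implemented by filterOutSchedulablePodListProcessor above; the type name and the ignored namespace are hypothetical.

package core

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
)

// ignoreNamespacePodListProcessor is a hypothetical processor that drops
// unschedulable pods from a single namespace before scale-up is considered.
type ignoreNamespacePodListProcessor struct {
	namespace string
}

// NewIgnoreNamespacePodListProcessor returns a PodListProcessor ignoring the given namespace.
func NewIgnoreNamespacePodListProcessor(namespace string) pods.PodListProcessor {
	return &ignoreNamespacePodListProcessor{namespace: namespace}
}

// Process filters the unschedulable pod list and passes the scheduled pods through unchanged.
func (p *ignoreNamespacePodListProcessor) Process(
	ctx *context.AutoscalingContext,
	unschedulablePods []*apiv1.Pod, allScheduledPods []*apiv1.Pod,
	allNodes []*apiv1.Node, readyNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) {
	var kept []*apiv1.Pod
	for _, pod := range unschedulablePods {
		if pod.Namespace != p.namespace {
			kept = append(kept, pod)
		}
	}
	return kept, allScheduledPods, nil
}

// CleanUp has no per-loop state to release.
func (p *ignoreNamespacePodListProcessor) CleanUp() {}

Such a processor would be wired in the same place where main.go below swaps in NewFilterOutSchedulablePodListProcessor.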
diff --git a/cluster-autoscaler/core/utils_test.go b/cluster-autoscaler/core/utils_test.go index 24b77de1d6..181118c021 100644 --- a/cluster-autoscaler/core/utils_test.go +++ b/cluster-autoscaler/core/utils_test.go @@ -121,154 +121,6 @@ func TestPodSchedulableMap(t *testing.T) { assert.Nil(t, err) } -func TestFilterOutSchedulableByPacking(t *testing.T) { - rc1 := apiv1.ReplicationController{ - ObjectMeta: metav1.ObjectMeta{ - Name: "rc1", - Namespace: "default", - SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"), - UID: "12345678-1234-1234-1234-123456789012", - }, - } - - rc2 := apiv1.ReplicationController{ - ObjectMeta: metav1.ObjectMeta{ - Name: "rc2", - Namespace: "default", - SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"), - UID: "12345678-1234-1234-1234-12345678901a", - }, - } - - p1 := BuildTestPod("p1", 1500, 200000) - p2_1 := BuildTestPod("p2_2", 3000, 200000) - p2_1.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID) - p2_2 := BuildTestPod("p2_2", 3000, 200000) - p2_2.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID) - p3_1 := BuildTestPod("p3", 300, 200000) - p3_1.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID) - p3_2 := BuildTestPod("p3", 300, 200000) - p3_2.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID) - unschedulablePods := []*apiv1.Pod{p1, p2_1, p2_2, p3_1, p3_2} - - scheduledPod1 := BuildTestPod("s1", 100, 200000) - scheduledPod2 := BuildTestPod("s2", 1500, 200000) - scheduledPod3 := BuildTestPod("s3", 4000, 200000) - var priority1 int32 = 1 - scheduledPod3.Spec.Priority = &priority1 - scheduledPod1.Spec.NodeName = "node1" - scheduledPod2.Spec.NodeName = "node1" - scheduledPod2.Spec.NodeName = "node1" - - podWaitingForPreemption := BuildTestPod("w1", 1500, 200000) - var priority100 int32 = 100 - podWaitingForPreemption.Spec.Priority = &priority100 - podWaitingForPreemption.Status.NominatedNodeName = "node1" - - p4 := BuildTestPod("p4", 1800, 200000) - p4.Spec.Priority = &priority100 - - node := BuildTestNode("node1", 2000, 2000000) - SetNodeReadyState(node, true, time.Time{}) - - predicateChecker := simulator.NewTestPredicateChecker() - - res := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10) - assert.Equal(t, 3, len(res)) - assert.Equal(t, p2_1, res[0]) - assert.Equal(t, p2_2, res[1]) - assert.Equal(t, p3_2, res[2]) - - res2 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10) - assert.Equal(t, 4, len(res2)) - assert.Equal(t, p1, res2[0]) - assert.Equal(t, p2_1, res2[1]) - assert.Equal(t, p2_2, res2[2]) - assert.Equal(t, p3_2, res2[3]) - - res3 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{podWaitingForPreemption}, predicateChecker, 10) - assert.Equal(t, 4, len(res3)) - assert.Equal(t, p1, res3[0]) - assert.Equal(t, p2_1, res3[1]) - assert.Equal(t, p2_2, res3[2]) - assert.Equal(t, p3_2, res3[3]) - - res4 := filterOutSchedulableByPacking(append(unschedulablePods, p4), []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10) - assert.Equal(t, 5, len(res4)) - 
assert.Equal(t, p1, res4[0]) - assert.Equal(t, p2_1, res4[1]) - assert.Equal(t, p2_2, res4[2]) - assert.Equal(t, p3_1, res4[3]) - assert.Equal(t, p3_2, res4[4]) -} - -func TestFilterOutSchedulableSimple(t *testing.T) { - rc1 := apiv1.ReplicationController{ - ObjectMeta: metav1.ObjectMeta{ - Name: "rc1", - Namespace: "default", - SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"), - UID: "12345678-1234-1234-1234-123456789012", - }, - } - - rc2 := apiv1.ReplicationController{ - ObjectMeta: metav1.ObjectMeta{ - Name: "rc2", - Namespace: "default", - SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"), - UID: "12345678-1234-1234-1234-12345678901a", - }, - } - - p1 := BuildTestPod("p1", 1500, 200000) - p2_1 := BuildTestPod("p2_2", 3000, 200000) - p2_1.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID) - p2_2 := BuildTestPod("p2_2", 3000, 200000) - p2_2.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID) - p3_1 := BuildTestPod("p3", 100, 200000) - p3_1.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID) - p3_2 := BuildTestPod("p3", 100, 200000) - p3_2.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID) - unschedulablePods := []*apiv1.Pod{p1, p2_1, p2_2, p3_1, p3_2} - - scheduledPod1 := BuildTestPod("s1", 100, 200000) - scheduledPod2 := BuildTestPod("s2", 1500, 200000) - scheduledPod3 := BuildTestPod("s3", 4000, 200000) - var priority1 int32 = 1 - scheduledPod3.Spec.Priority = &priority1 - scheduledPod1.Spec.NodeName = "node1" - scheduledPod2.Spec.NodeName = "node1" - scheduledPod2.Spec.NodeName = "node1" - - podWaitingForPreemption := BuildTestPod("w1", 1500, 200000) - var priority100 int32 = 100 - podWaitingForPreemption.Spec.Priority = &priority100 - podWaitingForPreemption.Status.NominatedNodeName = "node1" - - node := BuildTestNode("node1", 2000, 2000000) - SetNodeReadyState(node, true, time.Time{}) - - predicateChecker := simulator.NewTestPredicateChecker() - - res := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10) - assert.Equal(t, 2, len(res)) - assert.Equal(t, p2_1, res[0]) - assert.Equal(t, p2_2, res[1]) - - res2 := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10) - assert.Equal(t, 3, len(res2)) - assert.Equal(t, p1, res2[0]) - assert.Equal(t, p2_1, res2[1]) - assert.Equal(t, p2_2, res2[2]) - - res3 := filterOutSchedulableSimple(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{podWaitingForPreemption}, predicateChecker, 10) - assert.Equal(t, 3, len(res3)) - assert.Equal(t, p1, res3[0]) - assert.Equal(t, p2_1, res3[1]) - assert.Equal(t, p2_2, res3[2]) -} - func TestFilterOutExpendableAndSplit(t *testing.T) { var priority1 int32 = 1 var priority100 int32 = 100 diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 2c32eb17bd..8fe577e0cd 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -278,7 +278,10 @@ func buildAutoscaler() (core.Autoscaler, error) { autoscalingOptions := createAutoscalingOptions() kubeClient := createKubeClient(getKubeConfig()) eventsKubeClient := createKubeClient(getKubeConfig()) + processors := 
ca_processors.DefaultProcessors() + processors.PodListProcessor = core.NewFilterOutSchedulablePodListProcessor() + opts := core.AutoscalerOptions{ AutoscalingOptions: autoscalingOptions, KubeClient: kubeClient, diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go index ccde1b1b8c..0a255f031f 100644 --- a/cluster-autoscaler/processors/processors.go +++ b/cluster-autoscaler/processors/processors.go @@ -55,20 +55,6 @@ func DefaultProcessors() *AutoscalingProcessors { } } -// TestProcessors returns a set of simple processors for use in tests. -func TestProcessors() *AutoscalingProcessors { - return &AutoscalingProcessors{ - PodListProcessor: &pods.NoOpPodListProcessor{}, - NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{}, - NodeGroupSetProcessor: &nodegroupset.BalancingNodeGroupSetProcessor{}, - // TODO(bskiba): change scale up test so that this can be a NoOpProcessor - ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{}, - ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{}, - AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{}, - NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(), - } -} - // CleanUp cleans up the processors' internal structures. func (ap *AutoscalingProcessors) CleanUp() { ap.PodListProcessor.CleanUp()