From 90aa28a0773f18a91fb0c1d74f80fdc7feb06a3a Mon Sep 17 00:00:00 2001
From: Vivek Bagade
Date: Tue, 18 Jun 2019 10:57:46 +0200
Subject: [PATCH] Move pod packing in upcoming nodes to RunOnce from Estimator for performance improvements

---
 .../core/filter_out_schedulable.go            |  45 ++-
 .../core/filter_out_schedulable_test.go       |   6 +-
 cluster-autoscaler/core/scale_up_test.go      | 100 -------
 cluster-autoscaler/core/static_autoscaler.go  |  19 +-
 .../core/static_autoscaler_test.go            | 270 ++++++++++++++++++
 .../estimator/basic_estimator.go              | 153 ----------
 .../estimator/binpacking_estimator.go         |   4 +-
 .../estimator/binpacking_estimator_test.go    |  51 ++--
 cluster-autoscaler/estimator/estimator.go     |  19 +-
 .../estimator/old_binpacking_estimator.go     |  70 +++++
 ...st.go => old_binpacking_estimator_test.go} | 141 ++++-----
 cluster-autoscaler/main.go                    |   1 +
 .../processors/pods/pod_list_processor.go     |   6 +-
 .../pods/pod_list_processor_test.go           |   2 +-
 14 files changed, 482 insertions(+), 405 deletions(-)
 delete mode 100644 cluster-autoscaler/estimator/basic_estimator.go
 create mode 100644 cluster-autoscaler/estimator/old_binpacking_estimator.go
 rename cluster-autoscaler/estimator/{basic_estimator_test.go => old_binpacking_estimator_test.go} (56%)

diff --git a/cluster-autoscaler/core/filter_out_schedulable.go b/cluster-autoscaler/core/filter_out_schedulable.go
index aa6dbc4e5d..99eff03d03 100644
--- a/cluster-autoscaler/core/filter_out_schedulable.go
+++ b/cluster-autoscaler/core/filter_out_schedulable.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/estimator"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator"
@@ -43,7 +44,8 @@ func NewFilterOutSchedulablePodListProcessor() pods.PodListProcessor {
 func (filterOutSchedulablePodListProcessor) Process(
 	context *context.AutoscalingContext,
 	unschedulablePods []*apiv1.Pod, allScheduledPods []*apiv1.Pod,
-	allNodes []*apiv1.Node, readyNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) {
+	allNodes []*apiv1.Node, readyNodes []*apiv1.Node,
+	upcomingNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) {
 	// We need to check whether pods marked as unschedulable are actually unschedulable.
 	// It's likely we added a new node and the scheduler just haven't managed to put the
 	// pod on in yet. In this situation we don't want to trigger another scale-up.
@@ -63,11 +65,19 @@ func (filterOutSchedulablePodListProcessor) Process(
 	klog.V(4).Infof("Filtering out schedulables")
 	filterOutSchedulableStart := time.Now()
 	var unschedulablePodsToHelp []*apiv1.Pod
-	if context.FilterOutSchedulablePodsUsesPacking {
-		unschedulablePodsToHelp = filterOutSchedulableByPacking(unschedulablePods, readyNodes, allScheduledPods,
-			context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
+
+	if context.EstimatorName == estimator.BinpackingEstimatorName {
+		unschedulablePodsToHelp = filterOutSchedulableByPacking(unschedulablePods, upcomingNodes, allScheduledPods,
+			context.PredicateChecker, context.ExpendablePodsPriorityCutoff, false)
 	} else {
-		unschedulablePodsToHelp = filterOutSchedulableSimple(unschedulablePods, readyNodes, allScheduledPods,
+		unschedulablePodsToHelp = unschedulablePods
+	}
+
+	if context.FilterOutSchedulablePodsUsesPacking {
+		unschedulablePodsToHelp = filterOutSchedulableByPacking(unschedulablePodsToHelp, readyNodes, allScheduledPods,
+			context.PredicateChecker, context.ExpendablePodsPriorityCutoff, true)
+	} else {
+		unschedulablePodsToHelp = filterOutSchedulableSimple(unschedulablePodsToHelp, readyNodes, allScheduledPods,
 			context.PredicateChecker, context.ExpendablePodsPriorityCutoff)
 	}
 
@@ -85,15 +95,16 @@ func (filterOutSchedulablePodListProcessor) Process(
 func (filterOutSchedulablePodListProcessor) CleanUp() {
 }
 
-// filterOutSchedulableByPacking checks whether pods from marked as unschedulable
-// can be scheduled on free capacity on existing nodes by trying to pack the pods. It tries to pack the higher priority
-// pods first. It takes into account pods that are bound to node and will be scheduled after lower priority pod preemption.
-func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod,
-	predicateChecker *simulator.PredicateChecker, expendablePodsPriorityCutoff int) []*apiv1.Pod {
+// filterOutSchedulableByPacking checks whether pods from marked as
+// unschedulable can be scheduled on free capacity on existing nodes by trying to pack the pods. It
+// tries to pack the higher priority pods first. It takes into account pods that are bound to node
+// and will be scheduled after lower priority pod preemption.
+func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node,
+	allScheduled []*apiv1.Pod, predicateChecker *simulator.PredicateChecker,
+	expendablePodsPriorityCutoff int, nodesExist bool) []*apiv1.Pod {
 	var unschedulablePods []*apiv1.Pod
 	nonExpendableScheduled := filterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff)
 	nodeNameToNodeInfo := schedulerutil.CreateNodeNameToInfoMap(nonExpendableScheduled, nodes)
-	loggingQuota := glogx.PodsLoggingQuota()
 	sort.Slice(unschedulableCandidates, func(i, j int) bool {
 		return util.GetPodPriority(unschedulableCandidates[i]) > util.GetPodPriority(unschedulableCandidates[j])
 	})
@@ -104,12 +115,20 @@ func filterOutSchedulableByPacking(unschedulableCandidates []*apiv1.Pod, nodes [
 		if err != nil {
 			unschedulablePods = append(unschedulablePods, pod)
 		} else {
-			glogx.V(4).UpTo(loggingQuota).Infof("Pod %s marked as unschedulable can be scheduled on %s. Ignoring in scale up.", pod.Name, nodeName)
+			var nodeType string
+			if nodesExist {
+				nodeType = "existing"
+			} else {
+				nodeType = "upcoming"
+			}
+			klog.V(4).Infof("Pod %s marked as unschedulable can be scheduled on %s node %s. 
Ignoring"+ + " in scale up.", pod.Name, nodeType, nodeName) nodeNameToNodeInfo[nodeName] = schedulerutil.NodeWithPod(nodeNameToNodeInfo[nodeName], pod) } } - glogx.V(4).Over(loggingQuota).Infof("%v other pods marked as unschedulable can be scheduled.", -loggingQuota.Left()) + klog.V(4).Infof("%v other pods marked as unschedulable can be scheduled.", + len(unschedulableCandidates)-len(unschedulablePods)) return unschedulablePods } diff --git a/cluster-autoscaler/core/filter_out_schedulable_test.go b/cluster-autoscaler/core/filter_out_schedulable_test.go index b1ef1c6624..86a6a5df9f 100644 --- a/cluster-autoscaler/core/filter_out_schedulable_test.go +++ b/cluster-autoscaler/core/filter_out_schedulable_test.go @@ -82,20 +82,20 @@ func TestFilterOutSchedulableByPacking(t *testing.T) { predicateChecker := simulator.NewTestPredicateChecker() - res := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, predicateChecker, 10) + res := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, predicateChecker, 10, true) assert.Equal(t, 3, len(res)) assert.Equal(t, p2_1, res[0]) assert.Equal(t, p2_2, res[1]) assert.Equal(t, p3_2, res[2]) - res2 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, predicateChecker, 10) + res2 := filterOutSchedulableByPacking(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, predicateChecker, 10, true) assert.Equal(t, 4, len(res2)) assert.Equal(t, p1, res2[0]) assert.Equal(t, p2_1, res2[1]) assert.Equal(t, p2_2, res2[2]) assert.Equal(t, p3_2, res2[3]) - res4 := filterOutSchedulableByPacking(append(unschedulablePods, p4), []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, predicateChecker, 10) + res4 := filterOutSchedulableByPacking(append(unschedulablePods, p4), []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, predicateChecker, 10, true) assert.Equal(t, 5, len(res4)) assert.Equal(t, p1, res4[0]) assert.Equal(t, p2_1, res4[1]) diff --git a/cluster-autoscaler/core/scale_up_test.go b/cluster-autoscaler/core/scale_up_test.go index c7e3d7ab0c..548258ff7f 100644 --- a/cluster-autoscaler/core/scale_up_test.go +++ b/cluster-autoscaler/core/scale_up_test.go @@ -468,106 +468,6 @@ func buildTestPod(p podConfig) *apiv1.Pod { return pod } -func TestScaleUpNodeComingNoScale(t *testing.T) { - n1 := BuildTestNode("n1", 100, 1000) - SetNodeReadyState(n1, true, time.Now()) - n2 := BuildTestNode("n2", 1000, 1000) - SetNodeReadyState(n2, true, time.Now()) - - p1 := BuildTestPod("p1", 80, 0) - p2 := BuildTestPod("p2", 800, 0) - p1.Spec.NodeName = "n1" - p2.Spec.NodeName = "n2" - - podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1, p2}) - listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) - - provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error { - t.Fatalf("No expansion is expected, but increased %s by %d", nodeGroup, increase) - return nil - }, nil) - provider.AddNodeGroup("ng1", 1, 10, 1) - provider.AddNodeGroup("ng2", 1, 10, 2) - provider.AddNode("ng1", n1) - provider.AddNode("ng2", n2) - - options := config.AutoscalingOptions{ - EstimatorName: estimator.BinpackingEstimatorName, - MaxCoresTotal: config.DefaultMaxClusterCores, - MaxMemoryTotal: config.DefaultMaxClusterMemory, - } - context := NewScaleTestAutoscalingContext(options, 
&fake.Clientset{}, listers, provider, nil) - - nodes := []*apiv1.Node{n1, n2} - nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker, nil) - clusterState := clusterstate.NewClusterStateRegistry( - provider, - clusterstate.ClusterStateRegistryConfig{MaxNodeProvisionTime: 5 * time.Minute}, - context.LogRecorder, - newBackoff()) - clusterState.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng2"), 1, time.Now()) - clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) - - p3 := BuildTestPod("p-new", 550, 0) - - processors := NewTestProcessors() - - scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil) - assert.NoError(t, err) - // A node is already coming - no need for scale up. - assert.False(t, scaleUpStatus.WasSuccessful()) -} - -func TestScaleUpNodeComingHasScale(t *testing.T) { - n1 := BuildTestNode("n1", 100, 1000) - SetNodeReadyState(n1, true, time.Now()) - n2 := BuildTestNode("n2", 1000, 1000) - SetNodeReadyState(n2, true, time.Now()) - - p1 := BuildTestPod("p1", 80, 0) - p2 := BuildTestPod("p2", 800, 0) - p1.Spec.NodeName = "n1" - p2.Spec.NodeName = "n2" - - podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1, p2}) - listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) - - expandedGroups := make(chan string, 10) - provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error { - expandedGroups <- fmt.Sprintf("%s-%d", nodeGroup, increase) - return nil - }, nil) - provider.AddNodeGroup("ng1", 1, 10, 1) - provider.AddNodeGroup("ng2", 1, 10, 2) - provider.AddNode("ng1", n1) - provider.AddNode("ng2", n2) - - context := NewScaleTestAutoscalingContext(defaultOptions, &fake.Clientset{}, listers, provider, nil) - - nodes := []*apiv1.Node{n1, n2} - nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker, nil) - clusterState := clusterstate.NewClusterStateRegistry( - provider, - clusterstate.ClusterStateRegistryConfig{ - MaxNodeProvisionTime: 5 * time.Minute, - }, - context.LogRecorder, - newBackoff()) - clusterState.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng2"), 1, time.Now()) - clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) - - p3 := BuildTestPod("p-new", 550, 0) - p4 := BuildTestPod("p-new", 550, 0) - - processors := NewTestProcessors() - scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3, p4}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil) - - assert.NoError(t, err) - // Two nodes needed but one node is already coming, so it should increase by one. - assert.True(t, scaleUpStatus.WasSuccessful()) - assert.Equal(t, "ng2-1", getStringFromChan(expandedGroups)) -} - func TestScaleUpUnhealthy(t *testing.T) { n1 := BuildTestNode("n1", 100, 1000) SetNodeReadyState(n1, true, time.Now()) diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index cbf3a42362..fc9082df7b 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -316,7 +316,9 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError // we tread pods with nominated node-name as scheduled for sake of scale-up considerations scheduledPods = append(scheduledPods, unschedulableWaitingForLowerPriorityPreemption...) 
- unschedulablePodsToHelp, scheduledPods, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods, scheduledPods, allNodes, readyNodes) + unschedulablePodsToHelp, scheduledPods, err := a.processors.PodListProcessor.Process( + a.AutoscalingContext, unschedulablePods, scheduledPods, allNodes, readyNodes, + getUpcomingNodeInfos(a.clusterStateRegistry, nodeInfosForGroups)) // finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable) unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime) @@ -588,3 +590,18 @@ func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time) bool { found, oldest := getOldestCreateTimeWithGpu(pods) return found && oldest.Add(unschedulablePodWithGpuTimeBuffer).After(currentTime) } + +func getUpcomingNodeInfos(registry *clusterstate.ClusterStateRegistry, nodeInfos map[string]*schedulernodeinfo.NodeInfo) []*apiv1.Node { + upcomingNodes := make([]*apiv1.Node, 0) + for nodeGroup, numberOfNodes := range registry.GetUpcomingNodes() { + nodeTemplate, found := nodeInfos[nodeGroup] + if !found { + klog.Warningf("Couldn't find template for node group %s", nodeGroup) + continue + } + for i := 0; i < numberOfNodes; i++ { + upcomingNodes = append(upcomingNodes, nodeTemplate.Node()) + } + } + return upcomingNodes +} diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 2c798f2953..46384c84e0 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -817,6 +817,276 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) } +func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T) { + readyNodeLister := kubernetes.NewTestNodeLister(nil) + allNodeLister := kubernetes.NewTestNodeLister(nil) + scheduledPodMock := &podListerMock{} + unschedulablePodMock := &podListerMock{} + podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{} + daemonSetListerMock := &daemonSetListerMock{} + onScaleUpMock := &onScaleUpMock{} + onScaleDownMock := &onScaleDownMock{} + + n1 := BuildTestNode("n1", 2000, 1000) + SetNodeReadyState(n1, true, time.Now()) + n2 := BuildTestNode("n2", 2000, 1000) + SetNodeReadyState(n2, true, time.Now()) + + // shared owner reference + ownerRef := GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", "") + + p1 := BuildTestPod("p1", 1400, 0) + p1.OwnerReferences = ownerRef + p3 := BuildTestPod("p3", 1400, 0) + p3.Spec.NodeName = "n1" + p3.OwnerReferences = ownerRef + p4 := BuildTestPod("p4", 1400, 0) + p4.Spec.NodeName = "n2" + p4.OwnerReferences = ownerRef + + provider := testprovider.NewTestCloudProvider( + func(id string, delta int) error { + return onScaleUpMock.ScaleUp(id, delta) + }, func(id string, name string) error { + return onScaleDownMock.ScaleDown(id, name) + }) + provider.AddNodeGroup("ng1", 0, 10, 2) + provider.AddNode("ng1", n1) + + provider.AddNodeGroup("ng2", 0, 10, 1) + provider.AddNode("ng2", n2) + + assert.NotNil(t, provider) + + // Create context with mocked lister registry. 
+ options := config.AutoscalingOptions{ + EstimatorName: estimator.BinpackingEstimatorName, + ScaleDownEnabled: false, + ScaleDownUtilizationThreshold: 0.5, + MaxNodesTotal: 10, + MaxCoresTotal: 10, + MaxMemoryTotal: 100000, + ExpendablePodsPriorityCutoff: 10, + } + processorCallbacks := newStaticAutoscalerProcessorCallbacks() + context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks) + listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, + unschedulablePodMock, podDisruptionBudgetListerMock, daemonSetListerMock, + nil, nil, nil, nil) + context.ListerRegistry = listerRegistry + + clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ + OkTotalUnreadyCount: 1, + MaxNodeProvisionTime: 10 * time.Second, + } + + clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, newBackoff()) + sd := NewScaleDown(&context, clusterState) + + autoscaler := &StaticAutoscaler{ + AutoscalingContext: &context, + clusterStateRegistry: clusterState, + lastScaleUpTime: time.Now(), + lastScaleDownFailTime: time.Now(), + scaleDown: sd, + processors: NewTestProcessors(), + processorCallbacks: processorCallbacks, + } + + // Scale up + readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + scheduledPodMock.On("List").Return([]*apiv1.Pod{p3, p4}, nil).Times(2) // 1 to get pods + 1 when building nodeInfo map + unschedulablePodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() + + err := autoscaler.RunOnce(time.Now()) + assert.NoError(t, err) + + mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock, + podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) +} + +func TestStaticAutoscalerRunOnceWithFilteringOnOldEstimator(t *testing.T) { + readyNodeLister := kubernetes.NewTestNodeLister(nil) + allNodeLister := kubernetes.NewTestNodeLister(nil) + scheduledPodMock := &podListerMock{} + unschedulablePodMock := &podListerMock{} + podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{} + daemonSetListerMock := &daemonSetListerMock{} + onScaleUpMock := &onScaleUpMock{} + onScaleDownMock := &onScaleDownMock{} + + n1 := BuildTestNode("n1", 2000, 1000) + SetNodeReadyState(n1, true, time.Now()) + n2 := BuildTestNode("n2", 2000, 1000) + SetNodeReadyState(n2, true, time.Now()) + + // shared owner reference + ownerRef := GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", "") + + p1 := BuildTestPod("p1", 1400, 0) + p1.OwnerReferences = ownerRef + p3 := BuildTestPod("p3", 1400, 0) + p3.Spec.NodeName = "n1" + p3.OwnerReferences = ownerRef + p4 := BuildTestPod("p4", 1400, 0) + p4.Spec.NodeName = "n2" + p4.OwnerReferences = ownerRef + + provider := testprovider.NewTestCloudProvider( + func(id string, delta int) error { + return onScaleUpMock.ScaleUp(id, delta) + }, func(id string, name string) error { + return onScaleDownMock.ScaleDown(id, name) + }) + provider.AddNodeGroup("ng1", 0, 10, 2) + provider.AddNode("ng1", n1) + + provider.AddNodeGroup("ng2", 0, 10, 1) + provider.AddNode("ng2", n2) + + assert.NotNil(t, provider) + + // Create context with mocked lister registry. 
+ options := config.AutoscalingOptions{ + EstimatorName: estimator.OldBinpackingEstimatorName, + ScaleDownEnabled: false, + ScaleDownUtilizationThreshold: 0.5, + MaxNodesTotal: 10, + MaxCoresTotal: 10, + MaxMemoryTotal: 100000, + ExpendablePodsPriorityCutoff: 10, + } + processorCallbacks := newStaticAutoscalerProcessorCallbacks() + context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks) + listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, + unschedulablePodMock, podDisruptionBudgetListerMock, daemonSetListerMock, + nil, nil, nil, nil) + context.ListerRegistry = listerRegistry + + clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ + OkTotalUnreadyCount: 1, + MaxNodeProvisionTime: 10 * time.Second, + } + + clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, newBackoff()) + sd := NewScaleDown(&context, clusterState) + + autoscaler := &StaticAutoscaler{ + AutoscalingContext: &context, + clusterStateRegistry: clusterState, + lastScaleUpTime: time.Now(), + lastScaleDownFailTime: time.Now(), + scaleDown: sd, + processors: NewTestProcessors(), + processorCallbacks: processorCallbacks, + } + + // Scale up + readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + scheduledPodMock.On("List").Return([]*apiv1.Pod{p3, p4}, nil).Times(2) // 1 to get pods + 1 when building nodeInfo map + unschedulablePodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() + + err := autoscaler.RunOnce(time.Now()) + assert.NoError(t, err) + + mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock, + podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) +} + +func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *testing.T) { + readyNodeLister := kubernetes.NewTestNodeLister(nil) + allNodeLister := kubernetes.NewTestNodeLister(nil) + scheduledPodMock := &podListerMock{} + unschedulablePodMock := &podListerMock{} + podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{} + daemonSetListerMock := &daemonSetListerMock{} + onScaleUpMock := &onScaleUpMock{} + onScaleDownMock := &onScaleDownMock{} + + n2 := BuildTestNode("n2", 2000, 1000) + SetNodeReadyState(n2, true, time.Now()) + n3 := BuildTestNode("n3", 2000, 1000) + SetNodeReadyState(n3, true, time.Now()) + + // shared owner reference + ownerRef := GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", "") + + p1 := BuildTestPod("p1", 1400, 0) + p1.OwnerReferences = ownerRef + p2 := BuildTestPod("p2", 1400, 0) + p2.Spec.NodeName = "n2" + p2.OwnerReferences = ownerRef + p3 := BuildTestPod("p3", 1400, 0) + p3.Spec.NodeName = "n3" + p3.OwnerReferences = ownerRef + + provider := testprovider.NewTestCloudProvider( + func(id string, delta int) error { + return onScaleUpMock.ScaleUp(id, delta) + }, func(id string, name string) error { + return onScaleDownMock.ScaleDown(id, name) + }) + provider.AddNodeGroup("ng1", 0, 10, 2) + provider.AddNode("ng1", n2) + + provider.AddNodeGroup("ng2", 0, 10, 1) + provider.AddNode("ng2", n3) + + assert.NotNil(t, provider) + + // Create context with mocked lister registry. 
+ options := config.AutoscalingOptions{ + EstimatorName: estimator.BinpackingEstimatorName, + ScaleDownEnabled: false, + ScaleDownUtilizationThreshold: 0.5, + MaxNodesTotal: 10, + MaxCoresTotal: 10, + MaxMemoryTotal: 100000, + ExpendablePodsPriorityCutoff: 10, + } + processorCallbacks := newStaticAutoscalerProcessorCallbacks() + context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks) + listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, + unschedulablePodMock, podDisruptionBudgetListerMock, daemonSetListerMock, + nil, nil, nil, nil) + context.ListerRegistry = listerRegistry + + clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ + OkTotalUnreadyCount: 1, + MaxNodeProvisionTime: 10 * time.Second, + } + + clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, newBackoff()) + sd := NewScaleDown(&context, clusterState) + + autoscaler := &StaticAutoscaler{ + AutoscalingContext: &context, + clusterStateRegistry: clusterState, + lastScaleUpTime: time.Now(), + lastScaleDownFailTime: time.Now(), + scaleDown: sd, + processors: NewTestProcessors(), + processorCallbacks: processorCallbacks, + } + + // Scale up + readyNodeLister.SetNodes([]*apiv1.Node{n2, n3}) + allNodeLister.SetNodes([]*apiv1.Node{n2, n3}) + scheduledPodMock.On("List").Return([]*apiv1.Pod{p2, p3}, nil).Times(2) // 1 to get pods + 1 when building nodeInfo map + unschedulablePodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() + + err := autoscaler.RunOnce(time.Now()) + assert.NoError(t, err) + + mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock, + podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) +} + func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) { // setup diff --git a/cluster-autoscaler/estimator/basic_estimator.go b/cluster-autoscaler/estimator/basic_estimator.go deleted file mode 100644 index 658e93c9f1..0000000000 --- a/cluster-autoscaler/estimator/basic_estimator.go +++ /dev/null @@ -1,153 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package estimator - -import ( - "bytes" - "fmt" - "math" - - apiv1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/klog" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" -) - -const basicEstimatorDeprecationMessage = "WARNING: basic estimator is deprecated. It will be removed in Cluster Autoscaler 1.5." - -// BasicNodeEstimator estimates the number of needed nodes to handle the given amount of pods. -// It will never overestimate the number of nodes but is quite likely to provide a number that -// is too small. -// -// Deprecated. -// TODO(aleksandra-malinowska): remove this in 1.5. 
-type BasicNodeEstimator struct { - cpuSum resource.Quantity - memorySum resource.Quantity - portSum map[int32]int - FittingPods map[*apiv1.Pod]struct{} -} - -// NewBasicNodeEstimator builds BasicNodeEstimator. -func NewBasicNodeEstimator() *BasicNodeEstimator { - klog.Warning(basicEstimatorDeprecationMessage) - return &BasicNodeEstimator{ - portSum: make(map[int32]int), - FittingPods: make(map[*apiv1.Pod]struct{}), - } -} - -// Add adds Pod to the estimation. -func (basicEstimator *BasicNodeEstimator) Add(pod *apiv1.Pod) error { - ports := make(map[int32]struct{}) - for _, container := range pod.Spec.Containers { - if request, ok := container.Resources.Requests[apiv1.ResourceCPU]; ok { - basicEstimator.cpuSum.Add(request) - } - if request, ok := container.Resources.Requests[apiv1.ResourceMemory]; ok { - basicEstimator.memorySum.Add(request) - } - for _, port := range container.Ports { - if port.HostPort > 0 { - ports[port.HostPort] = struct{}{} - } - } - } - for port := range ports { - if sum, ok := basicEstimator.portSum[port]; ok { - basicEstimator.portSum[port] = sum + 1 - } else { - basicEstimator.portSum[port] = 1 - } - } - basicEstimator.FittingPods[pod] = struct{}{} - return nil -} - -func maxInt(a, b int) int { - if a > b { - return a - } - return b -} - -// GetDebug returns debug information about the current state of BasicNodeEstimator -func (basicEstimator *BasicNodeEstimator) GetDebug() string { - var buffer bytes.Buffer - buffer.WriteString("Resources needed:\n") - buffer.WriteString(fmt.Sprintf("CPU: %s\n", basicEstimator.cpuSum.String())) - buffer.WriteString(fmt.Sprintf("Mem: %s\n", basicEstimator.memorySum.String())) - for port, count := range basicEstimator.portSum { - buffer.WriteString(fmt.Sprintf("Port %d: %d\n", port, count)) - } - return buffer.String() -} - -// Estimate estimates the number needed of nodes of the given shape. 
-func (basicEstimator *BasicNodeEstimator) Estimate(pods []*apiv1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, upcomingNodes []*schedulernodeinfo.NodeInfo) int { - for _, pod := range pods { - basicEstimator.Add(pod) - } - - result := 0 - resources := apiv1.ResourceList{} - for _, node := range upcomingNodes { - cpu := resources[apiv1.ResourceCPU] - cpu.Add(node.Node().Status.Capacity[apiv1.ResourceCPU]) - resources[apiv1.ResourceCPU] = cpu - - mem := resources[apiv1.ResourceMemory] - mem.Add(node.Node().Status.Capacity[apiv1.ResourceMemory]) - resources[apiv1.ResourceMemory] = mem - - pods := resources[apiv1.ResourcePods] - pods.Add(node.Node().Status.Capacity[apiv1.ResourcePods]) - resources[apiv1.ResourcePods] = pods - } - - node := nodeInfo.Node() - if cpuCapacity, ok := node.Status.Capacity[apiv1.ResourceCPU]; ok { - comingCpu := resources[apiv1.ResourceCPU] - prop := int(math.Ceil(float64( - basicEstimator.cpuSum.MilliValue()-comingCpu.MilliValue()) / - float64(cpuCapacity.MilliValue()))) - - result = maxInt(result, prop) - } - if memCapacity, ok := node.Status.Capacity[apiv1.ResourceMemory]; ok { - comingMem := resources[apiv1.ResourceMemory] - prop := int(math.Ceil(float64( - basicEstimator.memorySum.Value()-comingMem.Value()) / - float64(memCapacity.Value()))) - result = maxInt(result, prop) - } - if podCapacity, ok := node.Status.Capacity[apiv1.ResourcePods]; ok { - comingPods := resources[apiv1.ResourcePods] - prop := int(math.Ceil(float64(basicEstimator.GetCount()-int(comingPods.Value())) / - float64(podCapacity.Value()))) - result = maxInt(result, prop) - } - for _, count := range basicEstimator.portSum { - result = maxInt(result, count-len(upcomingNodes)) - } - return result -} - -// GetCount returns number of pods included in the estimation. -func (basicEstimator *BasicNodeEstimator) GetCount() int { - return len(basicEstimator.FittingPods) -} diff --git a/cluster-autoscaler/estimator/binpacking_estimator.go b/cluster-autoscaler/estimator/binpacking_estimator.go index fbc8792f3c..edf1cde329 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator.go +++ b/cluster-autoscaler/estimator/binpacking_estimator.go @@ -53,12 +53,10 @@ func NewBinpackingNodeEstimator(predicateChecker *simulator.PredicateChecker) *B // Returns the number of nodes needed to accommodate all pods from the list. func (estimator *BinpackingNodeEstimator) Estimate(pods []*apiv1.Pod, nodeTemplate *schedulernodeinfo.NodeInfo, upcomingNodes []*schedulernodeinfo.NodeInfo) int { - podInfos := calculatePodScore(pods, nodeTemplate) sort.Slice(podInfos, func(i, j int) bool { return podInfos[i].score > podInfos[j].score }) newNodes := make([]*schedulernodeinfo.NodeInfo, 0) - newNodes = append(newNodes, upcomingNodes...) for _, podInfo := range podInfos { found := false @@ -73,7 +71,7 @@ func (estimator *BinpackingNodeEstimator) Estimate(pods []*apiv1.Pod, nodeTempla newNodes = append(newNodes, schedulerUtils.NodeWithPod(nodeTemplate, podInfo.pod)) } } - return len(newNodes) - len(upcomingNodes) + return len(newNodes) } // Calculates score for all pods and returns podInfo structure. 
diff --git a/cluster-autoscaler/estimator/binpacking_estimator_test.go b/cluster-autoscaler/estimator/binpacking_estimator_test.go index 0482d6db8d..96967045a9 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator_test.go +++ b/cluster-autoscaler/estimator/binpacking_estimator_test.go @@ -30,6 +30,23 @@ import ( "github.com/stretchr/testify/assert" ) +func makePod(cpuPerPod, memoryPerPod int64) *apiv1.Pod { + return &apiv1.Pod{ + Spec: apiv1.PodSpec{ + Containers: []apiv1.Container{ + { + Resources: apiv1.ResourceRequirements{ + Requests: apiv1.ResourceList{ + apiv1.ResourceCPU: *resource.NewMilliQuantity(cpuPerPod, resource.DecimalSI), + apiv1.ResourceMemory: *resource.NewQuantity(memoryPerPod, resource.DecimalSI), + }, + }, + }, + }, + }, + } +} + func TestBinpackingEstimate(t *testing.T) { estimator := NewBinpackingNodeEstimator(simulator.NewTestPredicateChecker()) @@ -55,40 +72,10 @@ func TestBinpackingEstimate(t *testing.T) { nodeInfo := schedulernodeinfo.NewNodeInfo() nodeInfo.SetNode(node) - estimate := estimator.Estimate(pods, nodeInfo, []*schedulernodeinfo.NodeInfo{}) + estimate := estimator.Estimate(pods, nodeInfo, nil) assert.Equal(t, 5, estimate) } -func TestBinpackingEstimateComingNodes(t *testing.T) { - estimator := NewBinpackingNodeEstimator(simulator.NewTestPredicateChecker()) - - cpuPerPod := int64(350) - memoryPerPod := int64(1000 * units.MiB) - pod := makePod(cpuPerPod, memoryPerPod) - - pods := make([]*apiv1.Pod, 0) - for i := 0; i < 10; i++ { - pods = append(pods, pod) - } - node := &apiv1.Node{ - Status: apiv1.NodeStatus{ - Capacity: apiv1.ResourceList{ - apiv1.ResourceCPU: *resource.NewMilliQuantity(cpuPerPod*3-50, resource.DecimalSI), - apiv1.ResourceMemory: *resource.NewQuantity(2*memoryPerPod, resource.DecimalSI), - apiv1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), - }, - }, - } - node.Status.Allocatable = node.Status.Capacity - SetNodeReadyState(node, true, time.Time{}) - - nodeInfo := schedulernodeinfo.NewNodeInfo() - nodeInfo.SetNode(node) - estimate := estimator.Estimate(pods, nodeInfo, []*schedulernodeinfo.NodeInfo{nodeInfo, nodeInfo}) - // 5 - 2 nodes that are coming. - assert.Equal(t, 3, estimate) -} - func TestBinpackingEstimateWithPorts(t *testing.T) { estimator := NewBinpackingNodeEstimator(simulator.NewTestPredicateChecker()) @@ -118,6 +105,6 @@ func TestBinpackingEstimateWithPorts(t *testing.T) { nodeInfo := schedulernodeinfo.NewNodeInfo() nodeInfo.SetNode(node) - estimate := estimator.Estimate(pods, nodeInfo, []*schedulernodeinfo.NodeInfo{}) + estimate := estimator.Estimate(pods, nodeInfo, nil) assert.Equal(t, 8, estimate) } diff --git a/cluster-autoscaler/estimator/estimator.go b/cluster-autoscaler/estimator/estimator.go index c137a4f585..9d8a24532f 100644 --- a/cluster-autoscaler/estimator/estimator.go +++ b/cluster-autoscaler/estimator/estimator.go @@ -26,18 +26,14 @@ import ( ) const ( - //BasicEstimatorName is the name of basic estimator. - BasicEstimatorName = "basic" // BinpackingEstimatorName is the name of binpacking estimator. BinpackingEstimatorName = "binpacking" + // OldBinpackingEstimatorName is the name of the older binpacking estimator. + OldBinpackingEstimatorName = "oldbinpacking" ) -func deprecated(name string) string { - return fmt.Sprintf("%s (DEPRECATED)", name) -} - // AvailableEstimators is a list of available estimators. 
-var AvailableEstimators = []string{BinpackingEstimatorName, deprecated(BasicEstimatorName)} +var AvailableEstimators = []string{BinpackingEstimatorName, OldBinpackingEstimatorName} // Estimator calculates the number of nodes of given type needed to schedule pods. type Estimator interface { @@ -55,11 +51,10 @@ func NewEstimatorBuilder(name string) (EstimatorBuilder, error) { return NewBinpackingNodeEstimator(predicateChecker) }, nil // Deprecated. - // TODO(aleksandra-malinowska): remove in 1.5. - case BasicEstimatorName: - klog.Warning(basicEstimatorDeprecationMessage) - return func(_ *simulator.PredicateChecker) Estimator { - return NewBasicNodeEstimator() + case OldBinpackingEstimatorName: + klog.Warning(oldBinPackingEstimatorDeprecationMessage) + return func(predicateChecker *simulator.PredicateChecker) Estimator { + return NewOldBinpackingNodeEstimator(predicateChecker) }, nil } return nil, fmt.Errorf("unknown estimator: %s", name) diff --git a/cluster-autoscaler/estimator/old_binpacking_estimator.go b/cluster-autoscaler/estimator/old_binpacking_estimator.go new file mode 100644 index 0000000000..d5bdc8f2cf --- /dev/null +++ b/cluster-autoscaler/estimator/old_binpacking_estimator.go @@ -0,0 +1,70 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package estimator + +import ( + "sort" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/simulator" + schedulerUtils "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" +) + +const oldBinPackingEstimatorDeprecationMessage = "old binpacking estimator is deprecated. It will be removed in Cluster Autoscaler 1.15." + +// OldBinpackingNodeEstimator uses the same bin packing logic as in BinPackingEstimator, but, also +// packs in upcoming nodes +// +// Deprecated: +// TODO(vivekbagade): Remove in 1.15 +type OldBinpackingNodeEstimator struct { + predicateChecker *simulator.PredicateChecker +} + +// NewOldBinpackingNodeEstimator builds a new OldBinpackingNodeEstimator. +func NewOldBinpackingNodeEstimator(predicateChecker *simulator.PredicateChecker) *OldBinpackingNodeEstimator { + return &OldBinpackingNodeEstimator{ + predicateChecker: predicateChecker, + } +} + +// Estimate number of nodes needed using bin packing +func (estimator *OldBinpackingNodeEstimator) Estimate(pods []*apiv1.Pod, nodeTemplate *schedulernodeinfo.NodeInfo, + upcomingNodes []*schedulernodeinfo.NodeInfo) int { + + podInfos := calculatePodScore(pods, nodeTemplate) + sort.Slice(podInfos, func(i, j int) bool { return podInfos[i].score > podInfos[j].score }) + + newNodes := make([]*schedulernodeinfo.NodeInfo, 0) + newNodes = append(newNodes, upcomingNodes...) 
+ + for _, podInfo := range podInfos { + found := false + for i, nodeInfo := range newNodes { + if err := estimator.predicateChecker.CheckPredicates(podInfo.pod, nil, nodeInfo); err == nil { + found = true + newNodes[i] = schedulerUtils.NodeWithPod(nodeInfo, podInfo.pod) + break + } + } + if !found { + newNodes = append(newNodes, schedulerUtils.NodeWithPod(nodeTemplate, podInfo.pod)) + } + } + return len(newNodes) - len(upcomingNodes) +} diff --git a/cluster-autoscaler/estimator/basic_estimator_test.go b/cluster-autoscaler/estimator/old_binpacking_estimator_test.go similarity index 56% rename from cluster-autoscaler/estimator/basic_estimator_test.go rename to cluster-autoscaler/estimator/old_binpacking_estimator_test.go index 65f831e6c2..b8e03ad2dd 100644 --- a/cluster-autoscaler/estimator/basic_estimator_test.go +++ b/cluster-autoscaler/estimator/old_binpacking_estimator_test.go @@ -18,135 +18,106 @@ package estimator import ( "testing" + "time" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/autoscaler/cluster-autoscaler/simulator" + . "k8s.io/autoscaler/cluster-autoscaler/utils/test" "k8s.io/autoscaler/cluster-autoscaler/utils/units" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" "github.com/stretchr/testify/assert" ) -func makePod(cpuPerPod, memoryPerPod int64) *apiv1.Pod { - return &apiv1.Pod{ - Spec: apiv1.PodSpec{ - Containers: []apiv1.Container{ - { - Resources: apiv1.ResourceRequirements{ - Requests: apiv1.ResourceList{ - apiv1.ResourceCPU: *resource.NewMilliQuantity(cpuPerPod, resource.DecimalSI), - apiv1.ResourceMemory: *resource.NewQuantity(memoryPerPod, resource.DecimalSI), - }, - }, - }, - }, - }, - } -} +func TestOldBinpackingEstimate(t *testing.T) { + estimator := NewOldBinpackingNodeEstimator(simulator.NewTestPredicateChecker()) -func TestEstimate(t *testing.T) { - cpuPerPod := int64(500) + cpuPerPod := int64(350) memoryPerPod := int64(1000 * units.MiB) pod := makePod(cpuPerPod, memoryPerPod) - pods := []*apiv1.Pod{} - for i := 0; i < 5; i++ { - podCopy := *pod - pods = append(pods, &podCopy) + pods := make([]*apiv1.Pod, 0) + for i := 0; i < 10; i++ { + pods = append(pods, pod) } - node := &apiv1.Node{ Status: apiv1.NodeStatus{ Capacity: apiv1.ResourceList{ - apiv1.ResourceCPU: *resource.NewMilliQuantity(3*cpuPerPod, resource.DecimalSI), - apiv1.ResourceMemory: *resource.NewQuantity(2*memoryPerPod, resource.DecimalSI), - apiv1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), - }, - }, - } - nodeInfo := schedulernodeinfo.NewNodeInfo() - nodeInfo.SetNode(node) - - estimator := NewBasicNodeEstimator() - estimate := estimator.Estimate(pods, nodeInfo, []*schedulernodeinfo.NodeInfo{}) - - // Check result. - assert.Equal(t, 3, estimate) - - // Check internal state of estimator. 
- assert.Equal(t, int64(500*5), estimator.cpuSum.MilliValue()) - assert.Equal(t, int64(5*memoryPerPod), estimator.memorySum.Value()) - assert.Equal(t, 5, estimator.GetCount()) - assert.Contains(t, estimator.GetDebug(), "CPU") -} - -func TestEstimateWithComing(t *testing.T) { - cpuPerPod := int64(500) - memoryPerPod := int64(1000 * units.MiB) - - pod := makePod(cpuPerPod, memoryPerPod) - pods := []*apiv1.Pod{} - for i := 0; i < 5; i++ { - podCopy := *pod - pods = append(pods, &podCopy) - } - - node := &apiv1.Node{ - Status: apiv1.NodeStatus{ - Capacity: apiv1.ResourceList{ - apiv1.ResourceCPU: *resource.NewMilliQuantity(3*cpuPerPod, resource.DecimalSI), + apiv1.ResourceCPU: *resource.NewMilliQuantity(cpuPerPod*3-50, resource.DecimalSI), apiv1.ResourceMemory: *resource.NewQuantity(2*memoryPerPod, resource.DecimalSI), apiv1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), }, }, } node.Status.Allocatable = node.Status.Capacity + SetNodeReadyState(node, true, time.Time{}) + nodeInfo := schedulernodeinfo.NewNodeInfo() nodeInfo.SetNode(node) - - estimator := NewBasicNodeEstimator() - estimate := estimator.Estimate(pods, nodeInfo, []*schedulernodeinfo.NodeInfo{nodeInfo, nodeInfo}) - - // Check result. - assert.Equal(t, 1, estimate) - - // Check internal state of estimator. - assert.Contains(t, estimator.GetDebug(), "CPU") - assert.Equal(t, int64(500*5), estimator.cpuSum.MilliValue()) - assert.Equal(t, int64(5*memoryPerPod), estimator.memorySum.Value()) - assert.Equal(t, 5, estimator.GetCount()) - + estimate := estimator.Estimate(pods, nodeInfo, []*schedulernodeinfo.NodeInfo{}) + assert.Equal(t, 5, estimate) } -func TestEstimateWithPorts(t *testing.T) { - cpuPerPod := int64(500) - memoryPerPod := int64(1000 * units.MiB) +func TestOldBinpackingEstimateComingNodes(t *testing.T) { + estimator := NewOldBinpackingNodeEstimator(simulator.NewTestPredicateChecker()) + cpuPerPod := int64(350) + memoryPerPod := int64(1000 * units.MiB) + pod := makePod(cpuPerPod, memoryPerPod) + + pods := make([]*apiv1.Pod, 0) + for i := 0; i < 10; i++ { + pods = append(pods, pod) + } + node := &apiv1.Node{ + Status: apiv1.NodeStatus{ + Capacity: apiv1.ResourceList{ + apiv1.ResourceCPU: *resource.NewMilliQuantity(cpuPerPod*3-50, resource.DecimalSI), + apiv1.ResourceMemory: *resource.NewQuantity(2*memoryPerPod, resource.DecimalSI), + apiv1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, + }, + } + node.Status.Allocatable = node.Status.Capacity + SetNodeReadyState(node, true, time.Time{}) + + nodeInfo := schedulernodeinfo.NewNodeInfo() + nodeInfo.SetNode(node) + estimate := estimator.Estimate(pods, nodeInfo, []*schedulernodeinfo.NodeInfo{nodeInfo, nodeInfo}) + // 5 - 2 nodes that are coming. 
+ assert.Equal(t, 3, estimate) +} + +func TestOldBinpackingEstimateWithPorts(t *testing.T) { + estimator := NewOldBinpackingNodeEstimator(simulator.NewTestPredicateChecker()) + + cpuPerPod := int64(200) + memoryPerPod := int64(1000 * units.MiB) pod := makePod(cpuPerPod, memoryPerPod) pod.Spec.Containers[0].Ports = []apiv1.ContainerPort{ { HostPort: 5555, }, } - - pods := []*apiv1.Pod{} - for i := 0; i < 5; i++ { + pods := make([]*apiv1.Pod, 0) + for i := 0; i < 8; i++ { pods = append(pods, pod) } node := &apiv1.Node{ Status: apiv1.NodeStatus{ Capacity: apiv1.ResourceList{ - apiv1.ResourceCPU: *resource.NewMilliQuantity(3*cpuPerPod, resource.DecimalSI), - apiv1.ResourceMemory: *resource.NewQuantity(2*memoryPerPod, resource.DecimalSI), + apiv1.ResourceCPU: *resource.NewMilliQuantity(5*cpuPerPod, resource.DecimalSI), + apiv1.ResourceMemory: *resource.NewQuantity(5*memoryPerPod, resource.DecimalSI), apiv1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), }, }, } + node.Status.Allocatable = node.Status.Capacity + SetNodeReadyState(node, true, time.Time{}) + nodeInfo := schedulernodeinfo.NewNodeInfo() nodeInfo.SetNode(node) - - estimator := NewBasicNodeEstimator() estimate := estimator.Estimate(pods, nodeInfo, []*schedulernodeinfo.NodeInfo{}) - assert.Contains(t, estimator.GetDebug(), "CPU") - assert.Equal(t, 5, estimate) + assert.Equal(t, 8, estimate) } diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index d0854ff6d2..cb8034b073 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -166,6 +166,7 @@ var ( "Filtering out schedulable pods before CA scale up by trying to pack the schedulable pods on free capacity on existing nodes."+ "Setting it to false employs a more lenient filtering approach that does not try to pack the pods on the nodes."+ "Pods with nominatedNodeName set are always filtered out.") + ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group") ) diff --git a/cluster-autoscaler/processors/pods/pod_list_processor.go b/cluster-autoscaler/processors/pods/pod_list_processor.go index 6800e26ee6..0d1334cc7d 100644 --- a/cluster-autoscaler/processors/pods/pod_list_processor.go +++ b/cluster-autoscaler/processors/pods/pod_list_processor.go @@ -25,7 +25,8 @@ import ( type PodListProcessor interface { Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod, allScheduledPods []*apiv1.Pod, - allNodes []*apiv1.Node, readyNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) + allNodes []*apiv1.Node, readyNodes []*apiv1.Node, + upcomingNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) CleanUp() } @@ -41,7 +42,8 @@ func NewDefaultPodListProcessor() PodListProcessor { // Process processes lists of unschedulable and scheduled pods before scaling of the cluster. 
 func (p *NoOpPodListProcessor) Process(context *context.AutoscalingContext,
 	unschedulablePods []*apiv1.Pod, allScheduledPods []*apiv1.Pod,
-	allNodes []*apiv1.Node, readyNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) {
+	allNodes []*apiv1.Node, readyNodes []*apiv1.Node,
+	upcomingNodes []*apiv1.Node) ([]*apiv1.Pod, []*apiv1.Pod, error) {
 	return unschedulablePods, allScheduledPods, nil
 }
 
diff --git a/cluster-autoscaler/processors/pods/pod_list_processor_test.go b/cluster-autoscaler/processors/pods/pod_list_processor_test.go
index c1aa2d3195..291b181b90 100644
--- a/cluster-autoscaler/processors/pods/pod_list_processor_test.go
+++ b/cluster-autoscaler/processors/pods/pod_list_processor_test.go
@@ -36,7 +36,7 @@ func TestPodListProcessor(t *testing.T) {
 	allNodes := []*apiv1.Node{n1, n2}
 	readyNodes := []*apiv1.Node{n1, n2}
 	podListProcessor := NewDefaultPodListProcessor()
-	gotUnschedulablePods, gotAllScheduled, err := podListProcessor.Process(context, unschedulablePods, allScheduledPods, allNodes, readyNodes)
+	gotUnschedulablePods, gotAllScheduled, err := podListProcessor.Process(context, unschedulablePods, allScheduledPods, allNodes, readyNodes, []*apiv1.Node{})
 	if len(gotUnschedulablePods) != 1 || len(gotAllScheduled) != 1 || err != nil {
 		t.Errorf("Error podListProcessor.Process() = %v, %v, %v want %v, %v, nil ",
 			gotUnschedulablePods, gotAllScheduled, err, unschedulablePods, allScheduledPods)