Merge pull request #411 from krzysztof-jastrzebski/priority

Adds priority preemption support to cluster autoscaler.
Commit 439fd3c9ec by Marcin Wielgus, committed via GitHub on 2017-11-08 09:09:26 +01:00.
11 changed files with 516 additions and 37 deletions


@ -131,6 +131,9 @@ type AutoscalingOptions struct {
NodeAutoprovisioningEnabled bool
// MaxAutoprovisionedNodeGroupCount is the maximum number of autoprovisioned groups in the cluster.
MaxAutoprovisionedNodeGroupCount int
// Pods with priority below cutoff are expendable. They can be killed without any consideration during scale down and they don't cause scale up.
// Pods with null priority (PodPriority disabled) are non expendable.
ExpendablePodsPriorityCutoff int
}
// NewAutoscalingContext returns an autoscaling context from all the necessary parameters passed via arguments
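The rule this option encodes can be reduced to a single predicate. A minimal sketch, not the PR's own code (the PR's helper, FilterOutExpendablePods, appears further down in this diff); apiv1 is the k8s.io/api/core/v1 alias used throughout:

// isExpendable reports whether the autoscaler may ignore the pod entirely.
// Only pods with an explicit priority below the cutoff are expendable;
// a nil priority (PodPriority feature disabled) always means non expendable.
func isExpendable(pod *apiv1.Pod, expendablePodsPriorityCutoff int) bool {
	return pod.Spec.Priority != nil && int(*pod.Spec.Priority) < expendablePodsPriorityCutoff
}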


@ -33,6 +33,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1beta1"
@ -40,7 +41,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_client "k8s.io/client-go/kubernetes"
kube_record "k8s.io/client-go/tools/record"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"github.com/golang/glog"
)
@ -153,7 +153,9 @@ func (sd *ScaleDown) UpdateUnneededNodes(
pdbs []*policyv1.PodDisruptionBudget) errors.AutoscalerError {
currentlyUnneededNodes := make([]*apiv1.Node, 0)
nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods, nodes)
// Only scheduled non expendable pods and pods waiting for lower priority pods preemption can prevent node deletion.
nonExpendablePods := FilterOutExpendablePods(pods, sd.context.ExpendablePodsPriorityCutoff)
nodeNameToNodeInfo := scheduler_util.CreateNodeNameToInfoMap(nonExpendablePods, nodes)
utilizationMap := make(map[string]float64)
sd.updateUnremovableNodes(nodes)
@ -232,7 +234,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
// Look for nodes to remove in the current candidates
nodesToRemove, unremovable, newHints, simulatorErr := simulator.FindNodesToRemove(
currentCandidates, nodes, pods, nil, sd.context.PredicateChecker,
currentCandidates, nodes, nonExpendablePods, nil, sd.context.PredicateChecker,
len(currentCandidates), true, sd.podLocationHints, sd.usageTracker, timestamp, pdbs)
if simulatorErr != nil {
return sd.markSimulationError(simulatorErr, timestamp)
@ -254,7 +256,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
// Look for additional nodes to remove among the rest of the nodes
glog.V(3).Infof("Finding additional %v candidates for scale down.", additionalCandidatesCount)
additionalNodesToRemove, additionalUnremovable, additionalNewHints, simulatorErr :=
simulator.FindNodesToRemove(currentNonCandidates[:additionalCandidatesPoolSize], nodes, pods, nil,
simulator.FindNodesToRemove(currentNonCandidates[:additionalCandidatesPoolSize], nodes, nonExpendablePods, nil,
sd.context.PredicateChecker, additionalCandidatesCount, true,
sd.podLocationHints, sd.usageTracker, timestamp, pdbs)
if simulatorErr != nil {
@ -462,8 +464,10 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
}
findNodesToRemoveStart := time.Now()
// Only scheduled non expendable pods are taken into account and have to be moved.
nonExpendablePods := FilterOutExpendablePods(pods, sd.context.ExpendablePodsPriorityCutoff)
// We look for only 1 node so new hints may be incomplete.
nodesToRemove, _, _, err := simulator.FindNodesToRemove(candidates, nodesWithoutMaster, pods, sd.context.ClientSet,
nodesToRemove, _, _, err := simulator.FindNodesToRemove(candidates, nodesWithoutMaster, nonExpendablePods, sd.context.ClientSet,
sd.context.PredicateChecker, 1, false,
sd.podLocationHints, sd.usageTracker, time.Now(), pdbs)
findNodesToRemoveDuration = time.Now().Sub(findNodesToRemoveStart)


@ -35,12 +35,14 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"strconv"
"github.com/golang/glog"
"github.com/stretchr/testify/assert"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
)
@ -72,18 +74,27 @@ func TestFindUnneededNodes(t *testing.T) {
p6.OwnerReferences = ownerRef
p6.Spec.NodeName = "n7"
// Node with a non-replicated pod.
n1 := BuildTestNode("n1", 1000, 10)
// Node can be deleted.
n2 := BuildTestNode("n2", 1000, 10)
// Node with high utilization.
n3 := BuildTestNode("n3", 1000, 10)
// Node with big pod.
n4 := BuildTestNode("n4", 10000, 10)
// Node with scale down disabled.
n5 := BuildTestNode("n5", 1000, 10)
n5.Annotations = map[string]string{
ScaleDownDisabledKey: "true",
}
// Node info not found.
n6 := BuildTestNode("n6", 1000, 10)
n7 := BuildTestNode("n7", 0, 10) // Node without utilization
// Node without utilization.
n7 := BuildTestNode("n7", 0, 10)
// Node being deleted.
n8 := BuildTestNode("n8", 1000, 10)
n8.Spec.Taints = []apiv1.Taint{{Key: deletetaint.ToBeDeletedTaint, Value: strconv.FormatInt(time.Now().Unix()-301, 10)}}
// Node being deleted recently.
n9 := BuildTestNode("n9", 1000, 10)
n9.Spec.Taints = []apiv1.Taint{{Key: deletetaint.ToBeDeletedTaint, Value: strconv.FormatInt(time.Now().Unix()-60, 10)}}
@ -115,6 +126,7 @@ func TestFindUnneededNodes(t *testing.T) {
context := AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.35,
ExpendablePodsPriorityCutoff: 10,
},
ClusterStateRegistry: clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder),
PredicateChecker: simulator.NewTestPredicateChecker(),
@ -164,6 +176,98 @@ func TestFindUnneededNodes(t *testing.T) {
assert.Equal(t, 0, len(sd.unremovableNodes))
}
func TestPodsWithPrioritiesFindUnneededNodes(t *testing.T) {
// shared owner reference
ownerRef := GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", "")
var priority100 int32 = 100
var priority1 int32 = 1
p1 := BuildTestPod("p1", 600, 0)
p1.OwnerReferences = ownerRef
p1.Spec.Priority = &priority100
p1.Annotations = map[string]string{scheduler_util.NominatedNodeAnnotationKey: "n1"}
p2 := BuildTestPod("p2", 400, 0)
p2.OwnerReferences = ownerRef
p2.Spec.NodeName = "n2"
p2.Spec.Priority = &priority1
p3 := BuildTestPod("p3", 100, 0)
p3.OwnerReferences = ownerRef
p3.Spec.NodeName = "n2"
p3.Spec.Priority = &priority100
p4 := BuildTestPod("p4", 100, 0)
p4.OwnerReferences = ownerRef
p4.Spec.Priority = &priority100
p4.Annotations = map[string]string{scheduler_util.NominatedNodeAnnotationKey: "n2"}
p5 := BuildTestPod("p5", 400, 0)
p5.OwnerReferences = ownerRef
p5.Spec.NodeName = "n3"
p5.Spec.Priority = &priority1
p6 := BuildTestPod("p6", 400, 0)
p6.OwnerReferences = ownerRef
p6.Spec.NodeName = "n3"
p6.Spec.Priority = &priority1
p7 := BuildTestPod("p7", 1200, 0)
p7.OwnerReferences = ownerRef
p7.Spec.Priority = &priority100
p7.Annotations = map[string]string{scheduler_util.NominatedNodeAnnotationKey: "n4"}
// Node with pod waiting for lower priority pod preemption, highly utilized. Can't be deleted.
n1 := BuildTestNode("n1", 1000, 10)
// Node with big expendable pod and two small non expendable pods that can be moved.
n2 := BuildTestNode("n2", 1000, 10)
// Node with two expendable pods.
n3 := BuildTestNode("n3", 1000, 10)
// Node with big pod waiting for lower priority pod preemption. Can't be deleted.
n4 := BuildTestNode("n4", 10000, 10)
SetNodeReadyState(n1, true, time.Time{})
SetNodeReadyState(n2, true, time.Time{})
SetNodeReadyState(n3, true, time.Time{})
SetNodeReadyState(n4, true, time.Time{})
fakeClient := &fake.Clientset{}
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 2)
provider.AddNode("ng1", n1)
provider.AddNode("ng1", n2)
provider.AddNode("ng1", n3)
provider.AddNode("ng1", n4)
context := AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.35,
ExpendablePodsPriorityCutoff: 10,
},
ClusterStateRegistry: clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder),
PredicateChecker: simulator.NewTestPredicateChecker(),
LogRecorder: fakeLogRecorder,
CloudProvider: provider,
}
sd := NewScaleDown(&context)
sd.UpdateUnneededNodes([]*apiv1.Node{n1, n2, n3, n4}, []*apiv1.Node{n1, n2, n3, n4},
[]*apiv1.Pod{p1, p2, p3, p4, p5, p6, p7}, time.Now(), nil)
assert.Equal(t, 2, len(sd.unneededNodes))
glog.Warningf("Unneeded nodes %v", sd.unneededNodes)
_, found := sd.unneededNodes["n2"]
assert.True(t, found)
_, found = sd.unneededNodes["n3"]
assert.True(t, found)
assert.Contains(t, sd.podLocationHints, p3.Namespace+"/"+p3.Name)
assert.Contains(t, sd.podLocationHints, p4.Namespace+"/"+p4.Name)
assert.Equal(t, 4, len(sd.nodeUtilizationMap))
}
func TestFindUnneededMaxCandidates(t *testing.T) {
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 100, 2)
@ -601,11 +705,17 @@ func TestScaleDown(t *testing.T) {
p1.OwnerReferences = GenerateOwnerReferences(job.Name, "Job", "extensions/v1beta1", "")
p2 := BuildTestPod("p2", 800, 0)
var priority int32 = 1
p2.Spec.Priority = &priority
p3 := BuildTestPod("p3", 800, 0)
p1.Spec.NodeName = "n1"
p2.Spec.NodeName = "n2"
p2.Spec.NodeName = "n1"
p3.Spec.NodeName = "n2"
fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, &apiv1.PodList{Items: []apiv1.Pod{*p1, *p2}}, nil
return true, &apiv1.PodList{Items: []apiv1.Pod{*p1, *p2, *p3}}, nil
})
fakeClient.Fake.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, errors.NewNotFound(apiv1.Resource("pod"), "whatever")
@ -648,6 +758,7 @@ func TestScaleDown(t *testing.T) {
ScaleDownUtilizationThreshold: 0.5,
ScaleDownUnneededTime: time.Minute,
MaxGracefulTerminationSec: 60,
ExpendablePodsPriorityCutoff: 10,
},
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
@ -658,8 +769,8 @@ func TestScaleDown(t *testing.T) {
}
scaleDown := NewScaleDown(context)
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
[]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, time.Now().Add(-5*time.Minute), nil)
result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, nil, time.Now())
[]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2, p3}, time.Now().Add(-5*time.Minute), nil)
result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2, p3}, nil, time.Now())
waitForDeleteToFinish(t, scaleDown)
assert.NoError(t, err)
assert.Equal(t, ScaleDownNodeDeleteStarted, result)


@ -216,13 +216,17 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
// which is supposed to schedule on an existing node.
schedulablePodsPresent := false
// Some unschedulable pods can be waiting for lower priority pods preemption, so they have a nominated node to run on.
// Such pods don't require scale up but should be considered during scale down.
unschedulablePods, unschedulableWaitingForLowerPriorityPreemption := FilterOutExpendableAndSplit(allUnschedulablePods, a.ExpendablePodsPriorityCutoff)
glog.V(4).Infof("Filtering out schedulables")
filterOutSchedulableStart := time.Now()
unschedulablePodsToHelp := FilterOutSchedulable(allUnschedulablePods, readyNodes, allScheduled,
a.PredicateChecker)
unschedulablePodsToHelp := FilterOutSchedulable(unschedulablePods, readyNodes, allScheduled,
unschedulableWaitingForLowerPriorityPreemption, a.PredicateChecker, a.ExpendablePodsPriorityCutoff)
metrics.UpdateDurationFromStart(metrics.FilterOutSchedulable, filterOutSchedulableStart)
if len(unschedulablePodsToHelp) != len(allUnschedulablePods) {
if len(unschedulablePodsToHelp) != len(unschedulablePods) {
glog.V(2).Info("Schedulable pods present")
schedulablePodsPresent = true
} else {
@ -271,7 +275,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
scaleDown.CleanUp(currentTime)
potentiallyUnneeded := getPotentiallyUnneededNodes(autoscalingContext, allNodes)
typedErr := scaleDown.UpdateUnneededNodes(allNodes, potentiallyUnneeded, allScheduled, currentTime, pdbs)
typedErr := scaleDown.UpdateUnneededNodes(allNodes, potentiallyUnneeded, append(allScheduled, unschedulableWaitingForLowerPriorityPreemption...), currentTime, pdbs)
if typedErr != nil {
glog.Errorf("Failed to scale down: %v", typedErr)
return typedErr
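Putting the RunOnce changes together, the new flow is roughly the following. A condensed sketch with simplified variable names and error handling omitted, built only from the helpers added in this diff:

// Drop expendable pods and split off the ones the scheduler has already nominated to a node.
unschedulable, waitingForPreemption := FilterOutExpendableAndSplit(allUnschedulablePods, cutoff)
// Only the remaining unschedulable pods may trigger a scale up; pods waiting for preemption
// are treated as if they already occupied their nominated nodes.
unschedulablePodsToHelp := FilterOutSchedulable(unschedulable, readyNodes, allScheduled,
	waitingForPreemption, predicateChecker, cutoff)
// For scale down, pods waiting for preemption count as scheduled, so their nominated
// nodes are not reported as unneeded.
scaleDown.UpdateUnneededNodes(allNodes, potentiallyUnneeded,
	append(allScheduled, waitingForPreemption...), currentTime, pdbs)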


@ -28,6 +28,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
apiv1 "k8s.io/api/core/v1"
@ -352,7 +353,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
ScaleDownUnreadyTime: time.Minute,
ScaleDownUnneededTime: time.Minute,
NodeAutoprovisioningEnabled: true,
MaxAutoprovisionedNodeGroupCount: 10,
MaxAutoprovisionedNodeGroupCount: 10, // Pods with null priority are always non expendable; this test verifies that.
},
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
@ -536,3 +537,155 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
mock.AssertExpectationsForObjects(t, readyNodeListerMock, allNodeListerMock, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
}
func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
readyNodeListerMock := &nodeListerMock{}
allNodeListerMock := &nodeListerMock{}
scheduledPodMock := &podListerMock{}
unschedulablePodMock := &podListerMock{}
podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{}
daemonSetListerMock := &daemonSetListerMock{}
onScaleUpMock := &onScaleUpMock{}
onScaleDownMock := &onScaleDownMock{}
n1 := BuildTestNode("n1", 100, 1000)
SetNodeReadyState(n1, true, time.Now())
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Now())
n3 := BuildTestNode("n3", 1000, 1000)
SetNodeReadyState(n3, true, time.Now())
// shared owner reference
ownerRef := GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", "")
var priority100 int32 = 100
var priority1 int32 = 1
p1 := BuildTestPod("p1", 40, 0)
p1.OwnerReferences = ownerRef
p1.Spec.NodeName = "n1"
p1.Spec.Priority = &priority1
p2 := BuildTestPod("p2", 400, 0)
p2.OwnerReferences = ownerRef
p2.Spec.NodeName = "n2"
p2.Spec.Priority = &priority1
p3 := BuildTestPod("p3", 400, 0)
p3.OwnerReferences = ownerRef
p3.Spec.NodeName = "n2"
p3.Spec.Priority = &priority100
p4 := BuildTestPod("p4", 500, 0)
p4.OwnerReferences = ownerRef
p4.Spec.Priority = &priority100
p5 := BuildTestPod("p5", 800, 0)
p5.OwnerReferences = ownerRef
p5.Spec.Priority = &priority100
p5.Annotations = map[string]string{scheduler_util.NominatedNodeAnnotationKey: "n3"}
p6 := BuildTestPod("p6", 1000, 0)
p6.OwnerReferences = ownerRef
p6.Spec.Priority = &priority100
provider := testprovider.NewTestCloudProvider(
func(id string, delta int) error {
return onScaleUpMock.ScaleUp(id, delta)
}, func(id string, name string) error {
return onScaleDownMock.ScaleDown(id, name)
})
provider.AddNodeGroup("ng1", 0, 10, 1)
provider.AddNodeGroup("ng2", 0, 10, 2)
provider.AddNode("ng1", n1)
provider.AddNode("ng2", n2)
provider.AddNode("ng2", n3)
assert.NotNil(t, provider)
ng2 := reflect.ValueOf(provider.NodeGroups()[1]).Interface().(*testprovider.TestNodeGroup)
fakeClient := &fake.Clientset{}
fakeRecorder := kube_record.NewFakeRecorder(5)
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false)
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: 1,
MaxNodeProvisionTime: 10 * time.Second,
}
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, fakeLogRecorder)
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
context := &AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
ScaleDownEnabled: true,
ScaleDownUtilizationThreshold: 0.5,
MaxNodesTotal: 10,
MaxCoresTotal: 10,
MaxMemoryTotal: 100000,
ScaleDownUnreadyTime: time.Minute,
ScaleDownUnneededTime: time.Minute,
ExpendablePodsPriorityCutoff: 10,
},
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
ClientSet: fakeClient,
Recorder: fakeRecorder,
ExpanderStrategy: random.NewStrategy(),
ClusterStateRegistry: clusterState,
LogRecorder: fakeLogRecorder,
}
listerRegistry := kube_util.NewListerRegistry(allNodeListerMock, readyNodeListerMock, scheduledPodMock,
unschedulablePodMock, podDisruptionBudgetListerMock, daemonSetListerMock)
sd := NewScaleDown(context)
autoscaler := &StaticAutoscaler{AutoscalingContext: context,
ListerRegistry: listerRegistry,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDown: sd}
// Scale up
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once()
allNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once()
scheduledPodMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Once()
unschedulablePodMock.On("List").Return([]*apiv1.Pod{p4, p5, p6}, nil).Once()
daemonSetListerMock.On("List").Return([]*extensionsv1.DaemonSet{}, nil).Once()
onScaleUpMock.On("ScaleUp", "ng2", 1).Return(nil).Once()
err := autoscaler.RunOnce(time.Now())
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, readyNodeListerMock, allNodeListerMock, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
// Mark unneeded nodes.
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once()
allNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once()
scheduledPodMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Once()
unschedulablePodMock.On("List").Return([]*apiv1.Pod{p4, p5}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
ng2.SetTargetSize(2)
err = autoscaler.RunOnce(time.Now().Add(2 * time.Hour))
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, readyNodeListerMock, allNodeListerMock, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
// Scale down.
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once()
allNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once()
scheduledPodMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4}, nil).Once()
unschedulablePodMock.On("List").Return([]*apiv1.Pod{p5}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
onScaleDownMock.On("ScaleDown", "ng1", "n1").Return(nil).Once()
p4.Spec.NodeName = "n2"
err = autoscaler.RunOnce(time.Now().Add(3 * time.Hour))
waitForDeleteToFinish(t, autoscaler.scaleDown)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, readyNodeListerMock, allNodeListerMock, scheduledPodMock, unschedulablePodMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
}


@ -33,6 +33,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
apiv1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/api/extensions/v1beta1"
@ -107,9 +108,13 @@ func (podMap podSchedulableMap) set(pod *apiv1.Pod, schedulable bool) {
// FilterOutSchedulable checks whether pods from <unschedulableCandidates> marked as unschedulable
// by Scheduler actually can't be scheduled on any node and filters out the ones that can.
func FilterOutSchedulable(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allPods []*apiv1.Pod, predicateChecker *simulator.PredicateChecker) []*apiv1.Pod {
// It takes into account pods that are bound to a node, as well as pods that will be scheduled after lower priority pods are preempted.
func FilterOutSchedulable(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.Node, allScheduled []*apiv1.Pod, podsWaitingForLowerPriorityPreemption []*apiv1.Pod,
predicateChecker *simulator.PredicateChecker, expendablePodsPriorityCutoff int) []*apiv1.Pod {
unschedulablePods := []*apiv1.Pod{}
nodeNameToNodeInfo := createNodeNameToInfoMap(allPods, nodes)
nonExpendableScheduled := FilterOutExpendablePods(allScheduled, expendablePodsPriorityCutoff)
nodeNameToNodeInfo := scheduler_util.CreateNodeNameToInfoMap(append(nonExpendableScheduled, podsWaitingForLowerPriorityPreemption...), nodes)
podSchedulable := make(podSchedulableMap)
for _, pod := range unschedulableCandidates {
@ -133,27 +138,34 @@ func FilterOutSchedulable(unschedulableCandidates []*apiv1.Pod, nodes []*apiv1.N
return unschedulablePods
}
// TODO: move this function to scheduler utils.
func createNodeNameToInfoMap(pods []*apiv1.Pod, nodes []*apiv1.Node) map[string]*schedulercache.NodeInfo {
nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods, nodes)
for _, node := range nodes {
if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {
nodeInfo.SetNode(node)
// FilterOutExpendableAndSplit filters out expendable pods and splits the remaining ones into:
// - pods waiting for lower priority pods preemption
// - all other pods.
func FilterOutExpendableAndSplit(unschedulableCandidates []*apiv1.Pod, expendablePodsPriorityCutoff int) ([]*apiv1.Pod, []*apiv1.Pod) {
unschedulableNonExpendable := []*apiv1.Pod{}
waitingForLowerPriorityPreemption := []*apiv1.Pod{}
for _, pod := range unschedulableCandidates {
if pod.Spec.Priority != nil && int(*pod.Spec.Priority) < expendablePodsPriorityCutoff {
glog.V(4).Infof("Pod %s has priority below %d (%d) and will scheduled when enough resources is free. Ignoring in scale up.", pod.Name, expendablePodsPriorityCutoff, *pod.Spec.Priority)
} else if annot, found := pod.Annotations[scheduler_util.NominatedNodeAnnotationKey]; found && len(annot) > 0 {
waitingForLowerPriorityPreemption = append(waitingForLowerPriorityPreemption, pod)
glog.V(4).Infof("Pod %s will be scheduled after low prioity pods are preempted on %s. Ignoring in scale up.", pod.Name, annot)
} else {
unschedulableNonExpendable = append(unschedulableNonExpendable, pod)
}
}
return unschedulableNonExpendable, waitingForLowerPriorityPreemption
}
// Some pods may be out of sync with node lists. Removing incomplete node infos.
keysToRemove := make([]string, 0)
for key, nodeInfo := range nodeNameToNodeInfo {
if nodeInfo.Node() == nil {
keysToRemove = append(keysToRemove, key)
// FilterOutExpendablePods filters out expendable pods.
func FilterOutExpendablePods(pods []*apiv1.Pod, expendablePodsPriorityCutoff int) []*apiv1.Pod {
result := []*apiv1.Pod{}
for _, pod := range pods {
if pod.Spec.Priority == nil || int(*pod.Spec.Priority) >= expendablePodsPriorityCutoff {
result = append(result, pod)
}
}
for _, key := range keysToRemove {
delete(nodeNameToNodeInfo, key)
}
return nodeNameToNodeInfo
return result
}
// GetNodeInfosForGroups finds NodeInfos for all node groups used to manage the given nodes. It also returns a node group to sample node mapping.
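For reference, a small usage sketch of the two helpers above. makePod is a hypothetical test helper (not part of this PR) that builds an *apiv1.Pod with the given name and priority and, when a node name is supplied, sets the NominatedNodeName annotation:

low, high := int32(1), int32(100)
pending := []*apiv1.Pod{
	makePod("a", &low, ""),       // expendable: priority 1 is below the cutoff of 10
	makePod("b", &high, ""),      // genuinely unschedulable, may trigger a scale up
	makePod("c", &high, "node1"), // nominated to node1, waiting for preemption there
}
unschedulable, waiting := FilterOutExpendableAndSplit(pending, 10)
// unschedulable contains only "b", waiting contains only "c", and "a" is dropped.
nonExpendable := FilterOutExpendablePods(pending, 10)
// Keeps every pod whose priority is nil or >= 10, i.e. "b" and "c".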


@ -26,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
apiv1 "k8s.io/api/core/v1"
@ -149,24 +150,91 @@ func TestFilterOutSchedulable(t *testing.T) {
scheduledPod1 := BuildTestPod("s1", 100, 200000)
scheduledPod2 := BuildTestPod("s2", 1500, 200000)
scheduledPod3 := BuildTestPod("s3", 4000, 200000)
var priority1 int32 = 1
scheduledPod3.Spec.Priority = &priority1
scheduledPod1.Spec.NodeName = "node1"
scheduledPod2.Spec.NodeName = "node1"
scheduledPod2.Spec.NodeName = "node1"
podWaitingForPreemption := BuildTestPod("w1", 1500, 200000)
var priority100 int32 = 100
podWaitingForPreemption.Spec.Priority = &priority100
podWaitingForPreemption.Annotations = map[string]string{scheduler_util.NominatedNodeAnnotationKey: "node1"}
node := BuildTestNode("node1", 2000, 2000000)
SetNodeReadyState(node, true, time.Time{})
predicateChecker := simulator.NewTestPredicateChecker()
res := FilterOutSchedulable(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, p1, p2_1, p2_2, p3_1, p3_2}, predicateChecker)
res := FilterOutSchedulable(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
assert.Equal(t, 2, len(res))
assert.Equal(t, p2_1, res[0])
assert.Equal(t, p2_2, res[1])
res2 := FilterOutSchedulable(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, p1, p2_1, p2_2, p3_1, p3_2}, predicateChecker)
res2 := FilterOutSchedulable(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod2, scheduledPod3}, []*apiv1.Pod{}, predicateChecker, 10)
assert.Equal(t, 3, len(res2))
assert.Equal(t, p1, res2[0])
assert.Equal(t, p2_1, res2[1])
assert.Equal(t, p2_2, res2[2])
res3 := FilterOutSchedulable(unschedulablePods, []*apiv1.Node{node}, []*apiv1.Pod{scheduledPod1, scheduledPod3}, []*apiv1.Pod{podWaitingForPreemption}, predicateChecker, 10)
assert.Equal(t, 3, len(res3))
assert.Equal(t, p1, res3[0])
assert.Equal(t, p2_1, res3[1])
assert.Equal(t, p2_2, res3[2])
}
func TestFilterOutExpendableAndSplit(t *testing.T) {
var priority1 int32 = 1
var priority100 int32 = 100
p1 := BuildTestPod("p1", 1000, 200000)
p1.Spec.Priority = &priority1
p2 := BuildTestPod("p2", 1000, 200000)
p2.Spec.Priority = &priority100
podWaitingForPreemption1 := BuildTestPod("w1", 1000, 200000)
podWaitingForPreemption1.Spec.Priority = &priority1
podWaitingForPreemption1.Annotations = map[string]string{scheduler_util.NominatedNodeAnnotationKey: "node1"}
podWaitingForPreemption2 := BuildTestPod("w2", 1000, 200000)
podWaitingForPreemption2.Spec.Priority = &priority100
podWaitingForPreemption2.Annotations = map[string]string{scheduler_util.NominatedNodeAnnotationKey: "node1"}
res1, res2 := FilterOutExpendableAndSplit([]*apiv1.Pod{p1, p2, podWaitingForPreemption1, podWaitingForPreemption2}, 0)
assert.Equal(t, 2, len(res1))
assert.Equal(t, p1, res1[0])
assert.Equal(t, p2, res1[1])
assert.Equal(t, 2, len(res2))
assert.Equal(t, podWaitingForPreemption1, res2[0])
assert.Equal(t, podWaitingForPreemption2, res2[1])
res1, res2 = FilterOutExpendableAndSplit([]*apiv1.Pod{p1, p2, podWaitingForPreemption1, podWaitingForPreemption2}, 10)
assert.Equal(t, 1, len(res1))
assert.Equal(t, p2, res1[0])
assert.Equal(t, 1, len(res2))
assert.Equal(t, podWaitingForPreemption2, res2[0])
}
func TestFilterOutExpendablePods(t *testing.T) {
p1 := BuildTestPod("p1", 1500, 200000)
p2 := BuildTestPod("p2", 3000, 200000)
podWaitingForPreemption1 := BuildTestPod("w1", 1500, 200000)
var priority1 int32 = -10
podWaitingForPreemption1.Spec.Priority = &priority1
podWaitingForPreemption1.Annotations = map[string]string{scheduler_util.NominatedNodeAnnotationKey: "node1"}
podWaitingForPreemption2 := BuildTestPod("w1", 1500, 200000)
var priority2 int32 = 10
podWaitingForPreemption2.Spec.Priority = &priority2
podWaitingForPreemption2.Annotations = map[string]string{scheduler_util.NominatedNodeAnnotationKey: "node1"}
res := FilterOutExpendablePods([]*apiv1.Pod{p1, p2, podWaitingForPreemption1, podWaitingForPreemption2}, 0)
assert.Equal(t, 3, len(res))
assert.Equal(t, p1, res[0])
assert.Equal(t, p2, res[1])
assert.Equal(t, podWaitingForPreemption2, res[2])
}
func TestGetNodeInfosForGroups(t *testing.T) {


@ -127,6 +127,8 @@ var (
balanceSimilarNodeGroupsFlag = flag.Bool("balance-similar-node-groups", false, "Detect similar node groups and balance the number of nodes between them")
nodeAutoprovisioningEnabled = flag.Bool("node-autoprovisioning-enabled", false, "Should CA autoprovision node groups when needed")
maxAutoprovisionedNodeGroupCount = flag.Int("max-autoprovisioned-node-group-count", 15, "The maximum number of autoprovisioned groups in the cluster.")
expendablePodsPriorityCutoff = flag.Int("expendable-pods-priority-cutoff", 0, "Pods with priority below cutoff will be expendable. They can be killed without any consideration during scale down and they don't cause scale up. Pods with null priority (PodPriority disabled) are non expendable.")
)
func createAutoscalerOptions() core.AutoscalerOptions {
@ -176,6 +178,7 @@ func createAutoscalerOptions() core.AutoscalerOptions {
ClusterName: *clusterName,
NodeAutoprovisioningEnabled: *nodeAutoprovisioningEnabled,
MaxAutoprovisionedNodeGroupCount: *maxAutoprovisionedNodeGroupCount,
ExpendablePodsPriorityCutoff: *expendablePodsPriorityCutoff,
}
configFetcherOpts := dynamic.ConfigFetcherOptions{


@ -27,6 +27,7 @@ import (
policyv1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
client "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@ -62,7 +63,7 @@ func FindNodesToRemove(candidates []*apiv1.Node, allNodes []*apiv1.Node, pods []
podDisruptionBudgets []*policyv1.PodDisruptionBudget,
) (nodesToRemove []NodeToBeRemoved, unremovableNodes []*apiv1.Node, podReschedulingHints map[string]string, finalError errors.AutoscalerError) {
nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods, allNodes)
nodeNameToNodeInfo := scheduler_util.CreateNodeNameToInfoMap(pods, allNodes)
result := make([]NodeToBeRemoved, 0)
unremovable := make([]*apiv1.Node, 0)
@ -119,7 +120,7 @@ candidateloop:
// FindEmptyNodesToRemove finds empty nodes that can be removed.
func FindEmptyNodesToRemove(candidates []*apiv1.Node, pods []*apiv1.Pod) []*apiv1.Node {
nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods, candidates)
nodeNameToNodeInfo := scheduler_util.CreateNodeNameToInfoMap(pods, candidates)
result := make([]*apiv1.Node, 0)
for _, node := range candidates {
if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {


@ -0,0 +1,68 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
const (
// NominatedNodeAnnotationKey is used to annotate a pod that has preempted other pods.
// The scheduler uses the annotation to find that the pod shouldn't preempt more pods
// when it gets to the head of scheduling queue again.
// See podEligibleToPreemptOthers() for more information.
NominatedNodeAnnotationKey = "NominatedNodeName"
)
// CreateNodeNameToInfoMap obtains a list of pods and pivots that list into a map where the keys are node names
and the values are the aggregated information for that node. Pods waiting for lower priority pods to be
preempted (annotated with NominatedNodeAnnotationKey) are also added to the list of pods for their nominated node.
func CreateNodeNameToInfoMap(pods []*apiv1.Pod, nodes []*apiv1.Node) map[string]*schedulercache.NodeInfo {
nodeNameToNodeInfo := make(map[string]*schedulercache.NodeInfo)
for _, pod := range pods {
nodeName := pod.Spec.NodeName
if nodeName == "" {
nodeName = pod.Annotations[NominatedNodeAnnotationKey]
}
if _, ok := nodeNameToNodeInfo[nodeName]; !ok {
nodeNameToNodeInfo[nodeName] = schedulercache.NewNodeInfo()
}
nodeNameToNodeInfo[nodeName].AddPod(pod)
}
for _, node := range nodes {
if _, ok := nodeNameToNodeInfo[node.Name]; !ok {
nodeNameToNodeInfo[node.Name] = schedulercache.NewNodeInfo()
}
nodeNameToNodeInfo[node.Name].SetNode(node)
}
// Some pods may be out of sync with node lists. Removing incomplete node infos.
keysToRemove := make([]string, 0)
for key, nodeInfo := range nodeNameToNodeInfo {
if nodeInfo.Node() == nil {
keysToRemove = append(keysToRemove, key)
}
}
for _, key := range keysToRemove {
delete(nodeNameToNodeInfo, key)
}
return nodeNameToNodeInfo
}
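To illustrate the behaviour added here: a pending pod carrying the annotation is charged to its nominated node even though Spec.NodeName is still empty. A hedged sketch (metav1 is k8s.io/apimachinery/pkg/apis/meta/v1; node1 is assumed to be an existing *apiv1.Node named "node1"):

priority := int32(100)
nominated := &apiv1.Pod{
	ObjectMeta: metav1.ObjectMeta{
		Name:        "high-prio",
		Annotations: map[string]string{NominatedNodeAnnotationKey: "node1"},
	},
	Spec: apiv1.PodSpec{Priority: &priority},
}
infos := CreateNodeNameToInfoMap([]*apiv1.Pod{nominated}, []*apiv1.Node{node1})
// infos["node1"].Pods() now includes "high-prio", so its requests reduce the capacity
// the autoscaler believes is still free on node1.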


@ -0,0 +1,52 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"testing"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
apiv1 "k8s.io/api/core/v1"
"github.com/stretchr/testify/assert"
)
func TestCreateNodeNameToInfoMap(t *testing.T) {
p1 := BuildTestPod("p1", 1500, 200000)
p1.Spec.NodeName = "node1"
p2 := BuildTestPod("p2", 3000, 200000)
p2.Spec.NodeName = "node2"
p3 := BuildTestPod("p3", 3000, 200000)
p3.Spec.NodeName = "node3"
var priority int32 = 100
podWaitingForPreemption := BuildTestPod("w1", 1500, 200000)
podWaitingForPreemption.Spec.Priority = &priority
podWaitingForPreemption.Annotations = map[string]string{NominatedNodeAnnotationKey: "node1"}
n1 := BuildTestNode("node1", 2000, 2000000)
n2 := BuildTestNode("node2", 2000, 2000000)
res := CreateNodeNameToInfoMap([]*apiv1.Pod{p1, p2, p3, podWaitingForPreemption}, []*apiv1.Node{n1, n2})
assert.Equal(t, 2, len(res))
assert.Equal(t, p1, res["node1"].Pods()[0])
assert.Equal(t, podWaitingForPreemption, res["node1"].Pods()[1])
assert.Equal(t, n1, res["node1"].Node())
assert.Equal(t, p2, res["node2"].Pods()[0])
assert.Equal(t, n2, res["node2"].Node())
}