Keep maximum 30 candidates for scale down with drain

Beata Skiba 2017-08-29 17:12:51 +02:00
parent e9261a249c
commit 4560cc0a85
2 changed files with 126 additions and 14 deletions
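
Before the diff itself, a minimal, self-contained sketch of the flow this commit adds, assuming simplified stand-in types: pickCandidates and findRemovable below are hypothetical placeholders for ScaleDown.UpdateUnneededNodes and simulator.FindNodesToRemove. Drain simulation runs on the previous iteration's candidates first, and the list is topped up from the remaining nodes only when fewer than ScaleDownNonEmptyCandidatesCount removable nodes were found.

package main

import "fmt"

// Cap introduced by this commit: at most 30 non-empty nodes are simulated
// for drain as scale-down candidates in a single iteration.
const ScaleDownNonEmptyCandidatesCount = 30

// pickCandidates is a hypothetical stand-in for the real flow: simulate the
// previous iteration's candidates first, then ask for only the number of
// candidates still missing from the remaining (non-candidate) nodes.
func pickCandidates(candidates, nonCandidates []string,
    findRemovable func(nodes []string, max int) []string) []string {
    removable := findRemovable(candidates, len(candidates))
    missing := ScaleDownNonEmptyCandidatesCount - len(removable)
    if missing > len(nonCandidates) {
        missing = len(nonCandidates)
    }
    if missing > 0 {
        removable = append(removable, findRemovable(nonCandidates, missing)...)
    }
    return removable
}

func main() {
    // Toy "simulator": treats every node as removable, capped at max.
    all := func(nodes []string, max int) []string {
        if max > len(nodes) {
            max = len(nodes)
        }
        return nodes[:max]
    }
    fmt.Println(pickCandidates([]string{"n1", "n2"}, []string{"n3", "n4"}, all))
    // Output: [n1 n2 n3 n4]
}

The real implementation, including error handling, utilization tracking and pod location hints, follows in the diff below.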

View File

@@ -58,6 +58,9 @@ const (
ScaleDownNodeDeleteStarted ScaleDownResult = iota
// ScaleDownDisabledKey is the name of the annotation marking a node as not eligible for scale down.
ScaleDownDisabledKey = "cluster-autoscaler.kubernetes.io/scale-down-disabled"
// ScaleDownNonEmptyCandidatesCount is the maximum number of non-empty nodes
// considered at once as candidates for scale down.
ScaleDownNonEmptyCandidatesCount = 30
)
const (
@@ -209,26 +212,44 @@ func (sd *ScaleDown) UpdateUnneededNodes(
}
// Phase2 - check which nodes can probably be removed using fast drain.
nodesToRemove, unremovable, newHints, simulatorErr := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods,
nil, sd.context.PredicateChecker,
len(currentlyUnneededNodes), true, sd.podLocationHints, sd.usageTracker, timestamp, pdbs)
currentCandidates, currentNonCandidates := sd.chooseCandidates(currentlyUnneededNodes)
// Look for nodes to remove in the current candidates
nodesToRemove, unremovable, newHints, simulatorErr := simulator.FindNodesToRemove(
currentCandidates, nodes, pods, nil, sd.context.PredicateChecker,
len(currentCandidates), true, sd.podLocationHints, sd.usageTracker, timestamp, pdbs)
if simulatorErr != nil {
glog.Errorf("Error while simulating node drains: %v", simulatorErr)
return sd.markSimulationError(simulatorErr, timestamp)
}
sd.unneededNodesList = make([]*apiv1.Node, 0)
sd.unneededNodes = make(map[string]time.Time)
sd.nodeUtilizationMap = make(map[string]float64)
sd.context.ClusterStateRegistry.UpdateScaleDownCandidates(sd.unneededNodesList, timestamp)
return simulatorErr.AddPrefix("error while simulating node drains: ")
// Check how many candidates we are still missing
additionalCandidatesCount := ScaleDownNonEmptyCandidatesCount - len(nodesToRemove)
if additionalCandidatesCount > len(currentNonCandidates) {
additionalCandidatesCount = len(currentNonCandidates)
}
if additionalCandidatesCount > 0 {
// Look for additional nodes to remove among the remaining nodes
glog.V(3).Infof("Finding additional %v candidates for scale down.", additionalCandidatesCount)
additionalNodesToRemove, additionalUnremovable, additionalNewHints, simulatorErr :=
simulator.FindNodesToRemove(currentNonCandidates, nodes, pods, nil,
sd.context.PredicateChecker, additionalCandidatesCount, true,
sd.podLocationHints, sd.usageTracker, timestamp, pdbs)
if simulatorErr != nil {
return sd.markSimulationError(simulatorErr, timestamp)
}
nodesToRemove = append(nodesToRemove, additionalNodesToRemove...)
unremovable = append(unremovable, additionalUnremovable...)
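// Merge the additional pod location hints into the existing hint map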
for key, value := range additionalNewHints {
newHints[key] = value
}
}
// Update the timestamp map.
result := make(map[string]time.Time)
unneadedNodeList := make([]*apiv1.Node, 0, len(nodesToRemove))
unneededNodesList := make([]*apiv1.Node, 0, len(nodesToRemove))
for _, node := range nodesToRemove {
name := node.Node.Name
unneadedNodeList = append(unneadedNodeList, node.Node)
unneededNodesList = append(unneededNodesList, node.Node)
if val, found := sd.unneededNodes[name]; !found {
result[name] = timestamp
} else {
@@ -236,7 +257,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
}
}
// Add noded to unremovable map
// Add nodes to unremovable map
if len(unremovable) > 0 {
unremovableTimeout := timestamp.Add(UnremovableNodeRecheckTimeout)
for _, node := range unremovable {
@@ -245,7 +266,8 @@ func (sd *ScaleDown) UpdateUnneededNodes(
glog.V(1).Infof("%v nodes found unremovable in simulation, will re-check them at %v", len(unremovable), unremovableTimeout)
}
sd.unneededNodesList = unneadedNodeList
// Update state and metrics
sd.unneededNodesList = unneededNodesList
sd.unneededNodes = result
sd.podLocationHints = newHints
sd.nodeUtilizationMap = utilizationMap
@@ -254,6 +276,34 @@ func (sd *ScaleDown) UpdateUnneededNodes(
return nil
}
// markSimulationError indicates a simulation error by clearing relevant scale
// down state and returning an appropriate error.
func (sd *ScaleDown) markSimulationError(simulatorErr errors.AutoscalerError,
timestamp time.Time) errors.AutoscalerError {
glog.Errorf("Error while simulating node drains: %v", simulatorErr)
sd.unneededNodesList = make([]*apiv1.Node, 0)
sd.unneededNodes = make(map[string]time.Time)
sd.nodeUtilizationMap = make(map[string]float64)
sd.context.ClusterStateRegistry.UpdateScaleDownCandidates(sd.unneededNodesList, timestamp)
return simulatorErr.AddPrefix("error while simulating node drains: ")
}
// chooseCandidates splits nodes into current candidates for scale down and the
// rest. Current candidates are unneeded nodes from the previous run that are
// still in the nodes list.
func (sd *ScaleDown) chooseCandidates(nodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) {
currentCandidates := make([]*apiv1.Node, 0, len(sd.unneededNodesList))
currentNonCandidates := make([]*apiv1.Node, 0, len(nodes))
for _, node := range nodes {
if _, found := sd.unneededNodes[node.Name]; found {
currentCandidates = append(currentCandidates, node)
} else {
currentNonCandidates = append(currentNonCandidates, node)
}
}
return currentCandidates, currentNonCandidates
}
// TryToScaleDown tries to scale down the cluster. It returns ScaleDownResult indicating if any node was
// removed and an error if one occurred.
func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (ScaleDownResult, errors.AutoscalerError) {

View File

@@ -131,6 +131,68 @@ func TestFindUnneededNodes(t *testing.T) {
assert.Equal(t, 1, len(sd.unneededNodes))
}
func TestFindUnneededMaxCandidates(t *testing.T) {
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 100, 2)
numNodes := 100
nodes := make([]*apiv1.Node, 0, numNodes)
for i := 0; i < numNodes; i++ {
n := BuildTestNode(fmt.Sprintf("n%v", i), 1000, 10)
SetNodeReadyState(n, true, time.Time{})
provider.AddNode("ng1", n)
nodes = append(nodes, n)
}
pods := make([]*apiv1.Pod, 0, numNodes)
for i := 0; i < numNodes; i++ {
p := BuildTestPod(fmt.Sprintf("p%v", i), 100, 0)
p.Annotations = GetReplicaSetAnnotation()
p.Spec.NodeName = fmt.Sprintf("n%v", i)
pods = append(pods, p)
}
fakeClient := &fake.Clientset{}
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
context := AutoscalingContext{
AutoscalingOptions: AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.35,
},
ClusterStateRegistry: clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}),
PredicateChecker: simulator.NewTestPredicateChecker(),
LogRecorder: fakeLogRecorder,
}
sd := NewScaleDown(&context)
sd.UpdateUnneededNodes(nodes, nodes, pods, time.Now(), nil)
assert.Equal(t, ScaleDownNonEmptyCandidatesCount, len(sd.unneededNodes))
// Simulate one of the unneeded nodes getting deleted
deleted := sd.unneededNodesList[len(sd.unneededNodesList)-1]
for i, node := range nodes {
if node.Name == deleted.Name {
// Move pod away from the node
var newNode int
if i >= 1 {
newNode = i - 1
} else {
newNode = i + 1
}
pods[i].Spec.NodeName = nodes[newNode].Name
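// Remove the deleted node from the slice by swapping in the last element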
nodes[i] = nodes[len(nodes)-1]
nodes[len(nodes)-1] = nil
nodes = nodes[:len(nodes)-1]
break
}
}
sd.UpdateUnneededNodes(nodes, nodes, pods, time.Now(), nil)
// Check that the deleted node was replaced
assert.Equal(t, ScaleDownNonEmptyCandidatesCount, len(sd.unneededNodes))
assert.NotContains(t, sd.unneededNodes, deleted.Name)
}
func TestDrainNode(t *testing.T) {
deletedPods := make(chan string, 10)
updatedNodes := make(chan string, 10)