Merge pull request #2233 from vivekbagade/surge

Adding ScaleDownNodeProcessor
Kubernetes Prow Robot 2019-08-19 03:59:32 -07:00 committed by GitHub
commit 3f0a5fa3c2
13 changed files with 475 additions and 143 deletions


@ -29,6 +29,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@ -396,25 +397,33 @@ func (sd *ScaleDown) CleanUpUnneededNodes() {
// UpdateUnneededNodes calculates which nodes are not needed, i.e. all pods can be scheduled somewhere else,
// and updates unneededNodes map accordingly. It also computes information where pods can be rescheduled and
// node utilization level. Timestamp is the current timestamp. The computations are made only for the nodes
// managed by CA.
// node utilization level. The computations are made only for the nodes managed by CA.
// * allNodes are all the nodes processed by CA.
// * destinationNodes are the nodes that can potentially take in any pods that are evicted because of a scale down.
// * scaleDownCandidates are the nodes that are being considered for scale down.
// * pods are all the scheduled pods.
// * timestamp is the current timestamp.
// * pdbs is a list of pod disruption budgets.
// * tempNodesPerNodeGroup maps node group id to the number of temporary nodes that node group contains.
func (sd *ScaleDown) UpdateUnneededNodes(
nodes []*apiv1.Node,
nodesToCheck []*apiv1.Node,
allNodes []*apiv1.Node,
destinationNodes []*apiv1.Node,
scaleDownCandidates []*apiv1.Node,
pods []*apiv1.Pod,
timestamp time.Time,
pdbs []*policyv1.PodDisruptionBudget) errors.AutoscalerError {
pdbs []*policyv1.PodDisruptionBudget,
tempNodesPerNodeGroup map[string]int) errors.AutoscalerError {
currentlyUnneededNodes := make([]*apiv1.Node, 0)
// Only scheduled non-expendable pods and pods waiting for lower-priority pod preemption can prevent node deletion.
nonExpendablePods := filterOutExpendablePods(pods, sd.context.ExpendablePodsPriorityCutoff)
nodeNameToNodeInfo := scheduler_util.CreateNodeNameToInfoMap(nonExpendablePods, nodes)
nodeNameToNodeInfo := scheduler_util.CreateNodeNameToInfoMap(nonExpendablePods, destinationNodes)
utilizationMap := make(map[string]simulator.UtilizationInfo)
sd.updateUnremovableNodes(nodes)
sd.updateUnremovableNodes(allNodes)
// Filter out nodes that were recently checked
filteredNodesToCheck := make([]*apiv1.Node, 0)
for _, node := range nodesToCheck {
for _, node := range scaleDownCandidates {
if unremovableTimestamp, found := sd.unremovableNodes[node.Name]; found {
if unremovableTimestamp.After(timestamp) {
continue
@ -423,7 +432,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
}
filteredNodesToCheck = append(filteredNodesToCheck, node)
}
skipped := len(nodesToCheck) - len(filteredNodesToCheck)
skipped := len(scaleDownCandidates) - len(filteredNodesToCheck)
if skipped > 0 {
klog.V(1).Infof("Scale-down calculation: ignoring %v nodes unremovable in the last %v", skipped, sd.context.AutoscalingOptions.UnremovableNodeRecheckTimeout)
}
@ -468,7 +477,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
emptyNodes := make(map[string]bool)
emptyNodesList := sd.getEmptyNodesNoResourceLimits(currentlyUnneededNodes, pods, len(currentlyUnneededNodes))
emptyNodesList := sd.getEmptyNodesNoResourceLimits(currentlyUnneededNodes, pods, len(currentlyUnneededNodes), tempNodesPerNodeGroup)
for _, node := range emptyNodesList {
emptyNodes[node.Name] = true
}
@ -485,7 +494,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
// Look for nodes to remove in the current candidates
nodesToRemove, unremovable, newHints, simulatorErr := simulator.FindNodesToRemove(
currentCandidates, nodes, nonExpendablePods, nil, sd.context.PredicateChecker,
currentCandidates, destinationNodes, nonExpendablePods, nil, sd.context.PredicateChecker,
len(currentCandidates), true, sd.podLocationHints, sd.usageTracker, timestamp, pdbs)
if simulatorErr != nil {
return sd.markSimulationError(simulatorErr, timestamp)
@ -496,7 +505,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
additionalCandidatesCount = len(currentNonCandidates)
}
// Limit the additional candidates pool size for better performance.
additionalCandidatesPoolSize := int(math.Ceil(float64(len(nodes)) * sd.context.ScaleDownCandidatesPoolRatio))
additionalCandidatesPoolSize := int(math.Ceil(float64(len(allNodes)) * sd.context.ScaleDownCandidatesPoolRatio))
if additionalCandidatesPoolSize < sd.context.ScaleDownCandidatesPoolMinCount {
additionalCandidatesPoolSize = sd.context.ScaleDownCandidatesPoolMinCount
}
@ -507,7 +516,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
// Look for additional nodes to remove among the rest of nodes.
klog.V(3).Infof("Finding additional %v candidates for scale down.", additionalCandidatesCount)
additionalNodesToRemove, additionalUnremovable, additionalNewHints, simulatorErr :=
simulator.FindNodesToRemove(currentNonCandidates[:additionalCandidatesPoolSize], nodes, nonExpendablePods, nil,
simulator.FindNodesToRemove(currentNonCandidates[:additionalCandidatesPoolSize], destinationNodes, nonExpendablePods, nil,
sd.context.PredicateChecker, additionalCandidatesCount, true,
sd.podLocationHints, sd.usageTracker, timestamp, pdbs)
if simulatorErr != nil {
@ -684,7 +693,8 @@ func (sd *ScaleDown) SoftTaintUnneededNodes(allNodes []*apiv1.Node) (errors []er
// TryToScaleDown tries to scale down the cluster. It returns a result inside a ScaleDownStatus indicating if any node was
// removed and an error if one occurred.
func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget, currentTime time.Time) (*status.ScaleDownStatus, errors.AutoscalerError) {
func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget,
currentTime time.Time, tempNodes []*apiv1.Node, tempNodesPerNodeGroup map[string]int) (*status.ScaleDownStatus, errors.AutoscalerError) {
scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: sd.nodeDeletionTracker.GetAndClearNodeDeleteResults()}
nodeDeletionDuration := time.Duration(0)
findNodesToRemoveDuration := time.Duration(0)
@ -702,9 +712,11 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
return scaleDownStatus, errors.ToAutoscalerError(errors.CloudProviderError, errCP)
}
nodesWithoutMaster = utils.FilterOutNodes(nodesWithoutMaster, tempNodes)
scaleDownResourcesLeft := computeScaleDownResourcesLeftLimits(nodesWithoutMaster, resourceLimiter, sd.context.CloudProvider, currentTime)
nodeGroupSize := getNodeGroupSizeMap(sd.context.CloudProvider)
nodeGroupSize := utils.GetNodeGroupSizeMap(sd.context.CloudProvider)
resourcesWithLimits := resourceLimiter.GetResources()
for _, node := range nodesWithoutMaster {
if val, found := sd.unneededNodes[node.Name]; found {
@ -746,7 +758,9 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
continue
}
if size-sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id()) <= nodeGroup.MinSize() {
tempNodesForNg := tempNodesPerNodeGroup[nodeGroup.Id()]
deletionsInProgress := sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id())
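// Temporary nodes count against the group's capacity the same way as deletions already in progress:
// e.g. a group with target size 5, 1 deletion in progress, 1 temporary node and MinSize 3 is skipped, since 5-1-1 <= 3.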
if size-deletionsInProgress-tempNodesForNg <= nodeGroup.MinSize() {
klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
continue
}
@ -776,7 +790,7 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
// Trying to delete empty nodes in bulk. If there are no empty nodes then CA will
// try to delete not-so-empty nodes, possibly killing some pods and allowing them
// to recreate on other nodes.
emptyNodes := sd.getEmptyNodes(candidates, pods, sd.context.MaxEmptyBulkDelete, scaleDownResourcesLeft)
emptyNodes := sd.getEmptyNodes(candidates, pods, sd.context.MaxEmptyBulkDelete, scaleDownResourcesLeft, tempNodesPerNodeGroup)
if len(emptyNodes) > 0 {
nodeDeletionStart := time.Now()
deletedNodes, err := sd.scheduleDeleteEmptyNodes(emptyNodes, sd.context.ClientSet, sd.context.Recorder, readinessMap, candidateNodeGroups)
@ -860,6 +874,18 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
return scaleDownStatus, nil
}
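// getTempNodesPerNodeGroup counts the temporary nodes in each node group, keyed by node group id.
// Nodes whose node group cannot be determined are skipped.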
func getTempNodesPerNodeGroup(cp cloudprovider.CloudProvider, tempNodes []*apiv1.Node) map[string]int {
tempNodesPerNg := make(map[string]int)
for _, node := range tempNodes {
ng, err := cp.NodeGroupForNode(node)
if err != nil || ng == nil {
continue
}
tempNodesPerNg[ng.Id()]++
}
return tempNodesPerNg
}
// updateScaleDownMetrics registers duration of different parts of scale down.
// Separates time spent on finding nodes to remove, deleting nodes and other operations.
func updateScaleDownMetrics(scaleDownStart time.Time, findNodesToRemoveDuration *time.Duration, nodeDeletionDuration *time.Duration) {
@ -870,14 +896,14 @@ func updateScaleDownMetrics(scaleDownStart time.Time, findNodesToRemoveDuration
metrics.UpdateDuration(metrics.ScaleDownMiscOperations, miscDuration)
}
func (sd *ScaleDown) getEmptyNodesNoResourceLimits(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDelete int) []*apiv1.Node {
return sd.getEmptyNodes(candidates, pods, maxEmptyBulkDelete, noScaleDownLimitsOnResources())
func (sd *ScaleDown) getEmptyNodesNoResourceLimits(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDelete int, tempNodesPerNodeGroup map[string]int) []*apiv1.Node {
return sd.getEmptyNodes(candidates, pods, maxEmptyBulkDelete, noScaleDownLimitsOnResources(), tempNodesPerNodeGroup)
}
// This function finds empty nodes among the passed candidates and returns a list of empty nodes
// that can be deleted at the same time.
func (sd *ScaleDown) getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDelete int,
resourcesLimits scaleDownResourcesLimits) []*apiv1.Node {
resourcesLimits scaleDownResourcesLimits, temporaryNodesPerNodeGroup map[string]int) []*apiv1.Node {
emptyNodes := simulator.FindEmptyNodesToRemove(candidates, pods)
availabilityMap := make(map[string]int)
@ -902,7 +928,9 @@ func (sd *ScaleDown) getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod,
klog.Errorf("Failed to get size for %s: %v ", nodeGroup.Id(), err)
continue
}
available = size - nodeGroup.MinSize() - sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id())
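// A node group can lose at most (target size - min size) nodes, less deletions already in progress and temporary nodes.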
tempNodes := temporaryNodesPerNodeGroup[nodeGroup.Id()]
deletionsInProgress := sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id())
available = size - nodeGroup.MinSize() - deletionsInProgress - tempNodes
if available < 0 {
available = 0
}


@ -132,8 +132,9 @@ func TestFindUnneededNodes(t *testing.T) {
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
sd := NewScaleDown(&context, clusterStateRegistry)
sd.UpdateUnneededNodes([]*apiv1.Node{n1, n2, n3, n4, n5, n7, n8, n9}, []*apiv1.Node{n1, n2, n3, n4, n5, n6, n7, n8, n9},
[]*apiv1.Pod{p1, p2, p3, p4, p5, p6}, time.Now(), nil)
allNodes := []*apiv1.Node{n1, n2, n3, n4, n5, n7, n8, n9}
sd.UpdateUnneededNodes(allNodes, allNodes, allNodes,
[]*apiv1.Pod{p1, p2, p3, p4, p5, p6}, time.Now(), nil, nil)
assert.Equal(t, 3, len(sd.unneededNodes))
addTime, found := sd.unneededNodes["n2"]
@ -147,7 +148,8 @@ func TestFindUnneededNodes(t *testing.T) {
sd.unremovableNodes = make(map[string]time.Time)
sd.unneededNodes["n1"] = time.Now()
sd.UpdateUnneededNodes([]*apiv1.Node{n1, n2, n3, n4}, []*apiv1.Node{n1, n2, n3, n4}, []*apiv1.Pod{p1, p2, p3, p4}, time.Now(), nil)
allNodes = []*apiv1.Node{n1, n2, n3, n4}
sd.UpdateUnneededNodes(allNodes, allNodes, allNodes, []*apiv1.Pod{p1, p2, p3, p4}, time.Now(), nil, nil)
sd.unremovableNodes = make(map[string]time.Time)
assert.Equal(t, 1, len(sd.unneededNodes))
@ -157,17 +159,19 @@ func TestFindUnneededNodes(t *testing.T) {
assert.Equal(t, 4, len(sd.nodeUtilizationMap))
sd.unremovableNodes = make(map[string]time.Time)
sd.UpdateUnneededNodes([]*apiv1.Node{n1, n2, n3, n4}, []*apiv1.Node{n1, n3, n4}, []*apiv1.Pod{p1, p2, p3, p4}, time.Now(), nil)
scaleDownCandidates := []*apiv1.Node{n1, n3, n4}
sd.UpdateUnneededNodes(allNodes, allNodes, scaleDownCandidates, []*apiv1.Pod{p1, p2, p3, p4}, time.Now(), nil, nil)
assert.Equal(t, 0, len(sd.unneededNodes))
// Node n1 is unneeded, but should be skipped because it has just recently been found to be unremovable
sd.UpdateUnneededNodes([]*apiv1.Node{n1}, []*apiv1.Node{n1}, []*apiv1.Pod{}, time.Now(), nil)
allNodes = []*apiv1.Node{n1}
sd.UpdateUnneededNodes(allNodes, allNodes, allNodes, []*apiv1.Pod{}, time.Now(), nil, nil)
assert.Equal(t, 0, len(sd.unneededNodes))
// Verify that no other nodes are in unremovable map.
assert.Equal(t, 1, len(sd.unremovableNodes))
// But it should be checked after timeout
sd.UpdateUnneededNodes([]*apiv1.Node{n1}, []*apiv1.Node{n1}, []*apiv1.Pod{}, time.Now().Add(context.UnremovableNodeRecheckTimeout+time.Second), nil)
sd.UpdateUnneededNodes(allNodes, allNodes, allNodes, []*apiv1.Pod{}, time.Now().Add(context.UnremovableNodeRecheckTimeout+time.Second), nil, nil)
assert.Equal(t, 1, len(sd.unneededNodes))
// Verify that nodes that are no longer unremovable are removed.
assert.Equal(t, 0, len(sd.unremovableNodes))
@ -222,8 +226,9 @@ func TestFindUnneededGPUNodes(t *testing.T) {
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
sd := NewScaleDown(&context, clusterStateRegistry)
sd.UpdateUnneededNodes([]*apiv1.Node{n1, n2, n3}, []*apiv1.Node{n1, n2, n3},
[]*apiv1.Pod{p1, p2, p3}, time.Now(), nil)
allNodes := []*apiv1.Node{n1, n2, n3}
sd.UpdateUnneededNodes(allNodes, allNodes, allNodes,
[]*apiv1.Pod{p1, p2, p3}, time.Now(), nil, nil)
assert.Equal(t, 1, len(sd.unneededNodes))
_, found := sd.unneededNodes["n2"]
@ -304,8 +309,9 @@ func TestPodsWithPrioritiesFindUnneededNodes(t *testing.T) {
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
sd := NewScaleDown(&context, clusterStateRegistry)
sd.UpdateUnneededNodes([]*apiv1.Node{n1, n2, n3, n4}, []*apiv1.Node{n1, n2, n3, n4},
[]*apiv1.Pod{p1, p2, p3, p4, p5, p6, p7}, time.Now(), nil)
allNodes := []*apiv1.Node{n1, n2, n3, n4}
sd.UpdateUnneededNodes(allNodes, allNodes, allNodes,
[]*apiv1.Pod{p1, p2, p3, p4, p5, p6, p7}, time.Now(), nil, nil)
assert.Equal(t, 2, len(sd.unneededNodes))
klog.Warningf("Unneeded nodes %v", sd.unneededNodes)
_, found := sd.unneededNodes["n2"]
@ -354,7 +360,7 @@ func TestFindUnneededMaxCandidates(t *testing.T) {
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
sd := NewScaleDown(&context, clusterStateRegistry)
sd.UpdateUnneededNodes(nodes, nodes, pods, time.Now(), nil)
sd.UpdateUnneededNodes(nodes, nodes, nodes, pods, time.Now(), nil, nil)
assert.Equal(t, numCandidates, len(sd.unneededNodes))
// Simulate one of the unneeded nodes got deleted
deleted := sd.unneededNodesList[len(sd.unneededNodesList)-1]
@ -375,7 +381,7 @@ func TestFindUnneededMaxCandidates(t *testing.T) {
}
}
sd.UpdateUnneededNodes(nodes, nodes, pods, time.Now(), nil)
sd.UpdateUnneededNodes(nodes, nodes, nodes, pods, time.Now(), nil, nil)
// Check that the deleted node was replaced
assert.Equal(t, numCandidates, len(sd.unneededNodes))
assert.NotContains(t, sd.unneededNodes, deleted)
@ -420,7 +426,7 @@ func TestFindUnneededEmptyNodes(t *testing.T) {
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
sd := NewScaleDown(&context, clusterStateRegistry)
sd.UpdateUnneededNodes(nodes, nodes, pods, time.Now(), nil)
sd.UpdateUnneededNodes(nodes, nodes, nodes, pods, time.Now(), nil, nil)
for _, node := range sd.unneededNodesList {
t.Log(node.Name)
}
@ -464,7 +470,7 @@ func TestFindUnneededNodePool(t *testing.T) {
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
sd := NewScaleDown(&context, clusterStateRegistry)
sd.UpdateUnneededNodes(nodes, nodes, pods, time.Now(), nil)
sd.UpdateUnneededNodes(nodes, nodes, nodes, pods, time.Now(), nil, nil)
assert.NotEmpty(t, sd.unneededNodes)
}
@ -937,12 +943,12 @@ func TestScaleDown(t *testing.T) {
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil)
context := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
nodes := []*apiv1.Node{n1, n2}
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
scaleDown := NewScaleDown(&context, clusterStateRegistry)
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
[]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2, p3}, time.Now().Add(-5*time.Minute), nil)
scaleDownStatus, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2, p3}, nil, time.Now())
scaleDown.UpdateUnneededNodes(nodes, nodes, nodes, []*apiv1.Pod{p1, p2, p3}, time.Now().Add(-5*time.Minute), nil, nil)
scaleDownStatus, err := scaleDown.TryToScaleDown(nodes, []*apiv1.Pod{p1, p2, p3}, nil, time.Now(), nil, nil)
waitForDeleteToFinish(t, scaleDown)
assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result)
@ -1059,6 +1065,56 @@ func TestScaleDownEmptyMinMemoryLimitHit(t *testing.T) {
simpleScaleDownEmpty(t, config)
}
func TestScaleDownEmptyTempNodesLimits(t *testing.T) {
options := defaultScaleDownOptions
options.MinMemoryTotal = 4000 * MiB
config := &scaleTestConfig{
nodes: []nodeConfig{
{"n1", 1000, 1000 * MiB, 0, true, "ng1"},
{"n2", 1000, 1000 * MiB, 0, true, "ng1"},
{"n3", 1000, 1000 * MiB, 0, true, "ng1"},
{"n4", 1000, 1000 * MiB, 0, true, "ng1"},
{"n5", 1000, 1000 * MiB, 0, true, "ng1"},
{"n6", 1000, 1000 * MiB, 0, true, "ng1"},
{"n7", 1000, 1000 * MiB, 0, true, "ng2"},
{"n8", 1000, 1000 * MiB, 0, true, "ng2"},
{"n9", 1000, 1000 * MiB, 0, true, "ng2"},
{"n10", 1000, 1000 * MiB, 0, true, "ng2"},
},
options: options,
expectedScaleDowns: []string{"n1", "n2", "n3", "n7"},
tempNodeNames: []string{"n5", "n6"},
}
simpleScaleDownEmpty(t, config)
}
func TestScaleDownEmptyTempNodesMinSize(t *testing.T) {
options := defaultScaleDownOptions
options.MinMemoryTotal = 1000 * MiB
config := &scaleTestConfig{
nodes: []nodeConfig{
{"n1", 1000, 1000 * MiB, 0, true, "ng1"},
{"n2", 1000, 1000 * MiB, 0, true, "ng1"},
{"n3", 1000, 1000 * MiB, 0, true, "ng1"},
{"n4", 1000, 1000 * MiB, 0, true, "ng1"},
{"n6", 1000, 1000 * MiB, 0, true, "ng2"},
{"n7", 1000, 1000 * MiB, 0, true, "ng2"},
{"n8", 1000, 1000 * MiB, 0, true, "ng2"},
{"n9", 1000, 1000 * MiB, 0, true, "ng2"},
{"n10", 1000, 1000 * MiB, 0, true, "ng3"},
{"n11", 1000, 1000 * MiB, 0, true, "ng3"},
{"n12", 1000, 1000 * MiB, 0, true, "ng3"},
},
options: options,
expectedScaleDowns: []string{"n7", "n8", "n10", "n11"},
tempNodeNames: []string{"n1", "n2", "n3", "n6"},
}
simpleScaleDownEmpty(t, config)
}
func TestScaleDownEmptyMinGpuLimitHit(t *testing.T) {
options := defaultScaleDownOptions
options.GpuTotal = []config.GpuLimits{
@ -1126,6 +1182,8 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
nodes := make([]*apiv1.Node, len(config.nodes))
nodesMap := make(map[string]*apiv1.Node)
groups := make(map[string][]*apiv1.Node)
tempNodesPerGroup := make(map[string]int)
var tempNodes []*apiv1.Node
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
deletedNodes <- node
@ -1142,6 +1200,12 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
nodesMap[n.name] = node
nodes[i] = node
groups[n.group] = append(groups[n.group], node)
for _, tempNode := range config.tempNodeNames {
if tempNode == node.Name {
tempNodes = append(tempNodes, node)
tempNodesPerGroup[n.group]++
}
}
}
fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
@ -1185,9 +1249,9 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
if config.nodeDeletionTracker != nil {
scaleDown.nodeDeletionTracker = config.nodeDeletionTracker
}
scaleDown.UpdateUnneededNodes(nodes,
nodes, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
scaleDownStatus, err := scaleDown.TryToScaleDown(nodes, []*apiv1.Pod{}, nil, time.Now())
scaleDown.UpdateUnneededNodes(nodes, nodes, nodes, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil, tempNodesPerGroup)
scaleDownStatus, err := scaleDown.TryToScaleDown(nodes, []*apiv1.Pod{}, nil, time.Now(), tempNodes, tempNodesPerGroup)
assert.False(t, scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress())
assert.NoError(t, err)
@ -1256,12 +1320,14 @@ func TestNoScaleDownUnready(t *testing.T) {
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
context := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
nodes := []*apiv1.Node{n1, n2}
// N1 is unready so it requires a bigger unneeded time.
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
scaleDown := NewScaleDown(&context, clusterStateRegistry)
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
[]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, time.Now().Add(-5*time.Minute), nil)
scaleDownStatus, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, nil, time.Now())
scaleDown.UpdateUnneededNodes(nodes, nodes, nodes,
[]*apiv1.Pod{p2}, time.Now().Add(-5*time.Minute), nil, nil)
scaleDownStatus, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, nil, time.Now(), nil, nil)
waitForDeleteToFinish(t, scaleDown)
assert.NoError(t, err)
@ -1281,9 +1347,10 @@ func TestNoScaleDownUnready(t *testing.T) {
// N1 has been unready for 2 hours, ok to delete.
context.CloudProvider = provider
scaleDown = NewScaleDown(&context, clusterStateRegistry)
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Node{n1, n2},
[]*apiv1.Pod{p2}, time.Now().Add(-2*time.Hour), nil)
scaleDownStatus, err = scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, nil, time.Now())
scaleDown.UpdateUnneededNodes(nodes, nodes, nodes,
[]*apiv1.Pod{p2}, time.Now().Add(-2*time.Hour), nil, nil)
scaleDownStatus, err = scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, nil,
time.Now(), nil, nil)
waitForDeleteToFinish(t, scaleDown)
assert.NoError(t, err)
@ -1360,11 +1427,13 @@ func TestScaleDownNoMove(t *testing.T) {
context := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
nodes := []*apiv1.Node{n1, n2}
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
scaleDown := NewScaleDown(&context, clusterStateRegistry)
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Node{n1, n2},
[]*apiv1.Pod{p1, p2}, time.Now().Add(5*time.Minute), nil)
scaleDownStatus, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, nil, time.Now())
scaleDown.UpdateUnneededNodes(nodes, nodes, nodes,
[]*apiv1.Pod{p1, p2}, time.Now().Add(5*time.Minute), nil, nil)
scaleDownStatus, err := scaleDown.TryToScaleDown(nodes, []*apiv1.Pod{p1, p2}, nil, time.Now(), nil, nil)
waitForDeleteToFinish(t, scaleDown)
assert.NoError(t, err)
@ -1613,32 +1682,33 @@ func TestSoftTaint(t *testing.T) {
scaleDown := NewScaleDown(&context, clusterStateRegistry)
// Test no superfluous nodes
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000},
[]*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil)
nodes := []*apiv1.Node{n1000, n2000}
scaleDown.UpdateUnneededNodes(nodes, nodes, nodes,
[]*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil, nil)
errs := scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n1000.Name))
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2000.Name))
// Test one unneeded node
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000},
[]*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p1200}, time.Now().Add(-5*time.Minute), nil)
scaleDown.UpdateUnneededNodes(nodes, nodes, nodes,
[]*apiv1.Pod{p500, p1200}, time.Now().Add(-5*time.Minute), nil, nil)
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.True(t, hasDeletionCandidateTaint(t, fakeClient, n1000.Name))
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2000.Name))
// Test remove soft taint
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000},
[]*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil)
scaleDown.UpdateUnneededNodes(nodes, nodes, nodes,
[]*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil, nil)
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n1000.Name))
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2000.Name))
// Test bulk update taint limit
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000},
[]*apiv1.Node{n1000, n2000}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
scaleDown.UpdateUnneededNodes(nodes, nodes, nodes,
[]*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil, nil)
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient))
@ -1647,8 +1717,8 @@ func TestSoftTaint(t *testing.T) {
assert.Equal(t, 2, countDeletionCandidateTaints(t, fakeClient))
// Test bulk update untaint limit
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000},
[]*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil)
scaleDown.UpdateUnneededNodes(nodes, nodes, nodes,
[]*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil, nil)
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient))
@ -1723,8 +1793,9 @@ func TestSoftTaintTimeLimit(t *testing.T) {
scaleDown := NewScaleDown(&context, clusterStateRegistry)
// Test bulk taint
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
[]*apiv1.Node{n1, n2}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
nodes := []*apiv1.Node{n1, n2}
scaleDown.UpdateUnneededNodes(nodes, nodes, nodes,
[]*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil, nil)
errs := scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 2, countDeletionCandidateTaints(t, fakeClient))
@ -1732,8 +1803,8 @@ func TestSoftTaintTimeLimit(t *testing.T) {
assert.True(t, hasDeletionCandidateTaint(t, fakeClient, n2.Name))
// Test bulk untaint
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
[]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, time.Now().Add(-5*time.Minute), nil)
scaleDown.UpdateUnneededNodes(nodes, nodes, nodes,
[]*apiv1.Pod{p1, p2}, time.Now().Add(-5*time.Minute), nil, nil)
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 0, countDeletionCandidateTaints(t, fakeClient))
@ -1743,8 +1814,8 @@ func TestSoftTaintTimeLimit(t *testing.T) {
updateTime = maxSoftTaintDuration
// Test duration limit of bulk taint
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
[]*apiv1.Node{n1, n2}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
scaleDown.UpdateUnneededNodes(nodes, nodes, nodes,
[]*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil, nil)
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient))
@ -1753,8 +1824,8 @@ func TestSoftTaintTimeLimit(t *testing.T) {
assert.Equal(t, 2, countDeletionCandidateTaints(t, fakeClient))
// Test duration limit of bulk untaint
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
[]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, time.Now().Add(-5*time.Minute), nil)
scaleDown.UpdateUnneededNodes(nodes, nodes, nodes,
[]*apiv1.Pod{p1, p2}, time.Now().Add(-5*time.Minute), nil, nil)
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient))


@ -78,6 +78,7 @@ type scaleTestConfig struct {
//expectedScaleUpOptions []groupSizeChange // we expect that all those options should be included in expansion options passed to expander strategy
//expectedFinalScaleUp groupSizeChange // we expect this to be delivered via scale-up event
expectedScaleDowns []string
tempNodeNames []string
}
type scaleTestResults struct {


@ -23,6 +23,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
@ -378,11 +379,41 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
klog.V(4).Infof("Calculating unneeded nodes")
scaleDown.CleanUp(currentTime)
potentiallyUnneeded := getPotentiallyUnneededNodes(autoscalingContext, allNodes)
var scaleDownCandidates []*apiv1.Node
var podDestinations []*apiv1.Node
var temporaryNodes []*apiv1.Node
if a.processors == nil || a.processors.ScaleDownNodeProcessor == nil {
scaleDownCandidates = allNodes
podDestinations = allNodes
temporaryNodes = []*apiv1.Node{}
} else {
var err errors.AutoscalerError
a.processors.ScaleDownNodeProcessor.Reset()
scaleDownCandidates, err = a.processors.ScaleDownNodeProcessor.GetScaleDownCandidates(
autoscalingContext, allNodes)
if err != nil {
klog.Error(err)
return err
}
podDestinations, err = a.processors.ScaleDownNodeProcessor.GetPodDestinationCandidates(autoscalingContext, allNodes)
if err != nil {
klog.Error(err)
return err
}
temporaryNodes, err = a.processors.ScaleDownNodeProcessor.GetTemporaryNodes(allNodes)
if err != nil {
klog.Error(err)
return err
}
}
tempNodesPerNodeGroup := getTempNodesPerNodeGroup(a.CloudProvider, temporaryNodes)
// We use scheduledPods (not originalScheduledPods) here, so artificial scheduled pods introduced by processors
// (e.g. unscheduled pods with nominated node name) can block scale down of a given node.
typedErr := scaleDown.UpdateUnneededNodes(allNodes, potentiallyUnneeded, scheduledPods, currentTime, pdbs)
typedErr := scaleDown.UpdateUnneededNodes(allNodes, podDestinations, scaleDownCandidates, scheduledPods, currentTime, pdbs, tempNodesPerNodeGroup)
if typedErr != nil {
scaleDownStatus.Result = status.ScaleDownError
klog.Errorf("Failed to scale down: %v", typedErr)
@ -425,7 +456,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
scaleDownStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
scaleDownStatus, typedErr := scaleDown.TryToScaleDown(allNodes, originalScheduledPods, pdbs, currentTime)
scaleDownStatus, typedErr := scaleDown.TryToScaleDown(allNodes, originalScheduledPods, pdbs, currentTime, temporaryNodes, tempNodesPerNodeGroup)
metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
scaleDownStatus.RemovedNodeGroups = removedNodeGroups


@ -549,38 +549,6 @@ func fixNodeGroupSize(context *context.AutoscalingContext, clusterStateRegistry
return fixed, nil
}
// getPotentiallyUnneededNodes returns nodes that are:
// - managed by the cluster autoscaler
// - in groups with size > min size
func getPotentiallyUnneededNodes(context *context.AutoscalingContext, nodes []*apiv1.Node) []*apiv1.Node {
result := make([]*apiv1.Node, 0, len(nodes))
nodeGroupSize := getNodeGroupSizeMap(context.CloudProvider)
for _, node := range nodes {
nodeGroup, err := context.CloudProvider.NodeGroupForNode(node)
if err != nil {
klog.Warningf("Error while checking node group for %s: %v", node.Name, err)
continue
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.V(4).Infof("Skipping %s - no node group config", node.Name)
continue
}
size, found := nodeGroupSize[nodeGroup.Id()]
if !found {
klog.Errorf("Error while checking node group size %s: group size not found", nodeGroup.Id())
continue
}
if size <= nodeGroup.MinSize() {
klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
continue
}
result = append(result, node)
}
return result
}
func hasHardInterPodAffinity(affinity *apiv1.Affinity) bool {
if affinity == nil {
return false
@ -640,19 +608,6 @@ func getNodeResource(node *apiv1.Node, resource apiv1.ResourceName) int64 {
return nodeCapacityValue
}
func getNodeGroupSizeMap(cloudProvider cloudprovider.CloudProvider) map[string]int {
nodeGroupSize := make(map[string]int)
for _, nodeGroup := range cloudProvider.NodeGroups() {
size, err := nodeGroup.TargetSize()
if err != nil {
klog.Errorf("Error while checking node group size %s: %v", nodeGroup.Id(), err)
continue
}
nodeGroupSize[nodeGroup.Id()] = size
}
return nodeGroupSize
}
// UpdateClusterStateMetrics updates metrics related to cluster state
func UpdateClusterStateMetrics(csr *clusterstate.ClusterStateRegistry) {
if csr == nil || reflect.ValueOf(csr).IsNil() {


@ -571,29 +571,6 @@ func TestRemoveFixNodeTargetSize(t *testing.T) {
assert.Equal(t, "ng1/-2", change)
}
func TestGetPotentiallyUnneededNodes(t *testing.T) {
ng1_1 := BuildTestNode("ng1-1", 1000, 1000)
ng1_2 := BuildTestNode("ng1-2", 1000, 1000)
ng2_1 := BuildTestNode("ng2-1", 1000, 1000)
noNg := BuildTestNode("no-ng", 1000, 1000)
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 2)
provider.AddNodeGroup("ng2", 1, 10, 1)
provider.AddNode("ng1", ng1_1)
provider.AddNode("ng1", ng1_2)
provider.AddNode("ng2", ng2_1)
context := &context.AutoscalingContext{
CloudProvider: provider,
}
result := getPotentiallyUnneededNodes(context, []*apiv1.Node{ng1_1, ng1_2, ng2_1, noNg})
assert.Equal(t, 2, len(result))
ok1 := result[0].Name == "ng1-1" && result[1].Name == "ng1-2"
ok2 := result[1].Name == "ng1-1" && result[0].Name == "ng1-2"
assert.True(t, ok1 || ok2)
}
func TestConfigurePredicateCheckerForLoop(t *testing.T) {
testCases := []struct {
affinity *apiv1.Affinity


@ -0,0 +1,90 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodes
import (
"reflect"
apiv1 "k8s.io/api/core/v1"
"k8s.io/klog"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)
// PreFilteringScaleDownNodeProcessor filters out scale down candidates from node groups with
// size <= the minimum number of nodes for that node group, and filters out nodes from
// non-autoscaled node groups.
type PreFilteringScaleDownNodeProcessor struct {
}
// GetPodDestinationCandidates returns nodes that potentially could act as destinations for pods
// that would become unscheduled after a scale down.
func (n *PreFilteringScaleDownNodeProcessor) GetPodDestinationCandidates(ctx *context.AutoscalingContext,
nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
return nodes, nil
}
// GetScaleDownCandidates returns nodes that potentially could be scaled down.
func (n *PreFilteringScaleDownNodeProcessor) GetScaleDownCandidates(ctx *context.AutoscalingContext,
nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
result := make([]*apiv1.Node, 0, len(nodes))
nodeGroupSize := utils.GetNodeGroupSizeMap(ctx.CloudProvider)
for _, node := range nodes {
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
klog.Warningf("Error while checking node group for %s: %v", node.Name, err)
continue
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.V(4).Infof("Skipping %s - no node group config", node.Name)
continue
}
size, found := nodeGroupSize[nodeGroup.Id()]
if !found {
klog.Errorf("Error while checking node group size %s: group size not found", nodeGroup.Id())
continue
}
if size <= nodeGroup.MinSize() {
klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
continue
}
result = append(result, node)
}
return result, nil
}
// GetTemporaryNodes returns nodes that are temporary and will not stay in the node group
func (n *PreFilteringScaleDownNodeProcessor) GetTemporaryNodes(allNodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
return nil, nil
}
// CleanUp is called at CA termination.
func (n *PreFilteringScaleDownNodeProcessor) CleanUp() {
}
// Reset is called before the processor's other functions are called in every CA loop.
func (n *PreFilteringScaleDownNodeProcessor) Reset() {
}
// NewPreFilteringScaleDownNodeProcessor returns a new PreFilteringScaleDownNodeProcessor.
func NewPreFilteringScaleDownNodeProcessor() *PreFilteringScaleDownNodeProcessor {
return &PreFilteringScaleDownNodeProcessor{}
}


@ -0,0 +1,66 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodes
import (
"testing"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/context"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
func TestPreFilteringScaleDownNodeProcessor_GetPodDestinationCandidates(t *testing.T) {
n1 := BuildTestNode("n1", 100, 1000)
n2 := BuildTestNode("n2", 100, 1000)
ctx := &context.AutoscalingContext{}
defaultProcessor := NewPreFilteringScaleDownNodeProcessor()
expectedNodes := []*apiv1.Node{n1, n2}
nodes := []*apiv1.Node{n1, n2}
nodes, err := defaultProcessor.GetPodDestinationCandidates(ctx, nodes)
assert.NoError(t, err)
assert.Equal(t, nodes, expectedNodes)
}
func TestPreFilteringScaleDownNodeProcessor_GetScaleDownCandidateNodes(t *testing.T) {
ng1_1 := BuildTestNode("ng1-1", 1000, 1000)
ng1_2 := BuildTestNode("ng1-2", 1000, 1000)
ng2_1 := BuildTestNode("ng2-1", 1000, 1000)
noNg := BuildTestNode("no-ng", 1000, 1000)
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 2)
provider.AddNodeGroup("ng2", 1, 10, 1)
provider.AddNode("ng1", ng1_1)
provider.AddNode("ng1", ng1_2)
provider.AddNode("ng2", ng2_1)
ctx := &context.AutoscalingContext{
CloudProvider: provider,
}
expectedNodes := []*apiv1.Node{ng1_1, ng1_2}
defaultProcessor := NewPreFilteringScaleDownNodeProcessor()
inputNodes := []*apiv1.Node{ng1_1, ng1_2, ng2_1, noNg}
result, err := defaultProcessor.GetScaleDownCandidates(ctx, inputNodes)
assert.NoError(t, err)
assert.Equal(t, result, expectedNodes)
}


@ -0,0 +1,39 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodes
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)
// ScaleDownNodeProcessor contains methods to get pod destination candidates and scale down candidate nodes
type ScaleDownNodeProcessor interface {
// GetPodDestinationCandidates returns nodes that potentially could act as destinations for pods
// that would become unscheduled after a scale down.
GetPodDestinationCandidates(*context.AutoscalingContext, []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError)
// GetScaleDownCandidates returns nodes that potentially could be scaled down.
GetScaleDownCandidates(*context.AutoscalingContext, []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError)
// GetTemporaryNodes returns nodes that are temporary and will not stay in the node group
GetTemporaryNodes(allNodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError)
// Reset resets the properties of the ScaleDownNodeProcessor
Reset()
// CleanUp is called at CA termination
CleanUp()
}
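For orientation, here is a minimal sketch of a custom implementation of this interface; the name noopScaleDownNodeProcessor is hypothetical and not part of this change. It satisfies the interface by treating every node as both a scale-down candidate and a pod destination, and by reporting no temporary nodes:

package nodes

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

// noopScaleDownNodeProcessor is a hypothetical pass-through ScaleDownNodeProcessor.
type noopScaleDownNodeProcessor struct{}

// GetPodDestinationCandidates returns all nodes unchanged.
func (p *noopScaleDownNodeProcessor) GetPodDestinationCandidates(_ *context.AutoscalingContext, nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
	return nodes, nil
}

// GetScaleDownCandidates returns all nodes unchanged.
func (p *noopScaleDownNodeProcessor) GetScaleDownCandidates(_ *context.AutoscalingContext, nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
	return nodes, nil
}

// GetTemporaryNodes reports no temporary nodes.
func (p *noopScaleDownNodeProcessor) GetTemporaryNodes(_ []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
	return nil, nil
}

// Reset and CleanUp are no-ops for this processor.
func (p *noopScaleDownNodeProcessor) Reset()   {}
func (p *noopScaleDownNodeProcessor) CleanUp() {}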


@ -19,6 +19,7 @@ package processors
import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
)
@ -34,6 +35,8 @@ type AutoscalingProcessors struct {
NodeGroupSetProcessor nodegroupset.NodeGroupSetProcessor
// ScaleUpStatusProcessor is used to process the state of the cluster after a scale-up.
ScaleUpStatusProcessor status.ScaleUpStatusProcessor
// ScaleDownNodeProcessor is used to process the nodes of the cluster before scale-down.
ScaleDownNodeProcessor nodes.ScaleDownNodeProcessor
// ScaleDownStatusProcessor is used to process the state of the cluster after a scale-down.
ScaleDownStatusProcessor status.ScaleDownStatusProcessor
// AutoscalingStatusProcessor is used to process the state of the cluster after each autoscaling iteration.
@ -49,6 +52,7 @@ func DefaultProcessors() *AutoscalingProcessors {
NodeGroupListProcessor: nodegroups.NewDefaultNodeGroupListProcessor(),
NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor(),
ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor(),
ScaleDownNodeProcessor: nodes.NewPreFilteringScaleDownNodeProcessor(),
ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(),
AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(),
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
@ -64,4 +68,5 @@ func (ap *AutoscalingProcessors) CleanUp() {
ap.ScaleDownStatusProcessor.CleanUp()
ap.AutoscalingStatusProcessor.CleanUp()
ap.NodeGroupManager.CleanUp()
ap.ScaleDownNodeProcessor.CleanUp()
}
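A downstream build could swap in its own implementation before constructing the autoscaler. A minimal sketch, assuming this package is imported as ca_processors and that customProcessor implements nodes.ScaleDownNodeProcessor (both names are illustrative only):

	processors := ca_processors.DefaultProcessors()
	processors.ScaleDownNodeProcessor = customProcessor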


@ -71,14 +71,14 @@ type UtilizationInfo struct {
// FindNodesToRemove finds nodes that can be removed. It also returns information about a good
// rescheduling location for each of the pods.
func FindNodesToRemove(candidates []*apiv1.Node, allNodes []*apiv1.Node, pods []*apiv1.Pod,
func FindNodesToRemove(candidates []*apiv1.Node, destinationNodes []*apiv1.Node, pods []*apiv1.Pod,
listers kube_util.ListerRegistry, predicateChecker *PredicateChecker, maxCount int,
fastCheck bool, oldHints map[string]string, usageTracker *UsageTracker,
timestamp time.Time,
podDisruptionBudgets []*policyv1.PodDisruptionBudget,
) (nodesToRemove []NodeToBeRemoved, unremovableNodes []*apiv1.Node, podReschedulingHints map[string]string, finalError errors.AutoscalerError) {
nodeNameToNodeInfo := scheduler_util.CreateNodeNameToInfoMap(pods, allNodes)
nodeNameToNodeInfo := scheduler_util.CreateNodeNameToInfoMap(pods, destinationNodes)
result := make([]NodeToBeRemoved, 0)
unremovable := make([]*apiv1.Node, 0)
@ -113,7 +113,7 @@ candidateloop:
unremovable = append(unremovable, node)
continue candidateloop
}
findProblems := findPlaceFor(node.Name, podsToRemove, allNodes, nodeNameToNodeInfo, predicateChecker, oldHints, newHints,
findProblems := findPlaceFor(node.Name, podsToRemove, destinationNodes, nodeNameToNodeInfo, predicateChecker, oldHints, newHints,
usageTracker, timestamp)
if findProblems == nil {


@ -0,0 +1,56 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/klog"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)
// GetNodeGroupSizeMap returns a map from node group id to its target size
func GetNodeGroupSizeMap(cloudProvider cloudprovider.CloudProvider) map[string]int {
nodeGroupSize := make(map[string]int)
for _, nodeGroup := range cloudProvider.NodeGroups() {
size, err := nodeGroup.TargetSize()
if err != nil {
klog.Errorf("Error while checking node group size %s: %v", nodeGroup.Id(), err)
continue
}
nodeGroupSize[nodeGroup.Id()] = size
}
return nodeGroupSize
}
// FilterOutNodes filters out nodesToFilterOut from nodes
func FilterOutNodes(nodes []*apiv1.Node, nodesToFilterOut []*apiv1.Node) []*apiv1.Node {
var filtered []*apiv1.Node
for _, node := range nodes {
found := false
for _, nodeToFilter := range nodesToFilterOut {
if nodeToFilter.Name == node.Name {
found = true
}
}
if !found {
filtered = append(filtered, node)
}
}
return filtered
}
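A small, self-contained illustration of FilterOutNodes (not part of this change; the package main wrapper exists only for the example):

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"k8s.io/autoscaler/cluster-autoscaler/utils"
)

func node(name string) *apiv1.Node {
	return &apiv1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}
}

func main() {
	all := []*apiv1.Node{node("n1"), node("n2"), node("n3")}
	temp := []*apiv1.Node{node("n2")}
	// Prints n1 and n3; n2 is filtered out by name.
	for _, n := range utils.FilterOutNodes(all, temp) {
		fmt.Println(n.Name)
	}
}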


@ -22,7 +22,20 @@ KUBE_ROOT=$(dirname "${BASH_SOURCE}")/..
cd "${KUBE_ROOT}"
GOLINT=${GOLINT:-"golint"}
PACKAGES=($(go list ./... | grep -v /vendor/ | grep -v vertical-pod-autoscaler/pkg/client | grep -v vertical-pod-autoscaler/pkg/apis | grep -v cluster-autoscaler/cloudprovider/magnum/gophercloud | grep -v cluster-autoscaler/cloudprovider/digitalocean/godo))
excluded_packages=(
'/vendor/'
'vertical-pod-autoscaler/pkg/client'
'cluster-autoscaler/cloudprovider/magnum/gophercloud'
'cluster-autoscaler/cloudprovider/digitalocean/godo'
)
FIND_PACKAGES='go list ./... '
for package in "${excluded_packages[@]}"; do
FIND_PACKAGES+="| grep -v ${package} "
done
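# The loop above builds a single pipeline, roughly:
#   go list ./... | grep -v /vendor/ | grep -v vertical-pod-autoscaler/pkg/client | grep -v cluster-autoscaler/cloudprovider/magnum/gophercloud | grep -v cluster-autoscaler/cloudprovider/digitalocean/godo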
PACKAGES=()
mapfile -t PACKAGES < <(eval ${FIND_PACKAGES})
bad_files=()
for package in "${PACKAGES[@]}"; do
out=$("${GOLINT}" -min_confidence=0.9 "${package}")