Add a scale down status processor; refactor so that more scale down info is available to it

Jakub Tużnik 2018-09-07 20:47:04 +02:00
parent dcde1663da
commit 71111da20c
7 changed files with 187 additions and 62 deletions
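
For orientation, the hook introduced here can be consumed roughly as follows. This is a minimal sketch, not part of the commit: the LoggingScaleDownStatusProcessor type, its logging body, and the customProcessors helper are made up, and the import paths are assumed from the repository layout; only the ScaleDownStatusProcessor interface, the ScaleDownStatus fields, and the AutoscalingProcessors field come from this change.

package example

import (
    "github.com/golang/glog"

    "k8s.io/autoscaler/cluster-autoscaler/context"
    "k8s.io/autoscaler/cluster-autoscaler/processors"
    "k8s.io/autoscaler/cluster-autoscaler/processors/status"
)

// LoggingScaleDownStatusProcessor is a hypothetical processor used only to
// illustrate the new hook: it logs what the scale down loop reported.
type LoggingScaleDownStatusProcessor struct{}

// Process receives the ScaleDownStatus assembled by TryToScaleDown.
func (p *LoggingScaleDownStatusProcessor) Process(ctx *context.AutoscalingContext, st *status.ScaleDownStatus) {
    for _, node := range st.ScaledDownNodes {
        glog.Infof("scaled down %s (utilization %.2f, %d pods evicted)",
            node.Node.Name, node.UtilInfo.Utilization, len(node.EvictedPods))
    }
    for name, err := range st.NodeDeleteResults {
        if err != nil {
            glog.Errorf("deleting %s failed: %v", name, err)
        }
    }
}

// CleanUp is required by the ScaleDownStatusProcessor interface; nothing to release here.
func (p *LoggingScaleDownStatusProcessor) CleanUp() {}

// customProcessors swaps the custom processor into the default set.
func customProcessors() *processors.AutoscalingProcessors {
    p := processors.DefaultProcessors()
    p.ScaleDownStatusProcessor = &LoggingScaleDownStatusProcessor{}
    return p
}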


@@ -43,25 +43,10 @@ import (
     "github.com/golang/glog"
     "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/autoscaler/cluster-autoscaler/processors/status"
     "k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
 )

-// ScaleDownResult represents the state of scale down.
-type ScaleDownResult int
-
-const (
-    // ScaleDownError - scale down finished with error.
-    ScaleDownError ScaleDownResult = iota
-    // ScaleDownNoUnneeded - no unneeded nodes and no errors.
-    ScaleDownNoUnneeded
-    // ScaleDownNoNodeDeleted - unneeded nodes present but not available for deletion.
-    ScaleDownNoNodeDeleted
-    // ScaleDownNodeDeleted - a node was deleted.
-    ScaleDownNodeDeleted
-    // ScaleDownNodeDeleteStarted - a node deletion process was started.
-    ScaleDownNodeDeleteStarted
-)
-
 const (
     // ScaleDownDisabledKey is the name of annotation marking node as not eligible for scale down.
     ScaleDownDisabledKey = "cluster-autoscaler.kubernetes.io/scale-down-disabled"
@@ -85,6 +70,10 @@ const (
 type NodeDeleteStatus struct {
     sync.Mutex
     deleteInProgress bool
+    // A map of node delete results by node name. It contains nil if the delete was successful and an error otherwise.
+    // It's being constantly drained into ScaleDownStatus objects in order to notify the ScaleDownStatusProcessor that
+    // the node drain has ended or that an error occurred during the deletion process.
+    nodeDeleteResults map[string]error
 }

 // IsDeleteInProgress returns true if a node is being deleted.
@@ -101,6 +90,22 @@ func (n *NodeDeleteStatus) SetDeleteInProgress(status bool) {
     n.deleteInProgress = status
 }

+// AddNodeDeleteResult adds a node delete result to the result map.
+func (n *NodeDeleteStatus) AddNodeDeleteResult(nodeName string, result error) {
+    n.Lock()
+    defer n.Unlock()
+    n.nodeDeleteResults[nodeName] = result
+}
+
+// DrainNodeDeleteResults returns the whole result map and replaces it with a new empty one.
+func (n *NodeDeleteStatus) DrainNodeDeleteResults() map[string]error {
+    n.Lock()
+    defer n.Unlock()
+    results := n.nodeDeleteResults
+    n.nodeDeleteResults = make(map[string]error)
+    return results
+}
+
 type scaleDownResourcesLimits map[string]int64
 type scaleDownResourcesDelta map[string]int64
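
The two new methods above implement a simple handoff: the asynchronous deletion goroutine records each node's outcome, and the next TryToScaleDown call drains the accumulated map into the ScaleDownStatus it hands to the processor. Below is a self-contained sketch of that pattern under those assumptions; the trimmed-down type and the simulated loop around it are illustrative, only the two method bodies mirror the diff.

package main

import (
    "errors"
    "fmt"
    "sync"
)

// nodeDeleteStatus mirrors the result-map part of NodeDeleteStatus from the diff,
// trimmed so the handoff can be run standalone.
type nodeDeleteStatus struct {
    sync.Mutex
    nodeDeleteResults map[string]error
}

// AddNodeDeleteResult records the outcome of one node deletion.
func (n *nodeDeleteStatus) AddNodeDeleteResult(nodeName string, result error) {
    n.Lock()
    defer n.Unlock()
    n.nodeDeleteResults[nodeName] = result
}

// DrainNodeDeleteResults hands the accumulated results over and resets the map.
func (n *nodeDeleteStatus) DrainNodeDeleteResults() map[string]error {
    n.Lock()
    defer n.Unlock()
    results := n.nodeDeleteResults
    n.nodeDeleteResults = make(map[string]error)
    return results
}

func main() {
    s := &nodeDeleteStatus{nodeDeleteResults: make(map[string]error)}

    // The deletion goroutine records the outcome when it finishes, success or failure.
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        s.AddNodeDeleteResult("node-1", nil)
        s.AddNodeDeleteResult("node-2", errors.New("drain timed out"))
    }()
    wg.Wait()

    // The next scale down iteration drains the map into its ScaleDownStatus,
    // leaving an empty map behind for future results.
    for name, err := range s.DrainNodeDeleteResults() {
        fmt.Printf("%s: %v\n", name, err)
    }
    fmt.Println("results left after drain:", len(s.DrainNodeDeleteResults()))
}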
@@ -301,7 +306,7 @@ type ScaleDown struct {
     unneededNodesList  []*apiv1.Node
     unremovableNodes   map[string]time.Time
     podLocationHints   map[string]string
-    nodeUtilizationMap map[string]float64
+    nodeUtilizationMap map[string]simulator.UtilizationInfo
     usageTracker       *simulator.UsageTracker
     nodeDeleteStatus   *NodeDeleteStatus
 }
@@ -314,10 +319,10 @@ func NewScaleDown(context *context.AutoscalingContext, clusterStateRegistry *clu
         unneededNodes:      make(map[string]time.Time),
         unremovableNodes:   make(map[string]time.Time),
         podLocationHints:   make(map[string]string),
-        nodeUtilizationMap: make(map[string]float64),
+        nodeUtilizationMap: make(map[string]simulator.UtilizationInfo),
         usageTracker:       simulator.NewUsageTracker(),
         unneededNodesList:  make([]*apiv1.Node, 0),
-        nodeDeleteStatus:   &NodeDeleteStatus{},
+        nodeDeleteStatus:   &NodeDeleteStatus{nodeDeleteResults: make(map[string]error)},
     }
 }
@@ -352,7 +357,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
     // Only scheduled non expendable pods and pods waiting for lower priority pods preemption can prevent node delete.
     nonExpendablePods := FilterOutExpendablePods(pods, sd.context.ExpendablePodsPriorityCutoff)
     nodeNameToNodeInfo := scheduler_util.CreateNodeNameToInfoMap(nonExpendablePods, nodes)
-    utilizationMap := make(map[string]float64)
+    utilizationMap := make(map[string]simulator.UtilizationInfo)
     sd.updateUnremovableNodes(nodes)

     // Filter out nodes that were recently checked
@@ -394,16 +399,16 @@
             glog.Errorf("Node info for %s not found", node.Name)
             continue
         }
-        utilization, err := simulator.CalculateUtilization(node, nodeInfo)
+        utilInfo, err := simulator.CalculateUtilization(node, nodeInfo)
         if err != nil {
             glog.Warningf("Failed to calculate utilization for %s: %v", node.Name, err)
         }
-        glog.V(4).Infof("Node %s - utilization %f", node.Name, utilization)
-        utilizationMap[node.Name] = utilization
+        glog.V(4).Infof("Node %s - utilization %f", node.Name, utilInfo.Utilization)
+        utilizationMap[node.Name] = utilInfo

-        if utilization >= sd.context.ScaleDownUtilizationThreshold {
-            glog.V(4).Infof("Node %s is not suitable for removal - utilization too big (%f)", node.Name, utilization)
+        if utilInfo.Utilization >= sd.context.ScaleDownUtilizationThreshold {
+            glog.V(4).Infof("Node %s is not suitable for removal - utilization too big (%f)", node.Name, utilInfo.Utilization)
             continue
         }
         currentlyUnneededNodes = append(currentlyUnneededNodes, node)
@@ -528,7 +533,7 @@ func (sd *ScaleDown) markSimulationError(simulatorErr errors.AutoscalerError,
     glog.Errorf("Error while simulating node drains: %v", simulatorErr)
     sd.unneededNodesList = make([]*apiv1.Node, 0)
     sd.unneededNodes = make(map[string]time.Time)
-    sd.nodeUtilizationMap = make(map[string]float64)
+    sd.nodeUtilizationMap = make(map[string]simulator.UtilizationInfo)
     sd.clusterStateRegistry.UpdateScaleDownCandidates(sd.unneededNodesList, timestamp)
     return simulatorErr.AddPrefix("error while simulating node drains: ")
 }
@@ -554,9 +559,23 @@ func (sd *ScaleDown) chooseCandidates(nodes []*apiv1.Node) ([]*apiv1.Node, []*ap
     return currentCandidates, currentNonCandidates
 }

-// TryToScaleDown tries to scale down the cluster. It returns ScaleDownResult indicating if any node was
+func (sd *ScaleDown) mapNodesToStatusScaleDownNodes(nodes []*apiv1.Node, nodeGroups map[string]cloudprovider.NodeGroup, evictedPodLists map[string][]*apiv1.Pod) []*status.ScaleDownNode {
+    var result []*status.ScaleDownNode
+    for _, node := range nodes {
+        result = append(result, &status.ScaleDownNode{
+            Node:        node,
+            NodeGroup:   nodeGroups[node.Name],
+            UtilInfo:    sd.nodeUtilizationMap[node.Name],
+            EvictedPods: evictedPodLists[node.Name],
+        })
+    }
+    return result
+}
+
+// TryToScaleDown tries to scale down the cluster. It returns a result inside a ScaleDownStatus indicating if any node was
 // removed and error if such occurred.
-func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget, currentTime time.Time) (ScaleDownResult, errors.AutoscalerError) {
+func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget, currentTime time.Time) (*status.ScaleDownStatus, errors.AutoscalerError) {
+    scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: sd.nodeDeleteStatus.DrainNodeDeleteResults()}
     nodeDeletionDuration := time.Duration(0)
     findNodesToRemoveDuration := time.Duration(0)
     defer updateScaleDownMetrics(time.Now(), &findNodesToRemoveDuration, &nodeDeletionDuration)
@@ -567,9 +586,8 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
     resourceLimiter, errCP := sd.context.CloudProvider.GetResourceLimiter()
     if errCP != nil {
-        return ScaleDownError, errors.ToAutoscalerError(
-            errors.CloudProviderError,
-            errCP)
+        scaleDownStatus.Result = status.ScaleDownError
+        return scaleDownStatus, errors.ToAutoscalerError(errors.CloudProviderError, errCP)
     }

     scaleDownResourcesLeft := computeScaleDownResourcesLeftLimits(nodesWithoutMaster, resourceLimiter, sd.context.CloudProvider, currentTime)
@@ -639,7 +657,8 @@
     }

     if len(candidates) == 0 {
         glog.V(1).Infof("No candidates for scale down")
-        return ScaleDownNoUnneeded, nil
+        scaleDownStatus.Result = status.ScaleDownNoUnneeded
+        return scaleDownStatus, nil
     }
@@ -653,9 +672,12 @@
     // Trying to delete empty nodes in bulk. If there are no empty nodes then CA will
         err := sd.waitForEmptyNodesDeleted(emptyNodes, confirmation)
         nodeDeletionDuration = time.Now().Sub(nodeDeletionStart)
         if err == nil {
-            return ScaleDownNodeDeleted, nil
+            scaleDownStatus.ScaledDownNodes = sd.mapNodesToStatusScaleDownNodes(emptyNodes, candidateNodeGroups, make(map[string][]*apiv1.Pod))
+            scaleDownStatus.Result = status.ScaleDownNodeDeleted
+            return scaleDownStatus, nil
         }
-        return ScaleDownError, err.AddPrefix("failed to delete at least one empty node: ")
+        scaleDownStatus.Result = status.ScaleDownError
+        return scaleDownStatus, err.AddPrefix("failed to delete at least one empty node: ")
     }

     findNodesToRemoveStart := time.Now()
@@ -668,11 +690,13 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
     findNodesToRemoveDuration = time.Now().Sub(findNodesToRemoveStart)

     if err != nil {
-        return ScaleDownError, err.AddPrefix("Find node to remove failed: ")
+        scaleDownStatus.Result = status.ScaleDownError
+        return scaleDownStatus, err.AddPrefix("Find node to remove failed: ")
     }
     if len(nodesToRemove) == 0 {
         glog.V(1).Infof("No node to remove")
-        return ScaleDownNoNodeDeleted, nil
+        scaleDownStatus.Result = status.ScaleDownNoNodeDeleted
+        return scaleDownStatus, nil
     }

     toRemove := nodesToRemove[0]
     utilization := sd.nodeUtilizationMap[toRemove.Node.Name]
@@ -695,8 +719,10 @@
     go func() {
         // Finishing the delete process once this goroutine is over.
+        var err error
+        defer func() { sd.nodeDeleteStatus.AddNodeDeleteResult(toRemove.Node.Name, err) }()
         defer sd.nodeDeleteStatus.SetDeleteInProgress(false)
-        err := sd.deleteNode(toRemove.Node, toRemove.PodsToReschedule)
+        err = sd.deleteNode(toRemove.Node, toRemove.PodsToReschedule)
         if err != nil {
             glog.Errorf("Failed to delete %s: %v", toRemove.Node.Name, err)
             return
@@ -709,7 +735,9 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
         }
     }()

-    return ScaleDownNodeDeleteStarted, nil
+    scaleDownStatus.ScaledDownNodes = sd.mapNodesToStatusScaleDownNodes([]*apiv1.Node{toRemove.Node}, candidateNodeGroups, map[string][]*apiv1.Pod{toRemove.Node.Name: toRemove.PodsToReschedule})
+    scaleDownStatus.Result = status.ScaleDownNodeDeleteStarted
+    return scaleDownStatus, nil
 }

 // updateScaleDownMetrics registers duration of different parts of scale down.


@@ -43,6 +43,7 @@ import (
     "github.com/golang/glog"
     "github.com/stretchr/testify/assert"
+    "k8s.io/autoscaler/cluster-autoscaler/processors/status"
     "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
     "k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
 )
@@ -764,10 +765,10 @@ func TestScaleDown(t *testing.T) {
     scaleDown := NewScaleDown(&context, clusterStateRegistry)
     scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
         []*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2, p3}, time.Now().Add(-5*time.Minute), nil)
-    result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2, p3}, nil, time.Now())
+    scaleDownStatus, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2, p3}, nil, time.Now())
     waitForDeleteToFinish(t, scaleDown)
     assert.NoError(t, err)
-    assert.Equal(t, ScaleDownNodeDeleteStarted, result)
+    assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result)
     assert.Equal(t, n1.Name, getStringFromChan(deletedNodes))
     assert.Equal(t, n1.Name, getStringFromChan(updatedNodes))
 }
@@ -973,20 +974,20 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
     scaleDown := NewScaleDown(&context, clusterStateRegistry)
     scaleDown.UpdateUnneededNodes(nodes,
         nodes, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
-    result, err := scaleDown.TryToScaleDown(nodes, []*apiv1.Pod{}, nil, time.Now())
+    scaleDownStatus, err := scaleDown.TryToScaleDown(nodes, []*apiv1.Pod{}, nil, time.Now())
     waitForDeleteToFinish(t, scaleDown)
     // This helps to verify that TryToScaleDown doesn't attempt to remove anything
     // after delete in progress status is gone.
     close(deletedNodes)
     assert.NoError(t, err)
-    var expectedScaleDownResult ScaleDownResult
+    var expectedScaleDownResult status.ScaleDownResult
     if len(config.expectedScaleDowns) > 0 {
-        expectedScaleDownResult = ScaleDownNodeDeleted
+        expectedScaleDownResult = status.ScaleDownNodeDeleted
     } else {
-        expectedScaleDownResult = ScaleDownNoUnneeded
+        expectedScaleDownResult = status.ScaleDownNoUnneeded
     }
-    assert.Equal(t, expectedScaleDownResult, result)
+    assert.Equal(t, expectedScaleDownResult, scaleDownStatus.Result)

     // Check the channel (and make sure there isn't more than there should be).
     // Report only up to 10 extra nodes found.
@@ -1049,11 +1050,11 @@ func TestNoScaleDownUnready(t *testing.T) {
     scaleDown := NewScaleDown(&context, clusterStateRegistry)
     scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
         []*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, time.Now().Add(-5*time.Minute), nil)
-    result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, nil, time.Now())
+    scaleDownStatus, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, nil, time.Now())
     waitForDeleteToFinish(t, scaleDown)
     assert.NoError(t, err)
-    assert.Equal(t, ScaleDownNoUnneeded, result)
+    assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownStatus.Result)

     deletedNodes := make(chan string, 10)
@@ -1071,11 +1072,11 @@
     scaleDown = NewScaleDown(&context, clusterStateRegistry)
     scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Node{n1, n2},
         []*apiv1.Pod{p2}, time.Now().Add(-2*time.Hour), nil)
-    result, err = scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, nil, time.Now())
+    scaleDownStatus, err = scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, nil, time.Now())
     waitForDeleteToFinish(t, scaleDown)
     assert.NoError(t, err)
-    assert.Equal(t, ScaleDownNodeDeleteStarted, result)
+    assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result)
     assert.Equal(t, n1.Name, getStringFromChan(deletedNodes))
 }
@@ -1148,11 +1149,11 @@ func TestScaleDownNoMove(t *testing.T) {
     scaleDown := NewScaleDown(&context, clusterStateRegistry)
     scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Node{n1, n2},
         []*apiv1.Pod{p1, p2}, time.Now().Add(5*time.Minute), nil)
-    result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, nil, time.Now())
+    scaleDownStatus, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, nil, time.Now())
     waitForDeleteToFinish(t, scaleDown)
     assert.NoError(t, err)
-    assert.Equal(t, ScaleDownNoUnneeded, result)
+    assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownStatus.Result)
 }

 func getStringFromChan(c chan string) string {


@@ -35,6 +35,7 @@ import (
     apiv1 "k8s.io/api/core/v1"

     "github.com/golang/glog"
+    "k8s.io/autoscaler/cluster-autoscaler/processors/status"
 )

 const (
@@ -334,17 +335,21 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
             scaleDownStart := time.Now()
             metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
-            result, typedErr := scaleDown.TryToScaleDown(allNodes, allScheduled, pdbs, currentTime)
+            scaleDownStatus, typedErr := scaleDown.TryToScaleDown(allNodes, allScheduled, pdbs, currentTime)
             metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)

+            if scaleDownStatus.Result == status.ScaleDownNodeDeleted {
+                a.lastScaleDownDeleteTime = currentTime
+                a.clusterStateRegistry.Recalculate()
+            }
+            if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
+                a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus)
+            }
             if typedErr != nil {
                 glog.Errorf("Failed to scale down: %v", err)
                 a.lastScaleDownFailTime = currentTime
                 return typedErr
             }
-
-            if result == ScaleDownNodeDeleted {
-                a.lastScaleDownDeleteTime = currentTime
-            }
         }
     }
     return nil


@@ -31,6 +31,8 @@ type AutoscalingProcessors struct {
     NodeGroupListProcessor nodegroups.NodeGroupListProcessor
     // ScaleUpStatusProcessor is used to process the state of the cluster after a scale-up.
     ScaleUpStatusProcessor status.ScaleUpStatusProcessor
+    // ScaleDownStatusProcessor is used to process the state of the cluster after a scale-down.
+    ScaleDownStatusProcessor status.ScaleDownStatusProcessor
     // AutoscalingStatusProcessor is used to process the state of the cluster after each autoscaling iteration.
     AutoscalingStatusProcessor status.AutoscalingStatusProcessor
     // NodeGroupManager is responsible for creating/deleting node groups.
@@ -43,6 +45,7 @@ func DefaultProcessors() *AutoscalingProcessors {
         PodListProcessor:           pods.NewDefaultPodListProcessor(),
         NodeGroupListProcessor:     nodegroups.NewDefaultNodeGroupListProcessor(),
         ScaleUpStatusProcessor:     status.NewDefaultScaleUpStatusProcessor(),
+        ScaleDownStatusProcessor:   status.NewDefaultScaleDownStatusProcessor(),
         AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(),
         NodeGroupManager:           nodegroups.NewDefaultNodeGroupManager(),
     }
@@ -55,6 +58,7 @@ func TestProcessors() *AutoscalingProcessors {
         NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{},
         // TODO(bskiba): change scale up test so that this can be a NoOpProcessor
         ScaleUpStatusProcessor:     &status.EventingScaleUpStatusProcessor{},
+        ScaleDownStatusProcessor:   &status.NoOpScaleDownStatusProcessor{},
         AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
         NodeGroupManager:           nodegroups.NewDefaultNodeGroupManager(),
     }
@@ -65,6 +69,7 @@ func (ap *AutoscalingProcessors) CleanUp() {
     ap.PodListProcessor.CleanUp()
     ap.NodeGroupListProcessor.CleanUp()
     ap.ScaleUpStatusProcessor.CleanUp()
+    ap.ScaleDownStatusProcessor.CleanUp()
     ap.AutoscalingStatusProcessor.CleanUp()
     ap.NodeGroupManager.CleanUp()
 }


@@ -0,0 +1,77 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package status
+
+import (
+    apiv1 "k8s.io/api/core/v1"
+    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+    "k8s.io/autoscaler/cluster-autoscaler/context"
+    "k8s.io/autoscaler/cluster-autoscaler/simulator"
+)
+
+// ScaleDownStatus represents the state of scale down.
+type ScaleDownStatus struct {
+    Result            ScaleDownResult
+    ScaledDownNodes   []*ScaleDownNode
+    NodeDeleteResults map[string]error
+}
+
+// ScaleDownNode represents the state of a node that's being scaled down.
+type ScaleDownNode struct {
+    Node        *apiv1.Node
+    NodeGroup   cloudprovider.NodeGroup
+    EvictedPods []*apiv1.Pod
+    UtilInfo    simulator.UtilizationInfo
+}
+
+// ScaleDownResult represents the result of scale down.
+type ScaleDownResult int
+
+const (
+    // ScaleDownError - scale down finished with error.
+    ScaleDownError ScaleDownResult = iota
+    // ScaleDownNoUnneeded - no unneeded nodes and no errors.
+    ScaleDownNoUnneeded
+    // ScaleDownNoNodeDeleted - unneeded nodes present but not available for deletion.
+    ScaleDownNoNodeDeleted
+    // ScaleDownNodeDeleted - a node was deleted.
+    ScaleDownNodeDeleted
+    // ScaleDownNodeDeleteStarted - a node deletion process was started.
+    ScaleDownNodeDeleteStarted
+)
+
+// ScaleDownStatusProcessor processes the status of the cluster after a scale-down.
+type ScaleDownStatusProcessor interface {
+    Process(context *context.AutoscalingContext, status *ScaleDownStatus)
+    CleanUp()
+}
+
+// NewDefaultScaleDownStatusProcessor creates a default instance of ScaleDownStatusProcessor.
+func NewDefaultScaleDownStatusProcessor() ScaleDownStatusProcessor {
+    return &NoOpScaleDownStatusProcessor{}
+}
+
+// NoOpScaleDownStatusProcessor is a ScaleDownStatusProcessor implementation useful for testing.
+type NoOpScaleDownStatusProcessor struct{}
+
+// Process processes the status of the cluster after a scale-down.
+func (p *NoOpScaleDownStatusProcessor) Process(context *context.AutoscalingContext, status *ScaleDownStatus) {
+}
+
+// CleanUp cleans up the processor's internal structures.
+func (p *NoOpScaleDownStatusProcessor) CleanUp() {
+}


@@ -57,6 +57,14 @@ type NodeToBeRemoved struct {
     PodsToReschedule []*apiv1.Pod
 }

+// UtilizationInfo contains utilization information for a node.
+type UtilizationInfo struct {
+    CpuUtil float64
+    MemUtil float64
+    // Max(CpuUtil, MemUtil).
+    Utilization float64
+}
+
 // FindNodesToRemove finds nodes that can be removed. Returns also an information about good
 // rescheduling location for each of the pods.
 func FindNodesToRemove(candidates []*apiv1.Node, allNodes []*apiv1.Node, pods []*apiv1.Pod,
@@ -141,17 +149,18 @@ func FindEmptyNodesToRemove(candidates []*apiv1.Node, pods []*apiv1.Pod) []*apiv
 }

 // CalculateUtilization calculates utilization of a node, defined as maximum of (cpu, memory) utilization.
-// Per resource utilization is the sum of requests for it divided by allocatable.
-func CalculateUtilization(node *apiv1.Node, nodeInfo *schedulercache.NodeInfo) (float64, error) {
+// Per resource utilization is the sum of requests for it divided by allocatable. It also returns the individual
+// cpu and memory utilization.
+func CalculateUtilization(node *apiv1.Node, nodeInfo *schedulercache.NodeInfo) (utilInfo UtilizationInfo, err error) {
     cpu, err := calculateUtilizationOfResource(node, nodeInfo, apiv1.ResourceCPU)
     if err != nil {
-        return 0, err
+        return UtilizationInfo{}, err
     }
     mem, err := calculateUtilizationOfResource(node, nodeInfo, apiv1.ResourceMemory)
     if err != nil {
-        return 0, err
+        return UtilizationInfo{}, err
     }
-    return math.Max(cpu, mem), nil
+    return UtilizationInfo{CpuUtil: cpu, MemUtil: mem, Utilization: math.Max(cpu, mem)}, nil
 }

 func calculateUtilizationOfResource(node *apiv1.Node, nodeInfo *schedulercache.NodeInfo, resourceName apiv1.ResourceName) (float64, error) {
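
As a quick illustration of the formula described in the comment above (per-resource utilization is the sum of requests divided by allocatable, and the node's utilization is the max over cpu and memory), here is a tiny self-contained sketch of the arithmetic; the request and allocatable numbers are invented for the example, only the formula comes from CalculateUtilization.

package main

import (
    "fmt"
    "math"
)

func main() {
    cpuRequestsMilli, cpuAllocatableMilli := 500.0, 2000.0 // 500m requested on a 2-core node
    memRequestsBytes, memAllocatableBytes := 6.0e9, 8.0e9  // 6 GB requested, 8 GB allocatable

    cpuUtil := cpuRequestsMilli / cpuAllocatableMilli // 0.25
    memUtil := memRequestsBytes / memAllocatableBytes // 0.75

    // This is the value compared against ScaleDownUtilizationThreshold.
    utilization := math.Max(cpuUtil, memUtil) // 0.75
    fmt.Printf("cpu=%.2f mem=%.2f utilization=%.2f\n", cpuUtil, memUtil, utilization)
}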


@@ -38,9 +38,9 @@ func TestUtilization(t *testing.T) {
     node := BuildTestNode("node1", 2000, 2000000)
     SetNodeReadyState(node, true, time.Time{})

-    utilization, err := CalculateUtilization(node, nodeInfo)
+    utilInfo, err := CalculateUtilization(node, nodeInfo)
     assert.NoError(t, err)
-    assert.InEpsilon(t, 2.0/10, utilization, 0.01)
+    assert.InEpsilon(t, 2.0/10, utilInfo.Utilization, 0.01)

     node2 := BuildTestNode("node1", 2000, -1)