Respond to readability-related comments from the review

parent 7e3e15bd16
commit 1ecb84389b
@@ -28,6 +28,7 @@ import (
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
	"k8s.io/autoscaler/cluster-autoscaler/core/utils"
@@ -50,7 +51,7 @@ type Actuator struct {
	// TODO: Move budget processor to scaledown planner, potentially merge into PostFilteringScaleDownNodeProcessor
	// This is a larger change to the code structure which impacts some existing actuator unit tests
	// as well as Cluster Autoscaler implementations that may override ScaleDownSetProcessor
-	budgetProcessor *ScaleDownBudgetProcessor
+	budgetProcessor *budgets.ScaleDownBudgetProcessor
}

// NewActuator returns a new instance of Actuator.
@@ -61,7 +62,7 @@ func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterState
		clusterState: csr,
		nodeDeletionTracker: ndt,
		nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, deleteOptions, NewDefaultEvictor(deleteOptions, ndt)),
-		budgetProcessor: NewScaleDownBudgetProcessor(ctx, ndt),
+		budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx, ndt),
		deleteOptions: deleteOptions,
	}
}
@@ -120,9 +121,9 @@ func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (*status.ScaleDownS

// deleteAsyncEmpty immediately starts deletions asynchronously.
// scaledDownNodes return value contains all nodes for which deletion successfully started.
-func (a *Actuator) deleteAsyncEmpty(nodeBuckets []*nodeBucket) (reportedSDNodes []*status.ScaleDownNode) {
-	for _, bucket := range nodeBuckets {
-		for _, node := range bucket.nodes {
+func (a *Actuator) deleteAsyncEmpty(NodeGroupViews []*budgets.NodeGroupView) (reportedSDNodes []*status.ScaleDownNode) {
+	for _, bucket := range NodeGroupViews {
+		for _, node := range bucket.Nodes {
			klog.V(0).Infof("Scale-down: removing empty node %q", node.Name)
			a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %q", node.Name)

@@ -132,12 +133,12 @@ func (a *Actuator) deleteAsyncEmpty(nodeBuckets []*nodeBucket) (reportedSDNodes
				klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err)
			}

-			a.nodeDeletionTracker.StartDeletion(bucket.group.Id(), node.Name)
+			a.nodeDeletionTracker.StartDeletion(bucket.Group.Id(), node.Name)
		}
	}

-	for _, bucket := range nodeBuckets {
-		go a.deleteNodesAsync(bucket.nodes, bucket.group, false)
+	for _, bucket := range NodeGroupViews {
+		go a.deleteNodesAsync(bucket.Nodes, bucket.Group, false)
	}

	return reportedSDNodes
@@ -145,10 +146,10 @@ func (a *Actuator) deleteAsyncEmpty(nodeBuckets []*nodeBucket) (reportedSDNodes

// taintNodesSync synchronously taints all provided nodes with NoSchedule. If tainting fails for any of the nodes, already
// applied taints are cleaned up.
-func (a *Actuator) taintNodesSync(nodeBuckets []*nodeBucket) errors.AutoscalerError {
+func (a *Actuator) taintNodesSync(NodeGroupViews []*budgets.NodeGroupView) errors.AutoscalerError {
	var taintedNodes []*apiv1.Node
-	for _, bucket := range nodeBuckets {
-		for _, node := range bucket.nodes {
+	for _, bucket := range NodeGroupViews {
+		for _, node := range bucket.Nodes {
			err := a.taintNode(node)
			if err != nil {
				a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
@@ -166,9 +167,9 @@ func (a *Actuator) taintNodesSync(nodeBuckets []*nodeBucket) errors.AutoscalerEr

// deleteAsyncDrain asynchronously starts deletions with drain for all provided nodes. scaledDownNodes return value contains all nodes for which
// deletion successfully started.
-func (a *Actuator) deleteAsyncDrain(nodeBuckets []*nodeBucket) (reportedSDNodes []*status.ScaleDownNode) {
-	for _, bucket := range nodeBuckets {
-		for _, drainNode := range bucket.nodes {
+func (a *Actuator) deleteAsyncDrain(NodeGroupViews []*budgets.NodeGroupView) (reportedSDNodes []*status.ScaleDownNode) {
+	for _, bucket := range NodeGroupViews {
+		for _, drainNode := range bucket.Nodes {
			if sdNode, err := a.scaleDownNodeToReport(drainNode, true); err == nil {
				klog.V(0).Infof("Scale-down: removing node %s, utilization: %v, pods to reschedule: %s", drainNode.Name, sdNode.UtilInfo, joinPodNames(sdNode.EvictedPods))
				a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDown", "Scale-down: removing node %s, utilization: %v, pods to reschedule: %s", drainNode.Name, sdNode.UtilInfo, joinPodNames(sdNode.EvictedPods))
@@ -177,12 +178,12 @@ func (a *Actuator) deleteAsyncDrain(nodeBuckets []*nodeBucket) (reportedSDNodes
				klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err)
			}

-			a.nodeDeletionTracker.StartDeletionWithDrain(bucket.group.Id(), drainNode.Name)
+			a.nodeDeletionTracker.StartDeletionWithDrain(bucket.Group.Id(), drainNode.Name)
		}
	}

-	for _, bucket := range nodeBuckets {
-		go a.deleteNodesAsync(bucket.nodes, bucket.group, true)
+	for _, bucket := range NodeGroupViews {
+		go a.deleteNodesAsync(bucket.Nodes, bucket.Group, true)
	}

	return reportedSDNodes
@@ -206,7 +207,7 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
		klog.Errorf("Scale-down: couldn't create delete snapshot, err: %v", err)
		nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "createSnapshot returned error %v", err)}
		for _, node := range nodes {
-			a.nodeDeletionScheduler.RollbackNodeDeletion(node, nodeGroup.Id(), drain, "failed to create delete snapshot", nodeDeleteResult)
+			a.nodeDeletionScheduler.AbortNodeDeletion(node, nodeGroup.Id(), drain, "failed to create delete snapshot", nodeDeleteResult)
		}
		return
	}
@@ -217,7 +218,7 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
		klog.Errorf("Scale-down: couldn't fetch pod disruption budgets, err: %v", err)
		nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "podDisruptionBudgetLister.List returned error %v", err)}
		for _, node := range nodes {
-			a.nodeDeletionScheduler.RollbackNodeDeletion(node, nodeGroup.Id(), drain, "failed to fetch pod disruption budgets", nodeDeleteResult)
+			a.nodeDeletionScheduler.AbortNodeDeletion(node, nodeGroup.Id(), drain, "failed to fetch pod disruption budgets", nodeDeleteResult)
		}
		return
	}
@@ -230,7 +231,7 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
		if err != nil {
			klog.Errorf("Scale-down: can't retrieve node %q from snapshot, err: %v", node.Name, err)
			nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "nodeInfos.Get for %q returned error: %v", node.Name, err)}
-			a.nodeDeletionScheduler.RollbackNodeDeletion(node, nodeGroup.Id(), drain, "failed to get node info", nodeDeleteResult)
+			a.nodeDeletionScheduler.AbortNodeDeletion(node, nodeGroup.Id(), drain, "failed to get node info", nodeDeleteResult)
			continue
		}

@@ -238,14 +239,14 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
		if err != nil {
			klog.Errorf("Scale-down: couldn't delete node %q, err: %v", node.Name, err)
			nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "GetPodsToMove for %q returned error: %v", node.Name, err)}
-			a.nodeDeletionScheduler.RollbackNodeDeletion(node, nodeGroup.Id(), drain, "failed to get pods to move on node", nodeDeleteResult)
+			a.nodeDeletionScheduler.AbortNodeDeletion(node, nodeGroup.Id(), drain, "failed to get pods to move on node", nodeDeleteResult)
			continue
		}

		if !drain && len(podsToRemove) != 0 {
			klog.Errorf("Scale-down: couldn't delete empty node %q, new pods got scheduled", node.Name)
			nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "failed to delete empty node %q, new pods scheduled", node.Name)}
-			a.nodeDeletionScheduler.RollbackNodeDeletion(node, nodeGroup.Id(), drain, "node is not empty", nodeDeleteResult)
+			a.nodeDeletionScheduler.AbortNodeDeletion(node, nodeGroup.Id(), drain, "node is not empty", nodeDeleteResult)
			continue
		}

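The hunks above swap the actuator-local nodeBucket type for budgets.NodeGroupView, so each helper now walks a list of per-node-group buckets and then starts one deletion goroutine per bucket. The standalone Go sketch below mirrors that two-pass traversal with simplified stand-in types (a string-keyed group and string node names instead of cloudprovider.NodeGroup and *apiv1.Node); it illustrates the pattern only and is not the autoscaler's actual API.

package main

import (
	"fmt"
	"sync"
)

// nodeGroupView is a stand-in for budgets.NodeGroupView: a node group plus the
// subset of its nodes selected for deletion.
type nodeGroupView struct {
	GroupID string
	Nodes   []string
}

// startDeletion mimics the shape of deleteAsyncEmpty/deleteAsyncDrain: first
// report and track every node, then start one asynchronous deletion per bucket.
func startDeletion(buckets []*nodeGroupView) {
	for _, bucket := range buckets {
		for _, node := range bucket.Nodes {
			fmt.Printf("Scale-down: removing empty node %q (group %q)\n", node, bucket.GroupID)
		}
	}
	var wg sync.WaitGroup
	for _, bucket := range buckets {
		wg.Add(1)
		go func(b *nodeGroupView) {
			defer wg.Done()
			// the real code calls a.deleteNodesAsync(b.Nodes, b.Group, false) here
		}(bucket)
	}
	wg.Wait()
}

func main() {
	startDeletion([]*nodeGroupView{
		{GroupID: "test-ng", Nodes: []string{"test-ng-0", "test-ng-1"}},
		{GroupID: "atomic-2", Nodes: []string{"atomic-2-0", "atomic-2-1"}},
	})
}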
@@ -39,6 +39,7 @@ import (
	testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/config"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
	. "k8s.io/autoscaler/cluster-autoscaler/core/test"
@@ -56,8 +57,8 @@ func TestStartDeletion(t *testing.T) {
	toBeDeletedTaint := apiv1.Taint{Key: taints.ToBeDeletedTaint, Effect: apiv1.TaintEffectNoSchedule}

	for tn, tc := range map[string]struct {
-		emptyNodes []*nodeBucket
-		drainNodes []*nodeBucket
+		emptyNodes []*budgets.NodeGroupView
+		drainNodes []*budgets.NodeGroupView
		pods map[string][]*apiv1.Pod
		failedPodDrain map[string]bool
		failedNodeDeletion map[string]bool
@@ -77,7 +78,7 @@ func TestStartDeletion(t *testing.T) {
			},
		},
		"empty node deletion": {
-			emptyNodes: generatenodeBucketList(testNg, 0, 2),
+			emptyNodes: generateNodeGroupViewList(testNg, 0, 2),
			wantStatus: &status.ScaleDownStatus{
				Result: status.ScaleDownNodeDeleteStarted,
				ScaledDownNodes: []*status.ScaleDownNode{
@@ -110,7 +111,7 @@ func TestStartDeletion(t *testing.T) {
			},
		},
		"empty atomic node deletion": {
-			emptyNodes: generatenodeBucketList(atomic2, 0, 2),
+			emptyNodes: generateNodeGroupViewList(atomic2, 0, 2),
			wantStatus: &status.ScaleDownStatus{
				Result: status.ScaleDownNodeDeleteStarted,
				ScaledDownNodes: []*status.ScaleDownNode{
@@ -143,7 +144,7 @@ func TestStartDeletion(t *testing.T) {
			},
		},
		"deletion with drain": {
-			drainNodes: generatenodeBucketList(testNg, 0, 2),
+			drainNodes: generateNodeGroupViewList(testNg, 0, 2),
			pods: map[string][]*apiv1.Pod{
				"test-node-0": removablePods(2, "test-node-0"),
				"test-node-1": removablePods(2, "test-node-1"),
@@ -181,8 +182,8 @@ func TestStartDeletion(t *testing.T) {
			},
		},
		"empty and drain deletion work correctly together": {
-			emptyNodes: generatenodeBucketList(testNg, 0, 2),
-			drainNodes: generatenodeBucketList(testNg, 2, 4),
+			emptyNodes: generateNodeGroupViewList(testNg, 0, 2),
+			drainNodes: generateNodeGroupViewList(testNg, 2, 4),
			pods: map[string][]*apiv1.Pod{
				"test-node-2": removablePods(2, "test-node-2"),
				"test-node-3": removablePods(2, "test-node-3"),
@@ -240,8 +241,8 @@ func TestStartDeletion(t *testing.T) {
			},
		},
		"failure to taint empty node stops deletion and cleans already applied taints": {
-			emptyNodes: generatenodeBucketList(testNg, 0, 4),
-			drainNodes: generatenodeBucketList(testNg, 4, 5),
+			emptyNodes: generateNodeGroupViewList(testNg, 0, 4),
+			drainNodes: generateNodeGroupViewList(testNg, 4, 5),
			pods: map[string][]*apiv1.Pod{
				"test-node-4": removablePods(2, "test-node-4"),
			},
@@ -263,8 +264,8 @@ func TestStartDeletion(t *testing.T) {
			wantErr: cmpopts.AnyError,
		},
		"failure to taint empty atomic node stops deletion and cleans already applied taints": {
-			emptyNodes: generatenodeBucketList(atomic4, 0, 4),
-			drainNodes: generatenodeBucketList(testNg, 4, 5),
+			emptyNodes: generateNodeGroupViewList(atomic4, 0, 4),
+			drainNodes: generateNodeGroupViewList(testNg, 4, 5),
			pods: map[string][]*apiv1.Pod{
				"test-node-4": removablePods(2, "test-node-4"),
			},
@@ -286,8 +287,8 @@ func TestStartDeletion(t *testing.T) {
			wantErr: cmpopts.AnyError,
		},
		"failure to taint drain node stops further deletion and cleans already applied taints": {
-			emptyNodes: generatenodeBucketList(testNg, 0, 2),
-			drainNodes: generatenodeBucketList(testNg, 2, 6),
+			emptyNodes: generateNodeGroupViewList(testNg, 0, 2),
+			drainNodes: generateNodeGroupViewList(testNg, 2, 6),
			pods: map[string][]*apiv1.Pod{
				"test-node-2": removablePods(2, "test-node-2"),
				"test-node-3": removablePods(2, "test-node-3"),
@@ -328,8 +329,8 @@ func TestStartDeletion(t *testing.T) {
			wantErr: cmpopts.AnyError,
		},
		"failure to taint drain atomic node stops further deletion and cleans already applied taints": {
-			emptyNodes: generatenodeBucketList(testNg, 0, 2),
-			drainNodes: generatenodeBucketList(atomic4, 2, 6),
+			emptyNodes: generateNodeGroupViewList(testNg, 0, 2),
+			drainNodes: generateNodeGroupViewList(atomic4, 2, 6),
			pods: map[string][]*apiv1.Pod{
				"atomic-4-node-2": removablePods(2, "atomic-4-node-2"),
				"atomic-4-node-3": removablePods(2, "atomic-4-node-3"),
@@ -370,7 +371,7 @@ func TestStartDeletion(t *testing.T) {
			wantErr: cmpopts.AnyError,
		},
		"nodes that failed drain are correctly reported in results": {
-			drainNodes: generatenodeBucketList(testNg, 0, 4),
+			drainNodes: generateNodeGroupViewList(testNg, 0, 4),
			pods: map[string][]*apiv1.Pod{
				"test-node-0": removablePods(3, "test-node-0"),
				"test-node-1": removablePods(3, "test-node-1"),
@@ -458,8 +459,8 @@ func TestStartDeletion(t *testing.T) {
			},
		},
		"nodes that failed deletion are correctly reported in results": {
-			emptyNodes: generatenodeBucketList(testNg, 0, 2),
-			drainNodes: generatenodeBucketList(testNg, 2, 4),
+			emptyNodes: generateNodeGroupViewList(testNg, 0, 2),
+			drainNodes: generateNodeGroupViewList(testNg, 2, 4),
			pods: map[string][]*apiv1.Pod{
				"test-node-2": removablePods(2, "test-node-2"),
				"test-node-3": removablePods(2, "test-node-3"),
@@ -526,7 +527,7 @@ func TestStartDeletion(t *testing.T) {
			},
		},
		"DS pods are evicted from empty nodes, but don't block deletion on error": {
-			emptyNodes: generatenodeBucketList(testNg, 0, 2),
+			emptyNodes: generateNodeGroupViewList(testNg, 0, 2),
			pods: map[string][]*apiv1.Pod{
				"test-node-0": {generateDsPod("test-node-0-ds-pod-0", "test-node-0"), generateDsPod("test-node-0-ds-pod-1", "test-node-0")},
				"test-node-1": {generateDsPod("test-node-1-ds-pod-0", "test-node-1"), generateDsPod("test-node-1-ds-pod-1", "test-node-1")},
@@ -565,7 +566,7 @@ func TestStartDeletion(t *testing.T) {
			},
		},
		"nodes with pods are not deleted if the node is passed as empty": {
-			emptyNodes: generatenodeBucketList(testNg, 0, 2),
+			emptyNodes: generateNodeGroupViewList(testNg, 0, 2),
			pods: map[string][]*apiv1.Pod{
				"test-node-0": removablePods(2, "test-node-0"),
				"test-node-1": removablePods(2, "test-node-1"),
@@ -606,8 +607,8 @@ func TestStartDeletion(t *testing.T) {
		},
		"atomic nodes with pods are not deleted if the node is passed as empty": {
			emptyNodes: append(
-				generatenodeBucketList(testNg, 0, 2),
-				generatenodeBucketList(atomic2, 0, 2)...,
+				generateNodeGroupViewList(testNg, 0, 2),
+				generateNodeGroupViewList(atomic2, 0, 2)...,
			),
			pods: map[string][]*apiv1.Pod{
				"test-node-1": removablePods(2, "test-node-1"),
@@ -678,14 +679,14 @@ func TestStartDeletion(t *testing.T) {
			nodesByName := make(map[string]*apiv1.Node)
			nodesLock := sync.Mutex{}
			for _, bucket := range tc.emptyNodes {
-				allEmptyNodes = append(allEmptyNodes, bucket.nodes...)
+				allEmptyNodes = append(allEmptyNodes, bucket.Nodes...)
				for _, node := range allEmptyNodes {
					nodesByName[node.Name] = node
				}
			}
			for _, bucket := range tc.drainNodes {
-				allDrainNodes = append(allDrainNodes, bucket.nodes...)
-				for _, node := range bucket.nodes {
+				allDrainNodes = append(allDrainNodes, bucket.Nodes...)
+				for _, node := range bucket.Nodes {
					nodesByName[node.Name] = node
				}
			}
@@ -765,17 +766,17 @@ func TestStartDeletion(t *testing.T) {
				return nil
			})
			for _, bucket := range tc.emptyNodes {
-				bucket.group.(*testprovider.TestNodeGroup).SetCloudProvider(provider)
-				provider.InsertNodeGroup(bucket.group)
-				for _, node := range bucket.nodes {
-					provider.AddNode(bucket.group.Id(), node)
+				bucket.Group.(*testprovider.TestNodeGroup).SetCloudProvider(provider)
+				provider.InsertNodeGroup(bucket.Group)
+				for _, node := range bucket.Nodes {
+					provider.AddNode(bucket.Group.Id(), node)
				}
			}
			for _, bucket := range tc.drainNodes {
-				bucket.group.(*testprovider.TestNodeGroup).SetCloudProvider(provider)
-				provider.InsertNodeGroup(bucket.group)
-				for _, node := range bucket.nodes {
-					provider.AddNode(bucket.group.Id(), node)
+				bucket.Group.(*testprovider.TestNodeGroup).SetCloudProvider(provider)
+				provider.InsertNodeGroup(bucket.Group)
+				for _, node := range bucket.Nodes {
+					provider.AddNode(bucket.Group.Id(), node)
				}
			}

@@ -807,7 +808,7 @@ func TestStartDeletion(t *testing.T) {
			}
			csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), clusterstate.NewStaticMaxNodeProvisionTimeProvider(15*time.Minute))
			for _, bucket := range tc.emptyNodes {
-				for _, node := range bucket.nodes {
+				for _, node := range bucket.Nodes {
					err := ctx.ClusterSnapshot.AddNodeWithPods(node, tc.pods[node.Name])
					if err != nil {
						t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
@@ -815,7 +816,7 @@ func TestStartDeletion(t *testing.T) {
				}
			}
			for _, bucket := range tc.drainNodes {
-				for _, node := range bucket.nodes {
+				for _, node := range bucket.Nodes {
					pods, found := tc.pods[node.Name]
					if !found {
						t.Fatalf("Drain node %q doesn't have pods defined in the test case.", node.Name)
@@ -834,7 +835,7 @@ func TestStartDeletion(t *testing.T) {
			actuator := Actuator{
				ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
				nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, simulator.NodeDeleteOptions{}, evictor),
-				budgetProcessor: NewScaleDownBudgetProcessor(&ctx, ndt),
+				budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx, ndt),
			}
			gotStatus, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes)
			if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
@@ -1039,11 +1040,11 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
		provider.InsertNodeGroup(ng)
		ng.SetCloudProvider(provider)
		for i, num := range numNodes {
-			singleBucketList := generatenodeBucketList(ng, 0, num)
+			singleBucketList := generateNodeGroupViewList(ng, 0, num)
			bucket := singleBucketList[0]
-			deleteNodes[i] = append(deleteNodes[i], bucket.nodes...)
-			for _, node := range bucket.nodes {
-				provider.AddNode(bucket.group.Id(), node)
+			deleteNodes[i] = append(deleteNodes[i], bucket.Nodes...)
+			for _, node := range bucket.Nodes {
+				provider.AddNode(bucket.Group.Id(), node)
			}
		}
	}
@@ -1068,7 +1069,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
	actuator := Actuator{
		ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
		nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, simulator.NodeDeleteOptions{}, evictor),
-		budgetProcessor: NewScaleDownBudgetProcessor(&ctx, ndt),
+		budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx, ndt),
	}

	for _, nodes := range deleteNodes {
@@ -1120,11 +1121,11 @@ func generateNodes(from, to int, prefix string) []*apiv1.Node {
	return result
}

-func generatenodeBucketList(ng cloudprovider.NodeGroup, from, to int) []*nodeBucket {
-	return []*nodeBucket{
+func generateNodeGroupViewList(ng cloudprovider.NodeGroup, from, to int) []*budgets.NodeGroupView {
+	return []*budgets.NodeGroupView{
		{
-			group: ng,
-			nodes: generateNodes(from, to, ng.Id()),
+			Group: ng,
+			Nodes: generateNodes(from, to, ng.Id()),
		},
	}
}
@ -1,154 +0,0 @@
|
|||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package actuation
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
|
||||
)
|
||||
|
||||
type nodeBucket struct {
|
||||
group cloudprovider.NodeGroup
|
||||
nodes []*apiv1.Node
|
||||
}
|
||||
|
||||
// ScaleDownBudgetProcessor is responsible for keeping the number of nodes deleted in parallel within defined limits.
|
||||
type ScaleDownBudgetProcessor struct {
|
||||
ctx *context.AutoscalingContext
|
||||
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
|
||||
}
|
||||
|
||||
// NewScaleDownBudgetProcessor creates a ScaleDownBudgetProcessor instance.
|
||||
func NewScaleDownBudgetProcessor(ctx *context.AutoscalingContext, ndt *deletiontracker.NodeDeletionTracker) *ScaleDownBudgetProcessor {
|
||||
return &ScaleDownBudgetProcessor{
|
||||
ctx: ctx,
|
||||
nodeDeletionTracker: ndt,
|
||||
}
|
||||
}
|
||||
|
||||
// CropNodes crops the provided node lists to respect scale-down max parallelism budgets.
|
||||
// The returned nodes are grouped by a node group.
|
||||
func (bp *ScaleDownBudgetProcessor) CropNodes(empty, drain []*apiv1.Node) (emptyToDelete, drainToDelete []*nodeBucket) {
|
||||
emptyIndividual, emptyAtomic := bp.groupByNodeGroup(empty)
|
||||
drainIndividual, drainAtomic := bp.groupByNodeGroup(drain)
|
||||
|
||||
emptyInProgress, drainInProgress := bp.nodeDeletionTracker.DeletionsInProgress()
|
||||
parallelismBudget := bp.ctx.MaxScaleDownParallelism - len(emptyInProgress) - len(drainInProgress)
|
||||
drainBudget := bp.ctx.MaxDrainParallelism - len(drainInProgress)
|
||||
|
||||
emptyToDelete = []*nodeBucket{}
|
||||
for _, bucket := range emptyAtomic {
|
||||
if parallelismBudget < len(bucket.nodes) {
|
||||
// One pod slice can sneak in even if it would exceed parallelism budget.
|
||||
// This is to help avoid starvation of pod slices by regular nodes,
|
||||
// also larger pod slices will immediately exceed parallelism budget.
|
||||
if parallelismBudget == 0 || len(emptyToDelete) > 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
emptyToDelete = append(emptyToDelete, bucket)
|
||||
parallelismBudget -= len(bucket.nodes)
|
||||
}
|
||||
|
||||
drainBudget = min(parallelismBudget, drainBudget)
|
||||
|
||||
drainToDelete = []*nodeBucket{}
|
||||
for _, bucket := range drainAtomic {
|
||||
if drainBudget < len(bucket.nodes) {
|
||||
// One pod slice can sneak in even if it would exceed parallelism budget.
|
||||
// This is to help avoid starvation of pod slices by regular nodes,
|
||||
// also larger pod slices will immediately exceed parallelism budget.
|
||||
if drainBudget == 0 || len(emptyToDelete) > 0 || len(drainToDelete) > 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
drainToDelete = append(drainToDelete, bucket)
|
||||
drainBudget -= len(bucket.nodes)
|
||||
parallelismBudget -= len(bucket.nodes)
|
||||
}
|
||||
|
||||
for _, bucket := range emptyIndividual {
|
||||
if parallelismBudget < 1 {
|
||||
break
|
||||
}
|
||||
if parallelismBudget < len(bucket.nodes) {
|
||||
bucket.nodes = bucket.nodes[:parallelismBudget]
|
||||
}
|
||||
emptyToDelete = append(emptyToDelete, bucket)
|
||||
parallelismBudget -= len(bucket.nodes)
|
||||
}
|
||||
|
||||
drainBudget = min(parallelismBudget, drainBudget)
|
||||
|
||||
for _, bucket := range drainIndividual {
|
||||
if drainBudget < 1 {
|
||||
break
|
||||
}
|
||||
if drainBudget < len(bucket.nodes) {
|
||||
bucket.nodes = bucket.nodes[:drainBudget]
|
||||
}
|
||||
drainToDelete = append(drainToDelete, bucket)
|
||||
drainBudget -= 1
|
||||
}
|
||||
|
||||
return emptyToDelete, drainToDelete
|
||||
}
|
||||
|
||||
func (bp *ScaleDownBudgetProcessor) groupByNodeGroup(nodes []*apiv1.Node) (individual, atomic []*nodeBucket) {
|
||||
individualGroup, atomicGroup := map[cloudprovider.NodeGroup]int{}, map[cloudprovider.NodeGroup]int{}
|
||||
individual, atomic = []*nodeBucket{}, []*nodeBucket{}
|
||||
for _, node := range nodes {
|
||||
nodeGroup, err := bp.ctx.CloudProvider.NodeGroupForNode(node)
|
||||
if err != nil || nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
|
||||
klog.Errorf("Failed to find node group for %s: %v", node.Name, err)
|
||||
continue
|
||||
}
|
||||
autoscalingOptions, err := nodeGroup.GetOptions(bp.ctx.NodeGroupDefaults)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
|
||||
continue
|
||||
}
|
||||
if autoscalingOptions != nil && autoscalingOptions.AtomicScaleDown {
|
||||
if idx, ok := atomicGroup[nodeGroup]; ok {
|
||||
atomic[idx].nodes = append(atomic[idx].nodes, node)
|
||||
} else {
|
||||
atomicGroup[nodeGroup] = len(atomic)
|
||||
atomic = append(atomic, &nodeBucket{
|
||||
group: nodeGroup,
|
||||
nodes: []*apiv1.Node{node},
|
||||
})
|
||||
}
|
||||
} else {
|
||||
if idx, ok := individualGroup[nodeGroup]; ok {
|
||||
individual[idx].nodes = append(individual[idx].nodes, node)
|
||||
} else {
|
||||
individualGroup[nodeGroup] = len(individual)
|
||||
individual = append(individual, &nodeBucket{
|
||||
group: nodeGroup,
|
||||
nodes: []*apiv1.Node{node},
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
return individual, atomic
|
||||
}
|
||||
|
|
@ -1,352 +0,0 @@
|
|||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package actuation
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
|
||||
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/config"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
|
||||
)
|
||||
|
||||
func TestCropNodesToBudgets(t *testing.T) {
|
||||
testNg := testprovider.NewTestNodeGroup("test-ng", 0, 100, 3, true, false, "n1-standard-2", nil, nil)
|
||||
atomic3 := sizedNodeGroup("atomic-3", 3, true)
|
||||
atomic4 := sizedNodeGroup("atomic-4", 4, true)
|
||||
atomic8 := sizedNodeGroup("atomic-8", 8, true)
|
||||
atomic11 := sizedNodeGroup("atomic-11", 11, true)
|
||||
for tn, tc := range map[string]struct {
|
||||
emptyDeletionsInProgress int
|
||||
drainDeletionsInProgress int
|
||||
empty []*nodeBucket
|
||||
drain []*nodeBucket
|
||||
wantEmpty []*nodeBucket
|
||||
wantDrain []*nodeBucket
|
||||
}{
|
||||
"no nodes": {
|
||||
empty: []*nodeBucket{},
|
||||
drain: []*nodeBucket{},
|
||||
wantEmpty: []*nodeBucket{},
|
||||
wantDrain: []*nodeBucket{},
|
||||
},
|
||||
// Empty nodes only.
|
||||
"empty nodes within max limit, no deletions in progress": {
|
||||
empty: generatenodeBucketList(testNg, 0, 10),
|
||||
wantEmpty: generatenodeBucketList(testNg, 0, 10),
|
||||
},
|
||||
"empty nodes exceeding max limit, no deletions in progress": {
|
||||
empty: generatenodeBucketList(testNg, 0, 11),
|
||||
wantEmpty: generatenodeBucketList(testNg, 0, 10),
|
||||
},
|
||||
"empty atomic node group exceeding max limit": {
|
||||
empty: generatenodeBucketList(atomic11, 0, 11),
|
||||
wantEmpty: generatenodeBucketList(atomic11, 0, 11),
|
||||
},
|
||||
"empty regular and atomic": {
|
||||
empty: append(generatenodeBucketList(testNg, 0, 8), generatenodeBucketList(atomic3, 0, 3)...),
|
||||
wantEmpty: append(generatenodeBucketList(atomic3, 0, 3), generatenodeBucketList(testNg, 0, 7)...),
|
||||
},
|
||||
"multiple empty atomic": {
|
||||
empty: append(
|
||||
append(
|
||||
generatenodeBucketList(testNg, 0, 3),
|
||||
generatenodeBucketList(atomic8, 0, 8)...),
|
||||
generatenodeBucketList(atomic3, 0, 3)...),
|
||||
wantEmpty: append(generatenodeBucketList(atomic8, 0, 8), generatenodeBucketList(testNg, 0, 2)...),
|
||||
},
|
||||
"empty nodes with deletions in progress, within budget": {
|
||||
emptyDeletionsInProgress: 1,
|
||||
drainDeletionsInProgress: 1,
|
||||
empty: generatenodeBucketList(testNg, 0, 8),
|
||||
wantEmpty: generatenodeBucketList(testNg, 0, 8),
|
||||
},
|
||||
"empty nodes with deletions in progress, exceeding budget": {
|
||||
emptyDeletionsInProgress: 1,
|
||||
drainDeletionsInProgress: 1,
|
||||
empty: generatenodeBucketList(testNg, 0, 10),
|
||||
wantEmpty: generatenodeBucketList(testNg, 0, 8),
|
||||
},
|
||||
"empty atomic nodes with deletions in progress, exceeding budget": {
|
||||
emptyDeletionsInProgress: 3,
|
||||
drainDeletionsInProgress: 3,
|
||||
empty: generatenodeBucketList(atomic8, 0, 8),
|
||||
wantEmpty: generatenodeBucketList(atomic8, 0, 8),
|
||||
},
|
||||
"empty nodes with deletions in progress, 0 budget left": {
|
||||
emptyDeletionsInProgress: 5,
|
||||
drainDeletionsInProgress: 5,
|
||||
empty: generatenodeBucketList(testNg, 0, 10),
|
||||
wantEmpty: []*nodeBucket{},
|
||||
},
|
||||
"empty atomic nodes with deletions in progress, 0 budget left": {
|
||||
emptyDeletionsInProgress: 5,
|
||||
drainDeletionsInProgress: 5,
|
||||
empty: generatenodeBucketList(atomic3, 0, 3),
|
||||
wantEmpty: []*nodeBucket{},
|
||||
},
|
||||
"empty nodes with deletions in progress, budget exceeded": {
|
||||
emptyDeletionsInProgress: 50,
|
||||
drainDeletionsInProgress: 50,
|
||||
empty: generatenodeBucketList(testNg, 0, 10),
|
||||
wantEmpty: []*nodeBucket{},
|
||||
},
|
||||
// Drain nodes only.
|
||||
"drain nodes within max limit, no deletions in progress": {
|
||||
drain: generatenodeBucketList(testNg, 0, 5),
|
||||
wantDrain: generatenodeBucketList(testNg, 0, 5),
|
||||
},
|
||||
"drain nodes exceeding max limit, no deletions in progress": {
|
||||
drain: generatenodeBucketList(testNg, 0, 6),
|
||||
wantDrain: generatenodeBucketList(testNg, 0, 5),
|
||||
},
|
||||
"drain atomic exceeding limit": {
|
||||
drain: generatenodeBucketList(atomic8, 0, 8),
|
||||
wantDrain: generatenodeBucketList(atomic8, 0, 8),
|
||||
},
|
||||
"drain regular and atomic exceeding limit": {
|
||||
drain: append(generatenodeBucketList(testNg, 0, 3), generatenodeBucketList(atomic3, 0, 3)...),
|
||||
wantDrain: append(generatenodeBucketList(atomic3, 0, 3), generatenodeBucketList(testNg, 0, 2)...),
|
||||
},
|
||||
"multiple drain atomic": {
|
||||
drain: append(
|
||||
append(
|
||||
generatenodeBucketList(testNg, 0, 3),
|
||||
generatenodeBucketList(atomic3, 0, 3)...),
|
||||
generatenodeBucketList(atomic4, 0, 4)...),
|
||||
wantDrain: append(generatenodeBucketList(atomic3, 0, 3), generatenodeBucketList(testNg, 0, 2)...),
|
||||
},
|
||||
"drain nodes with deletions in progress, within budget": {
|
||||
emptyDeletionsInProgress: 1,
|
||||
drainDeletionsInProgress: 2,
|
||||
drain: generatenodeBucketList(testNg, 0, 3),
|
||||
wantDrain: generatenodeBucketList(testNg, 0, 3),
|
||||
},
|
||||
"drain nodes with deletions in progress, exceeding drain budget": {
|
||||
emptyDeletionsInProgress: 1,
|
||||
drainDeletionsInProgress: 2,
|
||||
drain: generatenodeBucketList(testNg, 0, 5),
|
||||
wantDrain: generatenodeBucketList(testNg, 0, 3),
|
||||
},
|
||||
"drain atomic nodes with deletions in progress, exceeding drain budget": {
|
||||
emptyDeletionsInProgress: 1,
|
||||
drainDeletionsInProgress: 2,
|
||||
drain: generatenodeBucketList(atomic4, 0, 4),
|
||||
wantDrain: generatenodeBucketList(atomic4, 0, 4),
|
||||
},
|
||||
"drain nodes with deletions in progress, 0 drain budget left": {
|
||||
emptyDeletionsInProgress: 1,
|
||||
drainDeletionsInProgress: 5,
|
||||
drain: generatenodeBucketList(testNg, 0, 5),
|
||||
wantDrain: []*nodeBucket{},
|
||||
},
|
||||
"drain atomic nodes with deletions in progress, 0 drain budget left": {
|
||||
emptyDeletionsInProgress: 1,
|
||||
drainDeletionsInProgress: 5,
|
||||
drain: generatenodeBucketList(atomic4, 0, 4),
|
||||
wantDrain: []*nodeBucket{},
|
||||
},
|
||||
"drain nodes with deletions in progress, drain budget exceeded": {
|
||||
emptyDeletionsInProgress: 1,
|
||||
drainDeletionsInProgress: 50,
|
||||
drain: generatenodeBucketList(testNg, 0, 5),
|
||||
wantDrain: []*nodeBucket{},
|
||||
},
|
||||
"drain nodes with deletions in progress, exceeding overall budget": {
|
||||
emptyDeletionsInProgress: 7,
|
||||
drainDeletionsInProgress: 1,
|
||||
drain: generatenodeBucketList(testNg, 0, 4),
|
||||
wantDrain: generatenodeBucketList(testNg, 0, 2),
|
||||
},
|
||||
"drain nodes with deletions in progress, 0 overall budget left": {
|
||||
emptyDeletionsInProgress: 10,
|
||||
drain: generatenodeBucketList(testNg, 0, 4),
|
||||
wantDrain: []*nodeBucket{},
|
||||
},
|
||||
"drain nodes with deletions in progress, overall budget exceeded": {
|
||||
emptyDeletionsInProgress: 50,
|
||||
drain: generatenodeBucketList(testNg, 0, 4),
|
||||
wantDrain: []*nodeBucket{},
|
||||
},
|
||||
// Empty and drain nodes together.
|
||||
"empty&drain nodes within max limits, no deletions in progress": {
|
||||
empty: generatenodeBucketList(testNg, 0, 5),
|
||||
drain: generatenodeBucketList(testNg, 0, 5),
|
||||
wantDrain: generatenodeBucketList(testNg, 0, 5),
|
||||
wantEmpty: generatenodeBucketList(testNg, 0, 5),
|
||||
},
|
||||
"empty&drain atomic nodes within max limits, no deletions in progress": {
|
||||
empty: generatenodeBucketList(atomic3, 0, 3),
|
||||
drain: generatenodeBucketList(atomic4, 0, 4),
|
||||
wantEmpty: generatenodeBucketList(atomic3, 0, 3),
|
||||
wantDrain: generatenodeBucketList(atomic4, 0, 4),
|
||||
},
|
||||
"empty&drain nodes exceeding overall limit, no deletions in progress": {
|
||||
empty: generatenodeBucketList(testNg, 0, 8),
|
||||
drain: generatenodeBucketList(testNg, 0, 8),
|
||||
wantDrain: generatenodeBucketList(testNg, 0, 2),
|
||||
wantEmpty: generatenodeBucketList(testNg, 0, 8),
|
||||
},
|
||||
"empty&drain atomic nodes exceeding overall limit, no deletions in progress": {
|
||||
empty: generatenodeBucketList(atomic8, 0, 8),
|
||||
drain: generatenodeBucketList(atomic4, 0, 4),
|
||||
wantEmpty: generatenodeBucketList(atomic8, 0, 8),
|
||||
wantDrain: []*nodeBucket{},
|
||||
},
|
||||
"empty&drain atomic nodes exceeding drain limit, no deletions in progress": {
|
||||
empty: generatenodeBucketList(atomic4, 0, 4),
|
||||
drain: generatenodeBucketList(atomic8, 0, 8),
|
||||
wantEmpty: generatenodeBucketList(atomic4, 0, 4),
|
||||
wantDrain: []*nodeBucket{},
|
||||
},
|
||||
"empty&drain atomic and regular nodes exceeding drain limit, no deletions in progress": {
|
||||
empty: append(generatenodeBucketList(testNg, 0, 5), generatenodeBucketList(atomic3, 0, 3)...),
|
||||
drain: generatenodeBucketList(atomic8, 0, 8),
|
||||
wantEmpty: append(generatenodeBucketList(atomic3, 0, 3), generatenodeBucketList(testNg, 0, 5)...),
|
||||
wantDrain: []*nodeBucket{},
|
||||
},
|
||||
"empty regular and drain atomic nodes exceeding overall limit, no deletions in progress": {
|
||||
drain: generatenodeBucketList(atomic8, 0, 8),
|
||||
empty: generatenodeBucketList(testNg, 0, 5),
|
||||
wantDrain: generatenodeBucketList(atomic8, 0, 8),
|
||||
wantEmpty: generatenodeBucketList(testNg, 0, 2),
|
||||
},
|
||||
"empty&drain nodes exceeding drain limit, no deletions in progress": {
|
||||
empty: generatenodeBucketList(testNg, 0, 2),
|
||||
drain: generatenodeBucketList(testNg, 0, 8),
|
||||
wantDrain: generatenodeBucketList(testNg, 0, 5),
|
||||
wantEmpty: generatenodeBucketList(testNg, 0, 2),
|
||||
},
|
||||
"empty&drain nodes with deletions in progress, 0 overall budget left": {
|
||||
emptyDeletionsInProgress: 10,
|
||||
empty: generatenodeBucketList(testNg, 0, 5),
|
||||
drain: generatenodeBucketList(testNg, 0, 5),
|
||||
wantEmpty: []*nodeBucket{},
|
||||
wantDrain: []*nodeBucket{},
|
||||
},
|
||||
"empty&drain nodes with deletions in progress, overall budget exceeded (shouldn't happen, just a sanity check)": {
|
||||
emptyDeletionsInProgress: 50,
|
||||
empty: generatenodeBucketList(testNg, 0, 5),
|
||||
drain: generatenodeBucketList(testNg, 0, 5),
|
||||
wantEmpty: []*nodeBucket{},
|
||||
wantDrain: []*nodeBucket{},
|
||||
},
|
||||
"empty&drain nodes with deletions in progress, 0 drain budget left": {
|
||||
drainDeletionsInProgress: 5,
|
||||
empty: generatenodeBucketList(testNg, 0, 5),
|
||||
drain: generatenodeBucketList(testNg, 0, 5),
|
||||
wantEmpty: generatenodeBucketList(testNg, 0, 5),
|
||||
wantDrain: []*nodeBucket{},
|
||||
},
|
||||
"empty&drain nodes with deletions in progress, drain budget exceeded (shouldn't happen, just a sanity check)": {
|
||||
drainDeletionsInProgress: 9,
|
||||
empty: generatenodeBucketList(testNg, 0, 5),
|
||||
drain: generatenodeBucketList(testNg, 0, 5),
|
||||
wantEmpty: generatenodeBucketList(testNg, 0, 1),
|
||||
wantDrain: []*nodeBucket{},
|
||||
},
|
||||
"empty&drain nodes with deletions in progress, overall budget exceeded, only empty nodes fit": {
|
||||
emptyDeletionsInProgress: 5,
|
||||
drainDeletionsInProgress: 3,
|
||||
empty: generatenodeBucketList(testNg, 0, 5),
|
||||
drain: generatenodeBucketList(testNg, 0, 2),
|
||||
wantEmpty: generatenodeBucketList(testNg, 0, 2),
|
||||
wantDrain: []*nodeBucket{},
|
||||
},
|
||||
"empty&drain nodes with deletions in progress, overall budget exceeded, both empty&drain nodes fit": {
|
||||
emptyDeletionsInProgress: 5,
|
||||
drainDeletionsInProgress: 3,
|
||||
empty: generatenodeBucketList(testNg, 0, 1),
|
||||
drain: generatenodeBucketList(testNg, 0, 2),
|
||||
wantEmpty: generatenodeBucketList(testNg, 0, 1),
|
||||
wantDrain: generatenodeBucketList(testNg, 0, 1),
|
||||
},
|
||||
"empty&drain nodes with deletions in progress, drain budget exceeded": {
|
||||
emptyDeletionsInProgress: 1,
|
||||
drainDeletionsInProgress: 3,
|
||||
empty: generatenodeBucketList(testNg, 0, 4),
|
||||
drain: generatenodeBucketList(testNg, 0, 5),
|
||||
wantEmpty: generatenodeBucketList(testNg, 0, 4),
|
||||
wantDrain: generatenodeBucketList(testNg, 0, 2),
|
||||
},
|
||||
} {
|
||||
t.Run(tn, func(t *testing.T) {
|
||||
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
|
||||
return nil
|
||||
})
|
||||
for _, bucket := range append(tc.empty, tc.drain...) {
|
||||
bucket.group.(*testprovider.TestNodeGroup).SetCloudProvider(provider)
|
||||
provider.InsertNodeGroup(bucket.group)
|
||||
for _, node := range bucket.nodes {
|
||||
provider.AddNode(bucket.group.Id(), node)
|
||||
}
|
||||
}
|
||||
|
||||
ctx := &context.AutoscalingContext{
|
||||
AutoscalingOptions: config.AutoscalingOptions{
|
||||
MaxScaleDownParallelism: 10,
|
||||
MaxDrainParallelism: 5,
|
||||
NodeDeletionBatcherInterval: 0 * time.Second,
|
||||
NodeDeleteDelayAfterTaint: 1 * time.Second,
|
||||
},
|
||||
CloudProvider: provider,
|
||||
}
|
||||
ndt := deletiontracker.NewNodeDeletionTracker(1 * time.Hour)
|
||||
for i := 0; i < tc.emptyDeletionsInProgress; i++ {
|
||||
ndt.StartDeletion("ng1", fmt.Sprintf("empty-node-%d", i))
|
||||
}
|
||||
for i := 0; i < tc.drainDeletionsInProgress; i++ {
|
||||
ndt.StartDeletionWithDrain("ng2", fmt.Sprintf("drain-node-%d", i))
|
||||
}
|
||||
emptyList, drainList := []*apiv1.Node{}, []*apiv1.Node{}
|
||||
for _, bucket := range tc.empty {
|
||||
emptyList = append(emptyList, bucket.nodes...)
|
||||
}
|
||||
for _, bucket := range tc.drain {
|
||||
drainList = append(drainList, bucket.nodes...)
|
||||
}
|
||||
|
||||
budgeter := NewScaleDownBudgetProcessor(ctx, ndt)
|
||||
gotEmpty, gotDrain := budgeter.CropNodes(emptyList, drainList)
|
||||
// a
|
||||
if diff := cmp.Diff(tc.wantEmpty, gotEmpty, cmpopts.EquateEmpty(), transformnodeBucket); diff != "" {
|
||||
t.Errorf("cropNodesToBudgets empty nodes diff (-want +got):\n%s", diff)
|
||||
}
|
||||
if diff := cmp.Diff(tc.wantDrain, gotDrain, cmpopts.EquateEmpty(), transformnodeBucket); diff != "" {
|
||||
t.Errorf("cropNodesToBudgets drain nodes diff (-want +got):\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// transformnodeBucket transforms a nodeBucket to a structure that can be directly compared with other node bucket.
|
||||
var transformnodeBucket = cmp.Transformer("transformnodeBucket", func(b nodeBucket) interface{} {
|
||||
return struct {
|
||||
Group string
|
||||
Nodes []*apiv1.Node
|
||||
}{
|
||||
Group: b.group.Id(),
|
||||
Nodes: b.nodes,
|
||||
}
|
||||
})
|
||||
|
|
@@ -86,7 +86,7 @@ func (d *NodeDeletionBatcher) AddNodes(nodes []*apiv1.Node, nodeGroup cloudprovi
	if first {
		go func(nodeGroup cloudprovider.NodeGroup) {
			time.Sleep(d.deleteInterval)
-			d.executeForBucket(nodeGroup)
+			d.remove(nodeGroup)
		}(nodeGroup)
	}
}
@@ -107,8 +107,8 @@ func (d *NodeDeletionBatcher) addNodesToBucket(nodes []*apiv1.Node, nodeGroup cl
	return false
}

-// executeForBucket deletes nodes of a given nodeGroup, if successful, the deletion is recorded in CSR, and an event is emitted on the node.
-func (d *NodeDeletionBatcher) executeForBucket(nodeGroup cloudprovider.NodeGroup) error {
+// remove deletes nodes of a given nodeGroup, if successful, the deletion is recorded in CSR, and an event is emitted on the node.
+func (d *NodeDeletionBatcher) remove(nodeGroup cloudprovider.NodeGroup) error {
	d.Lock()
	defer d.Unlock()
	nodes, ok := d.deletionsPerNodeGroup[nodeGroup.Id()]
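For context on the hunk above: AddNodes relies on the first insertion for a node group to schedule a single delayed flush, so any nodes added for the same group within deleteInterval join the same batch. The following self-contained sketch shows that pattern with illustrative names; it is not the NodeDeletionBatcher API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher is a toy stand-in: buckets collects nodes per group, and the first
// add for a group schedules one delayed flush for that group.
type batcher struct {
	sync.Mutex
	deleteInterval time.Duration
	buckets        map[string][]string
}

func (b *batcher) addNodes(group string, nodes []string) {
	b.Lock()
	_, exists := b.buckets[group]
	b.buckets[group] = append(b.buckets[group], nodes...)
	b.Unlock()
	if !exists { // "first" in the code above
		go func() {
			time.Sleep(b.deleteInterval)
			b.flush(group)
		}()
	}
}

// flush plays the role of remove/executeForBucket: drain the bucket and act on it.
func (b *batcher) flush(group string) {
	b.Lock()
	defer b.Unlock()
	fmt.Printf("deleting %v from %s\n", b.buckets[group], group)
	delete(b.buckets, group)
}

func main() {
	b := &batcher{deleteInterval: 50 * time.Millisecond, buckets: map[string][]string{}}
	b.addNodes("ng-1", []string{"n1"})
	b.addNodes("ng-1", []string{"n2"}) // lands in the same batch as n1
	time.Sleep(100 * time.Millisecond)
}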
@@ -197,7 +197,7 @@ func TestRemove(t *testing.T) {
		}
	}

-	err = d.executeForBucket(nodeGroup)
+	err = d.remove(nodeGroup)
	if test.err {
		if err == nil {
			t.Errorf("remove() should return error, but return nil")
@@ -63,7 +63,7 @@ func (ds *GroupDeletionScheduler) ScheduleDeletion(nodeInfo *framework.NodeInfo,
	opts, err := nodeGroup.GetOptions(ds.ctx.NodeGroupDefaults)
	if err != nil {
		nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "GetOptions returned error %v", err)}
-		ds.RollbackNodeDeletion(nodeInfo.Node(), nodeGroup.Id(), drain, "failed to get autoscaling options for a node group", nodeDeleteResult)
+		ds.AbortNodeDeletion(nodeInfo.Node(), nodeGroup.Id(), drain, "failed to get autoscaling options for a node group", nodeDeleteResult)
		return
	}
	if opts == nil {
@@ -72,7 +72,7 @@ func (ds *GroupDeletionScheduler) ScheduleDeletion(nodeInfo *framework.NodeInfo,

	nodeDeleteResult := ds.prepareNodeForDeletion(nodeInfo, drain)
	if nodeDeleteResult.Err != nil {
-		ds.RollbackNodeDeletion(nodeInfo.Node(), nodeGroup.Id(), drain, "prepareNodeForDeletion failed", nodeDeleteResult)
+		ds.AbortNodeDeletion(nodeInfo.Node(), nodeGroup.Id(), drain, "prepareNodeForDeletion failed", nodeDeleteResult)
		return
	}

@@ -117,8 +117,8 @@ func (ds *GroupDeletionScheduler) addToBatcher(nodeInfo *framework.NodeInfo, nod
	ds.nodeQueue[nodeGroup.Id()] = []*apiv1.Node{}
}

-// RollbackNodeDeletion frees up a node that couldn't be deleted successfully. If it was a part of a group, the same is applied for other nodes queued for deletion.
-func (ds *GroupDeletionScheduler) RollbackNodeDeletion(node *apiv1.Node, nodeGroupId string, drain bool, errMsg string, result status.NodeDeleteResult) {
+// AbortNodeDeletion frees up a node that couldn't be deleted successfully. If it was a part of a group, the same is applied for other nodes queued for deletion.
+func (ds *GroupDeletionScheduler) AbortNodeDeletion(node *apiv1.Node, nodeGroupId string, drain bool, errMsg string, result status.NodeDeleteResult) {
	ds.Lock()
	defer ds.Unlock()
	ds.failuresForGroup[nodeGroupId] = true
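The renamed AbortNodeDeletion above frees up the failed node and, for grouped deletions, releases the other nodes still queued for the same node group. A rough sketch of that behaviour with assumed, simplified types follows; the real method additionally records the NodeDeleteResult and cleans up taints.

package main

import (
	"fmt"
	"sync"
)

// scheduler is a toy stand-in for GroupDeletionScheduler: it tracks per-group
// failures and the nodes still queued for deletion in each group.
type scheduler struct {
	sync.Mutex
	failuresForGroup map[string]bool
	nodeQueue        map[string][]string
}

// abortNodeDeletion marks the whole group as failed and releases queued nodes.
func (ds *scheduler) abortNodeDeletion(node, groupID, reason string) {
	ds.Lock()
	defer ds.Unlock()
	ds.failuresForGroup[groupID] = true
	fmt.Printf("aborting deletion of %s (group %s): %s\n", node, groupID, reason)
	for _, queued := range ds.nodeQueue[groupID] {
		fmt.Printf("releasing queued node %s from group %s\n", queued, groupID)
	}
	ds.nodeQueue[groupID] = nil
}

func main() {
	ds := &scheduler{
		failuresForGroup: map[string]bool{},
		nodeQueue:        map[string][]string{"atomic-4": {"atomic-4-node-2", "atomic-4-node-3"}},
	}
	ds.abortNodeDeletion("atomic-4-node-1", "atomic-4", "prepareNodeForDeletion failed")
}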
@@ -0,0 +1,202 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package budgets

import (
	"reflect"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
)

// NodeGroupView is a subset of nodes from a given NodeGroup
type NodeGroupView struct {
	Group cloudprovider.NodeGroup
	Nodes []*apiv1.Node
}

// ScaleDownBudgetProcessor is responsible for keeping the number of nodes deleted in parallel within defined limits.
type ScaleDownBudgetProcessor struct {
	ctx             *context.AutoscalingContext
	actuationStatus scaledown.ActuationStatus
}

// NewScaleDownBudgetProcessor creates a ScaleDownBudgetProcessor instance.
func NewScaleDownBudgetProcessor(ctx *context.AutoscalingContext, as scaledown.ActuationStatus) *ScaleDownBudgetProcessor {
	return &ScaleDownBudgetProcessor{
		ctx:             ctx,
		actuationStatus: as,
	}
}

// CropNodes crops the provided node lists to respect scale-down max parallelism budgets.
// The returned nodes are grouped by a node group.
func (bp *ScaleDownBudgetProcessor) CropNodes(empty, drain []*apiv1.Node) (emptyToDelete, drainToDelete []*NodeGroupView) {
	emptyIndividual, emptyAtomic := bp.categorize(bp.group(empty))
	drainIndividual, drainAtomic := bp.categorize(bp.group(drain))
	//emptyIndividual, emptyAtomic := bp.groupByNodeGroup(empty)
	//drainIndividual, drainAtomic := bp.groupByNodeGroup(drain)

	emptyInProgress, drainInProgress := bp.actuationStatus.DeletionsInProgress()
	parallelismBudget := bp.ctx.MaxScaleDownParallelism - len(emptyInProgress) - len(drainInProgress)
	drainBudget := bp.ctx.MaxDrainParallelism - len(drainInProgress)

	emptyToDelete, allowedCount, canOverflow := cropAtomicNodes(emptyAtomic, parallelismBudget, true)
	parallelismBudget -= allowedCount

	drainBudget = min(parallelismBudget, drainBudget)
	drainToDelete, allowedCount, _ = cropAtomicNodes(drainAtomic, drainBudget, canOverflow)
	parallelismBudget -= allowedCount
	drainBudget -= allowedCount

	emptyToDelete, allowedCount = cropIndividualNodes(emptyToDelete, emptyIndividual, parallelismBudget)
	parallelismBudget -= allowedCount
	drainBudget = min(parallelismBudget, drainBudget)

	drainToDelete, _ = cropIndividualNodes(drainToDelete, drainIndividual, drainBudget)

	return emptyToDelete, drainToDelete
}

// cropAtomicNodes returns three values:
// * nodes selected for deletion
// * the number of nodes planned for deletion in this invocation
// * whether a budget overflow is still allowed.
func cropAtomicNodes(groups []*NodeGroupView, budget int, canOverflow bool) ([]*NodeGroupView, int, bool) {
	toDelete := []*NodeGroupView{}
	remainingBudget := budget
	for _, bucket := range groups {
		if remainingBudget < len(bucket.Nodes) {
			// One pod slice can sneak in even if it would exceed parallelism budget.
			// This is to help avoid starvation of pod slices by regular nodes,
			// also larger pod slices will immediately exceed parallelism budget.
			if remainingBudget == 0 || (len(bucket.Nodes) > 0 && !canOverflow) {
				break
			}
		}
		toDelete = append(toDelete, bucket)
		remainingBudget -= len(bucket.Nodes)
		canOverflow = false
	}
	return toDelete, budget - remainingBudget, canOverflow
}

// cropIndividualNodes returns two values:
// * nodes selected for deletion
// * the number of nodes planned for deletion in this invocation
func cropIndividualNodes(toDelete []*NodeGroupView, groups []*NodeGroupView, budget int) ([]*NodeGroupView, int) {
	remainingBudget := budget
	for _, bucket := range groups {
		if remainingBudget < 1 {
			break
		}
		if remainingBudget < len(bucket.Nodes) {
			bucket.Nodes = bucket.Nodes[:remainingBudget]
		}
		toDelete = append(toDelete, bucket)
		remainingBudget -= len(bucket.Nodes)
	}
	return toDelete, budget - remainingBudget
}

func (bp *ScaleDownBudgetProcessor) group(nodes []*apiv1.Node) []*NodeGroupView {
	groupMap := map[cloudprovider.NodeGroup]int{}
	grouped := []*NodeGroupView{}
	for _, node := range nodes {
		nodeGroup, err := bp.ctx.CloudProvider.NodeGroupForNode(node)
		if err != nil || nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
			klog.Errorf("Failed to find node group for %s: %v", node.Name, err)
			continue
		}
		if idx, ok := groupMap[nodeGroup]; ok {
			grouped[idx].Nodes = append(grouped[idx].Nodes, node)
		} else {
			groupMap[nodeGroup] = len(grouped)
			grouped = append(grouped, &NodeGroupView{
				Group: nodeGroup,
				Nodes: []*apiv1.Node{node},
			})
		}
	}
	return grouped
}

func (bp *ScaleDownBudgetProcessor) categorize(groups []*NodeGroupView) (individual, atomic []*NodeGroupView) {
	for _, view := range groups {
		autoscalingOptions, err := view.Group.GetOptions(bp.ctx.NodeGroupDefaults)
		if err != nil {
			klog.Errorf("Failed to get autoscaling options for node group %s: %v", view.Group.Id(), err)
			continue
		}
		if autoscalingOptions != nil && autoscalingOptions.AtomicScaleDown {
			atomic = append(atomic, view)
		} else {
			individual = append(individual, view)
		}
	}
	return individual, atomic
}

func (bp *ScaleDownBudgetProcessor) groupByNodeGroup(nodes []*apiv1.Node) (individual, atomic []*NodeGroupView) {
	individualGroup, atomicGroup := map[cloudprovider.NodeGroup]int{}, map[cloudprovider.NodeGroup]int{}
	individual, atomic = []*NodeGroupView{}, []*NodeGroupView{}
	for _, node := range nodes {
		nodeGroup, err := bp.ctx.CloudProvider.NodeGroupForNode(node)
		if err != nil || nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
			klog.Errorf("Failed to find node group for %s: %v", node.Name, err)
			continue
		}
		autoscalingOptions, err := nodeGroup.GetOptions(bp.ctx.NodeGroupDefaults)
		if err != nil {
			klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
			continue
		}
		if autoscalingOptions != nil && autoscalingOptions.AtomicScaleDown {
			if idx, ok := atomicGroup[nodeGroup]; ok {
				atomic[idx].Nodes = append(atomic[idx].Nodes, node)
			} else {
				atomicGroup[nodeGroup] = len(atomic)
				atomic = append(atomic, &NodeGroupView{
					Group: nodeGroup,
					Nodes: []*apiv1.Node{node},
				})
			}
		} else {
			if idx, ok := individualGroup[nodeGroup]; ok {
				individual[idx].Nodes = append(individual[idx].Nodes, node)
			} else {
				individualGroup[nodeGroup] = len(individual)
				individual = append(individual, &NodeGroupView{
					Group: nodeGroup,
					Nodes: []*apiv1.Node{node},
				})
			}
		}
	}
	return individual, atomic
}

func min(x, y int) int {
	if x <= y {
		return x
	}
	return y
}
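A worked example of the CropNodes budget arithmetic above, under assumed option values (MaxScaleDownParallelism=10, MaxDrainParallelism=5) and some deletions already in progress; the numbers are illustrative only.

package main

import "fmt"

// min mirrors the helper defined in the new budgets package above.
func min(x, y int) int {
	if x <= y {
		return x
	}
	return y
}

func main() {
	maxScaleDownParallelism, maxDrainParallelism := 10, 5
	emptyInProgress, drainInProgress := 2, 1

	parallelismBudget := maxScaleDownParallelism - emptyInProgress - drainInProgress // 7
	drainBudget := maxDrainParallelism - drainInProgress                             // 4

	// Suppose a node group with the AtomicScaleDown option offers 8 empty nodes.
	// That exceeds the remaining budget of 7, but because nothing has been selected
	// yet (canOverflow is still true in cropAtomicNodes), the whole bucket is taken.
	atomicEmpty := 8
	parallelismBudget -= atomicEmpty // drops below zero, so everything that follows is cropped away

	drainBudget = min(parallelismBudget, drainBudget)
	fmt.Println(parallelismBudget, drainBudget) // -1 -1: no individual empty or drain nodes fit this loop
}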
@ -0,0 +1,400 @@
|
|||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package budgets
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
|
||||
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/config"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/context"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
|
||||
)
|
||||
|
||||
func TestCropNodesToBudgets(t *testing.T) {
	testNg := testprovider.NewTestNodeGroup("test-ng", 0, 100, 3, true, false, "n1-standard-2", nil, nil)
	testNg2 := testprovider.NewTestNodeGroup("test-ng2", 0, 100, 3, true, false, "n1-standard-2", nil, nil)
	atomic3 := sizedNodeGroup("atomic-3", 3, true)
	atomic4 := sizedNodeGroup("atomic-4", 4, true)
	atomic8 := sizedNodeGroup("atomic-8", 8, true)
	atomic11 := sizedNodeGroup("atomic-11", 11, true)
	for tn, tc := range map[string]struct {
		emptyDeletionsInProgress int
		drainDeletionsInProgress int
		empty                    []*NodeGroupView
		drain                    []*NodeGroupView
		wantEmpty                []*NodeGroupView
		wantDrain                []*NodeGroupView
	}{
		"no nodes": {
			empty:     []*NodeGroupView{},
			drain:     []*NodeGroupView{},
			wantEmpty: []*NodeGroupView{},
			wantDrain: []*NodeGroupView{},
		},
		// Empty nodes only.
		"empty nodes within max limit, no deletions in progress": {
			empty:     generateNodeGroupViewList(testNg, 0, 10),
			wantEmpty: generateNodeGroupViewList(testNg, 0, 10),
		},
		"empty nodes exceeding max limit, no deletions in progress": {
			empty:     generateNodeGroupViewList(testNg, 0, 11),
			wantEmpty: generateNodeGroupViewList(testNg, 0, 10),
		},
		"empty atomic node group exceeding max limit": {
			empty:     generateNodeGroupViewList(atomic11, 0, 11),
			wantEmpty: generateNodeGroupViewList(atomic11, 0, 11),
		},
		"empty regular and atomic": {
			empty:     append(generateNodeGroupViewList(testNg, 0, 8), generateNodeGroupViewList(atomic3, 0, 3)...),
			wantEmpty: append(generateNodeGroupViewList(atomic3, 0, 3), generateNodeGroupViewList(testNg, 0, 7)...),
		},
		"multiple empty atomic": {
			empty: append(
				append(
					generateNodeGroupViewList(testNg, 0, 3),
					generateNodeGroupViewList(atomic8, 0, 8)...),
				generateNodeGroupViewList(atomic3, 0, 3)...),
			wantEmpty: append(generateNodeGroupViewList(atomic8, 0, 8), generateNodeGroupViewList(testNg, 0, 2)...),
		},
		"empty nodes with deletions in progress, within budget": {
			emptyDeletionsInProgress: 1,
			drainDeletionsInProgress: 1,
			empty:                    generateNodeGroupViewList(testNg, 0, 8),
			wantEmpty:                generateNodeGroupViewList(testNg, 0, 8),
		},
		"empty nodes with deletions in progress, exceeding budget": {
			emptyDeletionsInProgress: 1,
			drainDeletionsInProgress: 1,
			empty:                    generateNodeGroupViewList(testNg, 0, 10),
			wantEmpty:                generateNodeGroupViewList(testNg, 0, 8),
		},
		"empty atomic nodes with deletions in progress, exceeding budget": {
			emptyDeletionsInProgress: 3,
			drainDeletionsInProgress: 3,
			empty:                    generateNodeGroupViewList(atomic8, 0, 8),
			wantEmpty:                generateNodeGroupViewList(atomic8, 0, 8),
		},
		"empty nodes with deletions in progress, 0 budget left": {
			emptyDeletionsInProgress: 5,
			drainDeletionsInProgress: 5,
			empty:                    generateNodeGroupViewList(testNg, 0, 10),
			wantEmpty:                []*NodeGroupView{},
		},
		"empty atomic nodes with deletions in progress, 0 budget left": {
			emptyDeletionsInProgress: 5,
			drainDeletionsInProgress: 5,
			empty:                    generateNodeGroupViewList(atomic3, 0, 3),
			wantEmpty:                []*NodeGroupView{},
		},
		"empty nodes with deletions in progress, budget exceeded": {
			emptyDeletionsInProgress: 50,
			drainDeletionsInProgress: 50,
			empty:                    generateNodeGroupViewList(testNg, 0, 10),
			wantEmpty:                []*NodeGroupView{},
		},
		// Drain nodes only.
		"drain nodes within max limit, no deletions in progress": {
			drain:     generateNodeGroupViewList(testNg, 0, 5),
			wantDrain: generateNodeGroupViewList(testNg, 0, 5),
		},
		"multiple drain node groups": {
			drain:     append(generateNodeGroupViewList(testNg, 0, 5), generateNodeGroupViewList(testNg2, 0, 5)...),
			wantDrain: generateNodeGroupViewList(testNg, 0, 5),
		},
		"drain nodes exceeding max limit, no deletions in progress": {
			drain:     generateNodeGroupViewList(testNg, 0, 6),
			wantDrain: generateNodeGroupViewList(testNg, 0, 5),
		},
		"drain atomic exceeding limit": {
			drain:     generateNodeGroupViewList(atomic8, 0, 8),
			wantDrain: generateNodeGroupViewList(atomic8, 0, 8),
		},
		"drain regular and atomic exceeding limit": {
			drain:     append(generateNodeGroupViewList(testNg, 0, 3), generateNodeGroupViewList(atomic3, 0, 3)...),
			wantDrain: append(generateNodeGroupViewList(atomic3, 0, 3), generateNodeGroupViewList(testNg, 0, 2)...),
		},
		"multiple drain atomic": {
			drain: append(
				append(
					generateNodeGroupViewList(testNg, 0, 3),
					generateNodeGroupViewList(atomic3, 0, 3)...),
				generateNodeGroupViewList(atomic4, 0, 4)...),
			wantDrain: append(generateNodeGroupViewList(atomic3, 0, 3), generateNodeGroupViewList(testNg, 0, 2)...),
		},
		"drain nodes with deletions in progress, within budget": {
			emptyDeletionsInProgress: 1,
			drainDeletionsInProgress: 2,
			drain:                    generateNodeGroupViewList(testNg, 0, 3),
			wantDrain:                generateNodeGroupViewList(testNg, 0, 3),
		},
		"drain nodes with deletions in progress, exceeding drain budget": {
			emptyDeletionsInProgress: 1,
			drainDeletionsInProgress: 2,
			drain:                    generateNodeGroupViewList(testNg, 0, 5),
			wantDrain:                generateNodeGroupViewList(testNg, 0, 3),
		},
		"drain atomic nodes with deletions in progress, exceeding drain budget": {
			emptyDeletionsInProgress: 1,
			drainDeletionsInProgress: 2,
			drain:                    generateNodeGroupViewList(atomic4, 0, 4),
			wantDrain:                generateNodeGroupViewList(atomic4, 0, 4),
		},
		"drain nodes with deletions in progress, 0 drain budget left": {
			emptyDeletionsInProgress: 1,
			drainDeletionsInProgress: 5,
			drain:                    generateNodeGroupViewList(testNg, 0, 5),
			wantDrain:                []*NodeGroupView{},
		},
		"drain atomic nodes with deletions in progress, 0 drain budget left": {
			emptyDeletionsInProgress: 1,
			drainDeletionsInProgress: 5,
			drain:                    generateNodeGroupViewList(atomic4, 0, 4),
			wantDrain:                []*NodeGroupView{},
		},
		"drain nodes with deletions in progress, drain budget exceeded": {
			emptyDeletionsInProgress: 1,
			drainDeletionsInProgress: 50,
			drain:                    generateNodeGroupViewList(testNg, 0, 5),
			wantDrain:                []*NodeGroupView{},
		},
		"drain nodes with deletions in progress, exceeding overall budget": {
			emptyDeletionsInProgress: 7,
			drainDeletionsInProgress: 1,
			drain:                    generateNodeGroupViewList(testNg, 0, 4),
			wantDrain:                generateNodeGroupViewList(testNg, 0, 2),
		},
		"drain nodes with deletions in progress, 0 overall budget left": {
			emptyDeletionsInProgress: 10,
			drain:                    generateNodeGroupViewList(testNg, 0, 4),
			wantDrain:                []*NodeGroupView{},
		},
		"drain nodes with deletions in progress, overall budget exceeded": {
			emptyDeletionsInProgress: 50,
			drain:                    generateNodeGroupViewList(testNg, 0, 4),
			wantDrain:                []*NodeGroupView{},
		},
		// Empty and drain nodes together.
		"empty&drain nodes within max limits, no deletions in progress": {
			empty:     generateNodeGroupViewList(testNg, 0, 5),
			drain:     generateNodeGroupViewList(testNg, 0, 5),
			wantDrain: generateNodeGroupViewList(testNg, 0, 5),
			wantEmpty: generateNodeGroupViewList(testNg, 0, 5),
		},
		"empty&drain atomic nodes within max limits, no deletions in progress": {
			empty:     generateNodeGroupViewList(atomic3, 0, 3),
			drain:     generateNodeGroupViewList(atomic4, 0, 4),
			wantEmpty: generateNodeGroupViewList(atomic3, 0, 3),
			wantDrain: generateNodeGroupViewList(atomic4, 0, 4),
		},
		"empty&drain nodes exceeding overall limit, no deletions in progress": {
			empty:     generateNodeGroupViewList(testNg, 0, 8),
			drain:     generateNodeGroupViewList(testNg, 0, 8),
			wantDrain: generateNodeGroupViewList(testNg, 0, 2),
			wantEmpty: generateNodeGroupViewList(testNg, 0, 8),
		},
		"empty&drain atomic nodes exceeding overall limit, no deletions in progress": {
			empty:     generateNodeGroupViewList(atomic8, 0, 8),
			drain:     generateNodeGroupViewList(atomic4, 0, 4),
			wantEmpty: generateNodeGroupViewList(atomic8, 0, 8),
			wantDrain: []*NodeGroupView{},
		},
		"empty&drain atomic nodes exceeding drain limit, no deletions in progress": {
			empty:     generateNodeGroupViewList(atomic4, 0, 4),
			drain:     generateNodeGroupViewList(atomic8, 0, 8),
			wantEmpty: generateNodeGroupViewList(atomic4, 0, 4),
			wantDrain: []*NodeGroupView{},
		},
		"empty&drain atomic and regular nodes exceeding drain limit, no deletions in progress": {
			empty:     append(generateNodeGroupViewList(testNg, 0, 5), generateNodeGroupViewList(atomic3, 0, 3)...),
			drain:     generateNodeGroupViewList(atomic8, 0, 8),
			wantEmpty: append(generateNodeGroupViewList(atomic3, 0, 3), generateNodeGroupViewList(testNg, 0, 5)...),
			wantDrain: []*NodeGroupView{},
		},
		"empty regular and drain atomic nodes exceeding overall limit, no deletions in progress": {
			drain:     generateNodeGroupViewList(atomic8, 0, 8),
			empty:     generateNodeGroupViewList(testNg, 0, 5),
			wantDrain: generateNodeGroupViewList(atomic8, 0, 8),
			wantEmpty: generateNodeGroupViewList(testNg, 0, 2),
		},
		"empty&drain nodes exceeding drain limit, no deletions in progress": {
			empty:     generateNodeGroupViewList(testNg, 0, 2),
			drain:     generateNodeGroupViewList(testNg, 0, 8),
			wantDrain: generateNodeGroupViewList(testNg, 0, 5),
			wantEmpty: generateNodeGroupViewList(testNg, 0, 2),
		},
		"empty&drain nodes with deletions in progress, 0 overall budget left": {
			emptyDeletionsInProgress: 10,
			empty:                    generateNodeGroupViewList(testNg, 0, 5),
			drain:                    generateNodeGroupViewList(testNg, 0, 5),
			wantEmpty:                []*NodeGroupView{},
			wantDrain:                []*NodeGroupView{},
		},
		"empty&drain nodes with deletions in progress, overall budget exceeded (shouldn't happen, just a sanity check)": {
			emptyDeletionsInProgress: 50,
			empty:                    generateNodeGroupViewList(testNg, 0, 5),
			drain:                    generateNodeGroupViewList(testNg, 0, 5),
			wantEmpty:                []*NodeGroupView{},
			wantDrain:                []*NodeGroupView{},
		},
		"empty&drain nodes with deletions in progress, 0 drain budget left": {
			drainDeletionsInProgress: 5,
			empty:                    generateNodeGroupViewList(testNg, 0, 5),
			drain:                    generateNodeGroupViewList(testNg, 0, 5),
			wantEmpty:                generateNodeGroupViewList(testNg, 0, 5),
			wantDrain:                []*NodeGroupView{},
		},
		"empty&drain nodes with deletions in progress, drain budget exceeded (shouldn't happen, just a sanity check)": {
			drainDeletionsInProgress: 9,
			empty:                    generateNodeGroupViewList(testNg, 0, 5),
			drain:                    generateNodeGroupViewList(testNg, 0, 5),
			wantEmpty:                generateNodeGroupViewList(testNg, 0, 1),
			wantDrain:                []*NodeGroupView{},
		},
		"empty&drain nodes with deletions in progress, overall budget exceeded, only empty nodes fit": {
			emptyDeletionsInProgress: 5,
			drainDeletionsInProgress: 3,
			empty:                    generateNodeGroupViewList(testNg, 0, 5),
			drain:                    generateNodeGroupViewList(testNg, 0, 2),
			wantEmpty:                generateNodeGroupViewList(testNg, 0, 2),
			wantDrain:                []*NodeGroupView{},
		},
		"empty&drain nodes with deletions in progress, overall budget exceeded, both empty&drain nodes fit": {
			emptyDeletionsInProgress: 5,
			drainDeletionsInProgress: 3,
			empty:                    generateNodeGroupViewList(testNg, 0, 1),
			drain:                    generateNodeGroupViewList(testNg, 0, 2),
			wantEmpty:                generateNodeGroupViewList(testNg, 0, 1),
			wantDrain:                generateNodeGroupViewList(testNg, 0, 1),
		},
		"empty&drain nodes with deletions in progress, drain budget exceeded": {
			emptyDeletionsInProgress: 1,
			drainDeletionsInProgress: 3,
			empty:                    generateNodeGroupViewList(testNg, 0, 4),
			drain:                    generateNodeGroupViewList(testNg, 0, 5),
			wantEmpty:                generateNodeGroupViewList(testNg, 0, 4),
			wantDrain:                generateNodeGroupViewList(testNg, 0, 2),
		},
	} {
		t.Run(tn, func(t *testing.T) {
			provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
				return nil
			})
			for _, bucket := range append(tc.empty, tc.drain...) {
				bucket.Group.(*testprovider.TestNodeGroup).SetCloudProvider(provider)
				provider.InsertNodeGroup(bucket.Group)
				for _, node := range bucket.Nodes {
					provider.AddNode(bucket.Group.Id(), node)
				}
			}

			ctx := &context.AutoscalingContext{
				AutoscalingOptions: config.AutoscalingOptions{
					MaxScaleDownParallelism:     10,
					MaxDrainParallelism:         5,
					NodeDeletionBatcherInterval: 0 * time.Second,
					NodeDeleteDelayAfterTaint:   1 * time.Second,
				},
				CloudProvider: provider,
			}
			ndt := deletiontracker.NewNodeDeletionTracker(1 * time.Hour)
			for i := 0; i < tc.emptyDeletionsInProgress; i++ {
				ndt.StartDeletion("ng1", fmt.Sprintf("empty-node-%d", i))
			}
			for i := 0; i < tc.drainDeletionsInProgress; i++ {
				ndt.StartDeletionWithDrain("ng2", fmt.Sprintf("drain-node-%d", i))
			}
			emptyList, drainList := []*apiv1.Node{}, []*apiv1.Node{}
			for _, bucket := range tc.empty {
				emptyList = append(emptyList, bucket.Nodes...)
			}
			for _, bucket := range tc.drain {
				drainList = append(drainList, bucket.Nodes...)
			}

			budgeter := NewScaleDownBudgetProcessor(ctx, ndt)
			gotEmpty, gotDrain := budgeter.CropNodes(emptyList, drainList)
			if diff := cmp.Diff(tc.wantEmpty, gotEmpty, cmpopts.EquateEmpty(), transformNodeGroupView); diff != "" {
				t.Errorf("cropNodesToBudgets empty nodes diff (-want +got):\n%s", diff)
			}
			if diff := cmp.Diff(tc.wantDrain, gotDrain, cmpopts.EquateEmpty(), transformNodeGroupView); diff != "" {
				t.Errorf("cropNodesToBudgets drain nodes diff (-want +got):\n%s", diff)
			}
		})
	}
}

// transformNodeGroupView transforms a NodeGroupView into a structure that can be directly compared with other node group views.
var transformNodeGroupView = cmp.Transformer("transformNodeGroupView", func(b NodeGroupView) interface{} {
	return struct {
		Group string
		Nodes []*apiv1.Node
	}{
		Group: b.Group.Id(),
		Nodes: b.Nodes,
	}
})

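// sizedNodeGroup creates a test node group with the given target size, optionally configured for atomic scale-down.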
func sizedNodeGroup(id string, size int, atomic bool) cloudprovider.NodeGroup {
	ng := testprovider.NewTestNodeGroup(id, 10000, 0, size, true, false, "n1-standard-2", nil, nil)
	ng.SetOptions(&config.NodeGroupAutoscalingOptions{
		AtomicScaleDown: atomic,
	})
	return ng
}

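// generateNodes creates nodes named "<prefix>-node-<i>" for i in [from, to).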
func generateNodes(from, to int, prefix string) []*apiv1.Node {
	var result []*apiv1.Node
	for i := from; i < to; i++ {
		name := fmt.Sprintf("node-%d", i)
		if prefix != "" {
			name = prefix + "-" + name
		}
		result = append(result, generateNode(name))
	}
	return result
}

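// generateNodeGroupViewList wraps nodes generated for the given node group in a single NodeGroupView.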
func generateNodeGroupViewList(ng cloudprovider.NodeGroup, from, to int) []*NodeGroupView {
	return []*NodeGroupView{
		{
			Group: ng,
			Nodes: generateNodes(from, to, ng.Id()),
		},
	}
}

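// generateNode returns a node with a fixed allocatable capacity of 8 CPUs and 8G of memory.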
func generateNode(name string) *apiv1.Node {
	return &apiv1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Status: apiv1.NodeStatus{
			Allocatable: apiv1.ResourceList{
				apiv1.ResourceCPU:    resource.MustParse("8"),
				apiv1.ResourceMemory: resource.MustParse("8G"),
			},
		},
	}
}