Merge pull request #7307 from abdelrahman882/total_min_size

Modify scale down set processor to add reasons to unremovable nodes
Kubernetes Prow Robot 2024-10-29 10:58:57 +00:00 committed by GitHub
commit a6a77b3ec3
16 changed files with 465 additions and 148 deletions
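
At its core, the change swaps the ScaleDownSetProcessor method GetNodesToRemove for FilterUnremovableNodes: instead of trimming the candidate list to maxCount, every processor must now account for each candidate and return the rejected ones annotated with a reason. A before/after sketch of the signatures, as they appear in the diff below:

// Before: trim the candidate list down to at most maxCount nodes.
GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved

// After: split candidates into removable and unremovable nodes, each
// unremovable node carrying a simulator.UnremovableReason.
FilterUnremovableNodes(ctx *context.AutoscalingContext, scaleDownCtx *ScaleDownContext, candidates []simulator.NodeToBeRemoved) ([]simulator.NodeToBeRemoved, []simulator.UnremovableNode)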


@ -18,7 +18,6 @@ package planner
import (
"fmt"
"math"
"time"
apiv1 "k8s.io/api/core/v1"
@ -73,10 +72,10 @@ type Planner struct {
minUpdateInterval time.Duration
eligibilityChecker eligibilityChecker
nodeUtilizationMap map[string]utilization.Info
actuationStatus scaledown.ActuationStatus
resourceLimitsFinder *resource.LimitsFinder
cc controllerReplicasCalculator
scaleDownSetProcessor nodes.ScaleDownSetProcessor
scaleDownContext *nodes.ScaleDownContext
}
// New creates a new Planner object.
@ -97,6 +96,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
resourceLimitsFinder: resourceLimitsFinder,
cc: newControllerReplicasCalculator(context.ListerRegistry),
scaleDownSetProcessor: processors.ScaleDownSetProcessor,
scaleDownContext: nodes.NewDefaultScaleDownContext(),
minUpdateInterval: minUpdateInterval,
}
}
@ -110,7 +110,7 @@ func (p *Planner) UpdateClusterState(podDestinations, scaleDownCandidates []*api
p.minUpdateInterval = updateInterval
}
p.latestUpdate = currentTime
p.actuationStatus = as
p.scaleDownContext.ActuationStatus = as
// Avoid persisting changes done by the simulation.
p.context.ClusterSnapshot.Fork()
defer p.context.ClusterSnapshot.Revert()
@ -147,22 +147,17 @@ func (p *Planner) NodesToDelete(_ time.Time) (empty, needDrain []*apiv1.Node) {
klog.Errorf("Nothing will scale down, failed to create resource limiter: %v", err)
return nil, nil
}
limitsLeft := p.resourceLimitsFinder.LimitsLeft(p.context, nodes, resourceLimiter, p.latestUpdate)
emptyRemovable, needDrainRemovable, unremovable := p.unneededNodes.RemovableAt(p.context, p.latestUpdate, limitsLeft, resourceLimiter.GetResources(), p.actuationStatus)
for _, u := range unremovable {
p.unremovableNodes.Add(u)
}
needDrainRemovable = sortByRisk(needDrainRemovable)
nodesToRemove := p.scaleDownSetProcessor.GetNodesToRemove(
p.context,
// We need to pass empty nodes first, as there might be some non-empty scale
// downs already in progress. If we pass the empty nodes first, they will be first
// to get deleted, thus we decrease chances of hitting the limit on non-empty scale down.
append(emptyRemovable, needDrainRemovable...),
// No need to limit the number of nodes, since it will happen later, in the actuation stage.
// It will make a more appropriate decision by using additional information about deletions
// in progress.
math.MaxInt)
p.scaleDownContext.ResourcesLeft = p.resourceLimitsFinder.LimitsLeft(p.context, nodes, resourceLimiter, p.latestUpdate).DeepCopy()
p.scaleDownContext.ResourcesWithLimits = resourceLimiter.GetResources()
emptyRemovableNodes, needDrainRemovableNodes, unremovableNodes := p.unneededNodes.RemovableAt(p.context, *p.scaleDownContext, p.latestUpdate)
p.addUnremovableNodes(unremovableNodes)
needDrainRemovableNodes = sortByRisk(needDrainRemovableNodes)
candidatesToBeRemoved := append(emptyRemovableNodes, needDrainRemovableNodes...)
nodesToRemove, unremovableNodes := p.scaleDownSetProcessor.FilterUnremovableNodes(p.context, p.scaleDownContext, candidatesToBeRemoved)
p.addUnremovableNodes(unremovableNodes)
for _, nodeToRemove := range nodesToRemove {
if len(nodeToRemove.PodsToReschedule) > 0 {
needDrain = append(needDrain, nodeToRemove.Node)
@ -174,6 +169,12 @@ func (p *Planner) NodesToDelete(_ time.Time) (empty, needDrain []*apiv1.Node) {
return empty, needDrain
}
func (p *Planner) addUnremovableNodes(unremovableNodes []simulator.UnremovableNode) {
for _, u := range unremovableNodes {
p.unremovableNodes.Add(&u)
}
}
func allNodes(s clustersnapshot.ClusterSnapshot) ([]*apiv1.Node, error) {
nodeInfos, err := s.NodeInfos().List()
if err != nil {
@ -212,7 +213,7 @@ func (p *Planner) NodeUtilizationMap() map[string]utilization.Info {
// For pods that are controlled by a controller known to CA, it will check whether
// they have been recreated and will inject only the pods that have not yet been recreated.
func (p *Planner) injectRecentlyEvictedPods() error {
recentlyEvictedRecreatablePods := pod_util.FilterRecreatablePods(p.actuationStatus.RecentEvictions())
recentlyEvictedRecreatablePods := pod_util.FilterRecreatablePods(p.scaleDownContext.ActuationStatus.RecentEvictions())
return p.injectPods(filterOutRecreatedPods(recentlyEvictedRecreatablePods, p.cc))
}


@ -35,6 +35,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
@ -501,7 +502,7 @@ func TestUpdateClusterState(t *testing.T) {
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, NewTestProcessors(&context), deleteOptions, nil)
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
if tc.isSimulationTimeout {
context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
@ -697,7 +698,7 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) {
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, NewTestProcessors(&context), deleteOptions, nil)
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))}
p.minUpdateInterval = tc.updateInterval
p.unneededNodes.Update(previouslyUnneeded, time.Now())
@ -865,9 +866,9 @@ func TestNodesToDelete(t *testing.T) {
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, nil)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, NewTestProcessors(&context), deleteOptions, nil)
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
p.latestUpdate = time.Now()
p.actuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
p.scaleDownContext.ActuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
p.unneededNodes.Update(allRemovables, time.Now().Add(-1*time.Hour))
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(allNodes))}
empty, drain := p.NodesToDelete(time.Now())


@ -26,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@ -117,23 +118,22 @@ func (n *Nodes) Drop(node string) {
// RemovableAt returns all nodes that can be removed at a given time, divided
// into empty and non-empty node lists, as well as a list of nodes that were
// unneeded but are not removable, each annotated with a reason.
func (n *Nodes) RemovableAt(context *context.AutoscalingContext, ts time.Time, resourcesLeft resource.Limits, resourcesWithLimits []string, as scaledown.ActuationStatus) (empty, needDrain []simulator.NodeToBeRemoved, unremovable []*simulator.UnremovableNode) {
func (n *Nodes) RemovableAt(context *context.AutoscalingContext, scaleDownContext nodes.ScaleDownContext, ts time.Time) (empty, needDrain []simulator.NodeToBeRemoved, unremovable []simulator.UnremovableNode) {
nodeGroupSize := utils.GetNodeGroupSizeMap(context.CloudProvider)
resourcesLeftCopy := resourcesLeft.DeepCopy()
emptyNodes, drainNodes := n.splitEmptyAndNonEmptyNodes()
for nodeName, v := range emptyNodes {
klog.V(2).Infof("%s was unneeded for %s", nodeName, ts.Sub(v.since).String())
if r := n.unremovableReason(context, v, ts, nodeGroupSize, resourcesLeftCopy, resourcesWithLimits, as); r != simulator.NoReason {
unremovable = append(unremovable, &simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r})
if r := n.unremovableReason(context, scaleDownContext, v, ts, nodeGroupSize); r != simulator.NoReason {
unremovable = append(unremovable, simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r})
continue
}
empty = append(empty, v.ntbr)
}
for nodeName, v := range drainNodes {
klog.V(2).Infof("%s was unneeded for %s", nodeName, ts.Sub(v.since).String())
if r := n.unremovableReason(context, v, ts, nodeGroupSize, resourcesLeftCopy, resourcesWithLimits, as); r != simulator.NoReason {
unremovable = append(unremovable, &simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r})
if r := n.unremovableReason(context, scaleDownContext, v, ts, nodeGroupSize); r != simulator.NoReason {
unremovable = append(unremovable, simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r})
continue
}
needDrain = append(needDrain, v.ntbr)
@ -141,7 +141,7 @@ func (n *Nodes) RemovableAt(context *context.AutoscalingContext, ts time.Time, r
return
}
func (n *Nodes) unremovableReason(context *context.AutoscalingContext, v *node, ts time.Time, nodeGroupSize map[string]int, resourcesLeft resource.Limits, resourcesWithLimits []string, as scaledown.ActuationStatus) simulator.UnremovableReason {
func (n *Nodes) unremovableReason(context *context.AutoscalingContext, scaleDownContext nodes.ScaleDownContext, v *node, ts time.Time, nodeGroupSize map[string]int) simulator.UnremovableReason {
node := v.ntbr.Node
// Check if node is marked with no scale down annotation.
if eligibility.HasNoScaleDownAnnotation(node) {
@ -182,17 +182,17 @@ func (n *Nodes) unremovableReason(context *context.AutoscalingContext, v *node,
}
}
if reason := verifyMinSize(node.Name, nodeGroup, nodeGroupSize, as); reason != simulator.NoReason {
if reason := verifyMinSize(node.Name, nodeGroup, nodeGroupSize, scaleDownContext.ActuationStatus); reason != simulator.NoReason {
return reason
}
resourceDelta, err := n.limitsFinder.DeltaForNode(context, node, nodeGroup, resourcesWithLimits)
resourceDelta, err := n.limitsFinder.DeltaForNode(context, node, nodeGroup, scaleDownContext.ResourcesWithLimits)
if err != nil {
klog.Errorf("Error getting node resources: %v", err)
return simulator.UnexpectedError
}
checkResult := resourcesLeft.TryDecrementBy(resourceDelta)
checkResult := scaleDownContext.ResourcesLeft.TryDecrementBy(resourceDelta)
if checkResult.Exceeded() {
klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.ExceededResources)
for _, resource := range checkResult.ExceededResources {


@ -28,6 +28,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@ -188,10 +189,10 @@ func TestRemovableAt(t *testing.T) {
})
}
nodes := append(empty, drain...)
removableNodes := append(empty, drain...)
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.InsertNodeGroup(ng)
for _, node := range nodes {
for _, node := range removableNodes {
provider.AddNode("ng", node.Node)
}
@ -204,8 +205,12 @@ func TestRemovableAt(t *testing.T) {
assert.NoError(t, err)
n := NewNodes(&fakeScaleDownTimeGetter{}, &resource.LimitsFinder{})
n.Update(nodes, time.Now())
gotEmptyToRemove, gotDrainToRemove, _ := n.RemovableAt(&ctx, time.Now(), resource.Limits{}, []string{}, as)
n.Update(removableNodes, time.Now())
gotEmptyToRemove, gotDrainToRemove, _ := n.RemovableAt(&ctx, nodes.ScaleDownContext{
ActuationStatus: as,
ResourcesLeft: resource.Limits{},
ResourcesWithLimits: []string{},
}, time.Now())
if len(gotDrainToRemove) != tc.numDrainToRemove || len(gotEmptyToRemove) != tc.numEmptyToRemove {
t.Errorf("%s: getNodesToRemove() return %d, %d, want %d, %d", tc.name, len(gotEmptyToRemove), len(gotDrainToRemove), tc.numEmptyToRemove, tc.numDrainToRemove)
}


@ -46,6 +46,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
@ -1055,7 +1056,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
assert.NoError(t, err)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
processors.ScaleStateNotifier.Register(clusterState)
if config.EnableAutoprovisioning {
processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
@ -1163,7 +1164,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 550, 0)
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
@ -1209,7 +1210,7 @@ func TestBinpackingLimiter(t *testing.T) {
extraPod := BuildTestPod("p-new", 500, 0)
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
// We should stop binpacking after finding expansion option from first node group.
processors.BinpackingLimiter = &MockBinpackingLimiter{}
@ -1263,7 +1264,7 @@ func TestScaleUpNoHelp(t *testing.T) {
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 500, 0)
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
@ -1485,7 +1486,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
pods = append(pods, BuildTestPod(fmt.Sprintf("test-pod-%v", i), 80, 0))
}
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
@ -1541,7 +1542,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 0}
@ -1596,7 +1597,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 2}
@ -1654,7 +1655,7 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
@ -1746,7 +1747,7 @@ func TestScaleupAsyncNodeGroupsEnabled(t *testing.T) {
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 0}
processors.AsyncNodeGroupStateChecker = &asyncnodegroups.MockAsyncNodeGroupStateChecker{IsUpcomingNodeGroup: tc.isUpcomingMockMap}


@ -30,6 +30,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
utils_test "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@ -68,7 +69,7 @@ func TestDeltaForNode(t *testing.T) {
for _, testCase := range testCases {
cp := testprovider.NewTestCloudProvider(nil, nil)
ctx := newContext(t, cp)
processors := test.NewTestProcessors(&ctx)
processors := processorstest.NewTestProcessors(&ctx)
ng := testCase.nodeGroupConfig
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
@ -109,7 +110,7 @@ func TestResourcesLeft(t *testing.T) {
for _, testCase := range testCases {
cp := newCloudProvider(t, 1000, 1000)
ctx := newContext(t, cp)
processors := test.NewTestProcessors(&ctx)
processors := processorstest.NewTestProcessors(&ctx)
ng := testCase.nodeGroupConfig
_, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
@ -160,7 +161,7 @@ func TestApplyLimits(t *testing.T) {
for _, testCase := range testCases {
cp := testprovider.NewTestCloudProvider(nil, nil)
ctx := newContext(t, cp)
processors := test.NewTestProcessors(&ctx)
processors := processorstest.NewTestProcessors(&ctx)
ng := testCase.nodeGroupConfig
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
@ -218,7 +219,7 @@ func TestResourceManagerWithGpuResource(t *testing.T) {
provider.SetResourceLimiter(resourceLimiter)
context := newContext(t, provider)
processors := test.NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
n1 := newNode(t, "n1", 8, 16)
utils_test.AddGpusToNode(n1, 4)


@ -49,6 +49,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
@ -164,7 +165,7 @@ func (m *onNodeGroupDeleteMock) Delete(id string) error {
func setUpScaleDownActuator(ctx *context.AutoscalingContext, autoscalingOptions config.AutoscalingOptions) {
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), NewTestProcessors(ctx).NodeGroupConfigProcessor)
ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), processorstest.NewTestProcessors(ctx).NodeGroupConfigProcessor)
}
type nodeGroup struct {
@ -284,7 +285,7 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) {
ngConfigProcesssor := nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.autoscalingOptions.NodeGroupDefaults)
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, config.clusterStateConfig, context.LogRecorder, NewBackoff(), ngConfigProcesssor, processors.AsyncNodeGroupStateChecker)
@ -383,7 +384,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: 1,
}
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
suOrchestrator := orchestrator.New()
@ -651,7 +652,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) {
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: 1,
}
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
sddProcessor := scaledowncandidates.NewScaleDownCandidatesDelayProcessor()
processors.ScaleStateNotifier.Register(sddProcessor)
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
@ -797,7 +798,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
setUpScaleDownActuator(&context, options)
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
processors.NodeGroupManager = nodeGroupManager
processors.NodeGroupListProcessor = nodeGroupListProcessor
@ -950,7 +951,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: 1,
}
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
// broken node detected as unregistered
@ -1109,7 +1110,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
OkTotalUnreadyCount: 1,
}
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
suOrchestrator := orchestrator.New()
@ -1241,7 +1242,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
OkTotalUnreadyCount: 1,
}
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
@ -1340,7 +1341,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
OkTotalUnreadyCount: 1,
}
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
@ -1443,7 +1444,7 @@ func TestStaticAutoscalerRunOnceWithUnselectedNodeGroups(t *testing.T) {
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: 1,
}
processors := NewTestProcessors(&context)
processors := processorstest.NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingOptions.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
@ -2088,14 +2089,14 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
ctx, err := NewScaleTestAutoscalingContext(autoscalingOptions, &fake.Clientset{}, listerRegistry, provider, processorCallbacks, nil)
assert.NoError(t, err)
processors := NewTestProcessors(&ctx)
processors := processorstest.NewTestProcessors(&ctx)
// Create CSR with unhealthy cluster protection effectively disabled, to guarantee we reach the tested logic.
csrConfig := clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: nodeGroupCount * unreadyNodesCount}
csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker)
// Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, NewTestProcessors(&ctx).NodeGroupConfigProcessor)
actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&ctx).NodeGroupConfigProcessor)
ctx.ScaleDownActuator = actuator
// Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.


@ -27,30 +27,17 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/podlistprocessor"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster"
"k8s.io/autoscaler/cluster-autoscaler/processors/binpacking"
processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@ -175,32 +162,6 @@ func ExtractPodNames(pods []*apiv1.Pod) []string {
return podNames
}
// NewTestProcessors returns a set of simple processors for use in tests.
func NewTestProcessors(context *context.AutoscalingContext) *processors.AutoscalingProcessors {
return &processors.AutoscalingProcessors{
PodListProcessor: podlistprocessor.NewDefaultPodListProcessor(context.PredicateChecker, scheduling.ScheduleAnywhere),
NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{},
BinpackingLimiter: binpacking.NewTimeLimiter(context.MaxNodeGroupBinpackingDuration),
NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}, config.NodeGroupDifferenceRatios{}),
ScaleDownSetProcessor: nodes.NewCompositeScaleDownSetProcessor([]nodes.ScaleDownSetProcessor{
nodes.NewMaxNodesProcessor(),
nodes.NewAtomicResizeFilteringProcessor(),
}),
// TODO(bskiba): change scale up test so that this can be a NoOpProcessor
ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{},
ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{},
AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(context.NodeGroupDefaults),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(),
AsyncNodeGroupStateChecker: asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(),
}
}
// NewScaleTestAutoscalingContext creates a new test autoscaling context for scaling tests.
func NewScaleTestAutoscalingContext(
options config.AutoscalingOptions, fakeClient kube_client.Interface,


@ -0,0 +1,34 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodes
import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
)
// ScaleDownContext keeps an up-to-date ActuationStatus, ResourcesLeft, and ResourcesWithLimits for the scale-down process
type ScaleDownContext struct {
ActuationStatus scaledown.ActuationStatus
ResourcesLeft resource.Limits
ResourcesWithLimits []string
}
// NewDefaultScaleDownContext returns an empty ScaleDownContext to be populated by the planner
func NewDefaultScaleDownContext() *ScaleDownContext {
return &ScaleDownContext{}
}
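
For orientation, a short sketch of how the planner above populates this context on each iteration; the variables as, allNodes, resourceLimiter, and now stand in for the planner's own state and are assumptions here:

sdCtx := nodes.NewDefaultScaleDownContext()
// Set once per UpdateClusterState call.
sdCtx.ActuationStatus = as
// Refreshed right before RemovableAt; DeepCopy matters because
// unremovableReason mutates the limits via TryDecrementBy.
sdCtx.ResourcesLeft = limitsFinder.LimitsLeft(ctx, allNodes, resourceLimiter, now).DeepCopy()
sdCtx.ResourcesWithLimits = resourceLimiter.GetResources()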


@ -37,12 +37,24 @@ func NewCompositeScaleDownSetProcessor(orderedProcessorList []ScaleDownSetProces
}
}
// GetNodesToRemove selects nodes to remove.
func (p *CompositeScaleDownSetProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved {
for _, p := range p.orderedProcessorList {
candidates = p.GetNodesToRemove(ctx, candidates, maxCount)
// FilterUnremovableNodes splits the passed candidates into removable and unremovable nodes by calling each processor in orderedProcessorList, in order
func (p *CompositeScaleDownSetProcessor) FilterUnremovableNodes(ctx *context.AutoscalingContext, scaleDownCtx *ScaleDownContext, candidates []simulator.NodeToBeRemoved) ([]simulator.NodeToBeRemoved, []simulator.UnremovableNode) {
unremovableNodes := []simulator.UnremovableNode{}
nodesToBeRemoved := []simulator.NodeToBeRemoved{}
nodesToBeRemoved = append(nodesToBeRemoved, candidates...)
for indx, p := range p.orderedProcessorList {
processorRemovableNodes, processorUnremovableNodes := p.FilterUnremovableNodes(ctx, scaleDownCtx, nodesToBeRemoved)
if len(processorRemovableNodes)+len(processorUnremovableNodes) != len(candidates) {
klog.Errorf("Scale down set composite processor failed with processor at index %d: removable nodes (%d) + unremovable nodes (%d) != candidates nodes (%d)",
indx, len(processorRemovableNodes), len(processorUnremovableNodes), len(candidates))
}
nodesToBeRemoved = processorRemovableNodes
unremovableNodes = append(unremovableNodes, processorUnremovableNodes...)
}
return candidates
return nodesToBeRemoved, unremovableNodes
}
// CleanUp is called at CA termination
@ -52,28 +64,6 @@ func (p *CompositeScaleDownSetProcessor) CleanUp() {
}
}
// MaxNodesProcessor selects first maxCount nodes (if possible) to be removed
type MaxNodesProcessor struct {
}
// GetNodesToRemove selects up to maxCount nodes for deletion, by selecting a first maxCount candidates
func (p *MaxNodesProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved {
end := len(candidates)
if len(candidates) > maxCount {
end = maxCount
}
return candidates[:end]
}
// CleanUp is called at CA termination
func (p *MaxNodesProcessor) CleanUp() {
}
// NewMaxNodesProcessor returns a new MaxNodesProcessor
func NewMaxNodesProcessor() *MaxNodesProcessor {
return &MaxNodesProcessor{}
}
// AtomicResizeFilteringProcessor removes node groups which should be scaled down as one unit
// if only part of these nodes were scheduled for scale down.
// NOTE! When chaining with other processors, AtomicResizeFilteringProcessors should be always used last.
@ -82,21 +72,25 @@ func NewMaxNodesProcessor() *MaxNodesProcessor {
type AtomicResizeFilteringProcessor struct {
}
// GetNodesToRemove selects up to maxCount nodes for deletion, by selecting a first maxCount candidates
func (p *AtomicResizeFilteringProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved {
// FilterUnremovableNodes marks all of a node group's candidates as unremovable when ZeroOrMaxNodeScaling is enabled and the number of candidates does not match the group's target size
func (p *AtomicResizeFilteringProcessor) FilterUnremovableNodes(ctx *context.AutoscalingContext, scaleDownCtx *ScaleDownContext, candidates []simulator.NodeToBeRemoved) ([]simulator.NodeToBeRemoved, []simulator.UnremovableNode) {
nodesToBeRemoved := []simulator.NodeToBeRemoved{}
unremovableNodes := []simulator.UnremovableNode{}
atomicQuota := klogx.NodesLoggingQuota()
standardQuota := klogx.NodesLoggingQuota()
nodesByGroup := map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved{}
result := []simulator.NodeToBeRemoved{}
for _, node := range candidates {
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node.Node)
if err != nil {
klog.Errorf("Node %v will not scale down, failed to get node info: %s", node.Node.Name, err)
unremovableNodes = append(unremovableNodes, simulator.UnremovableNode{Node: node.Node, Reason: simulator.UnexpectedError})
continue
}
autoscalingOptions, err := nodeGroup.GetOptions(ctx.NodeGroupDefaults)
if err != nil && err != cloudprovider.ErrNotImplemented {
klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
unremovableNodes = append(unremovableNodes, simulator.UnremovableNode{Node: node.Node, Reason: simulator.UnexpectedError})
continue
}
if autoscalingOptions != nil && autoscalingOptions.ZeroOrMaxNodeScaling {
@ -104,7 +98,7 @@ func (p *AtomicResizeFilteringProcessor) GetNodesToRemove(ctx *context.Autoscali
nodesByGroup[nodeGroup] = append(nodesByGroup[nodeGroup], node)
} else {
klogx.V(2).UpTo(standardQuota).Infof("Considering node %s for standard scale down", node.Node.Name)
result = append(result, node)
nodesToBeRemoved = append(nodesToBeRemoved, node)
}
}
klogx.V(2).Over(atomicQuota).Infof("Considering %d other nodes for atomic scale down", -atomicQuota.Left())
@ -113,16 +107,22 @@ func (p *AtomicResizeFilteringProcessor) GetNodesToRemove(ctx *context.Autoscali
ngSize, err := nodeGroup.TargetSize()
if err != nil {
klog.Errorf("Nodes from group %s will not scale down, failed to get target size: %s", nodeGroup.Id(), err)
for _, node := range nodes {
unremovableNodes = append(unremovableNodes, simulator.UnremovableNode{Node: node.Node, Reason: simulator.UnexpectedError})
}
continue
}
if ngSize == len(nodes) {
klog.V(2).Infof("Scheduling atomic scale down for all %v nodes from node group %s", len(nodes), nodeGroup.Id())
result = append(result, nodes...)
nodesToBeRemoved = append(nodesToBeRemoved, nodes...)
} else {
klog.V(2).Infof("Skipping scale down for %v nodes from node group %s, all %v nodes have to be scaled down atomically", len(nodes), nodeGroup.Id(), ngSize)
for _, node := range nodes {
unremovableNodes = append(unremovableNodes, simulator.UnremovableNode{Node: node.Node, Reason: simulator.AtomicScaleDownFailed})
}
}
}
return result
return nodesToBeRemoved, unremovableNodes
}
// CleanUp is called at CA termination
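
Chaining still works through the composite processor. A hedged sketch of the wiring, where myCustomFilter is a hypothetical ScaleDownSetProcessor; the atomic processor goes last, per its doc comment above:

processor := nodes.NewCompositeScaleDownSetProcessor([]nodes.ScaleDownSetProcessor{
	&myCustomFilter{}, // hypothetical; must place every candidate in exactly one output list
	nodes.NewAtomicResizeFilteringProcessor(),
})
nodesToRemove, unremovableNodes := processor.FilterUnremovableNodes(autoscalingCtx, scaleDownCtx, candidates)
// The composite logs an error if a processor's two result lists
// don't add back up to the candidate count.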


@ -0,0 +1,251 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodes
import (
"testing"
"github.com/stretchr/testify/assert"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/config"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/client-go/kubernetes/fake"
schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics"
)
func TestAtomicResizeFilterUnremovableNodes(t *testing.T) {
schedulermetrics.Register()
testCases := []struct {
name string
nodeGroups []struct {
nodeGroupName string
nodeGroupTargetSize int
zeroOrMaxNodeScaling bool
}
removableCandidates []struct {
candidate simulator.NodeToBeRemoved
nodeGroup string
}
scaleDownContext *ScaleDownContext
expectedToBeRemoved []simulator.NodeToBeRemoved
expectedUnremovable []simulator.UnremovableNode
}{
{
name: "Atomic removal",
nodeGroups: []struct {
nodeGroupName string
nodeGroupTargetSize int
zeroOrMaxNodeScaling bool
}{
{
nodeGroupName: "ng1",
nodeGroupTargetSize: 3,
zeroOrMaxNodeScaling: true,
},
{
nodeGroupName: "ng2",
nodeGroupTargetSize: 4,
zeroOrMaxNodeScaling: true,
},
},
removableCandidates: []struct {
candidate simulator.NodeToBeRemoved
nodeGroup string
}{
{
candidate: buildRemovableNode("ng1-node-1"),
nodeGroup: "ng1",
},
{
candidate: buildRemovableNode("ng1-node-2"),
nodeGroup: "ng1",
},
{
candidate: buildRemovableNode("ng1-node-3"),
nodeGroup: "ng1",
},
{
candidate: buildRemovableNode("ng2-node-1"),
nodeGroup: "ng2",
},
{
candidate: buildRemovableNode("ng2-node-2"),
nodeGroup: "ng2",
},
},
scaleDownContext: NewDefaultScaleDownContext(),
expectedToBeRemoved: []simulator.NodeToBeRemoved{
buildRemovableNode("ng1-node-1"),
buildRemovableNode("ng1-node-2"),
buildRemovableNode("ng1-node-3"),
},
expectedUnremovable: []simulator.UnremovableNode{
buildUnremovableNode("ng2-node-1", simulator.AtomicScaleDownFailed),
buildUnremovableNode("ng2-node-2", simulator.AtomicScaleDownFailed),
},
},
{
name: "Mixed Groups",
nodeGroups: []struct {
nodeGroupName string
nodeGroupTargetSize int
zeroOrMaxNodeScaling bool
}{
{
nodeGroupName: "ng1",
nodeGroupTargetSize: 3,
zeroOrMaxNodeScaling: false,
},
{
nodeGroupName: "ng2",
nodeGroupTargetSize: 4,
zeroOrMaxNodeScaling: true,
},
},
removableCandidates: []struct {
candidate simulator.NodeToBeRemoved
nodeGroup string
}{
{
candidate: buildRemovableNode("ng1-node-1"),
nodeGroup: "ng1",
},
{
candidate: buildRemovableNode("ng1-node-2"),
nodeGroup: "ng1",
},
{
candidate: buildRemovableNode("ng1-node-3"),
nodeGroup: "ng1",
},
{
candidate: buildRemovableNode("ng2-node-1"),
nodeGroup: "ng2",
},
{
candidate: buildRemovableNode("ng2-node-2"),
nodeGroup: "ng2",
},
},
scaleDownContext: NewDefaultScaleDownContext(),
expectedToBeRemoved: []simulator.NodeToBeRemoved{
buildRemovableNode("ng1-node-1"),
buildRemovableNode("ng1-node-2"),
buildRemovableNode("ng1-node-3"),
},
expectedUnremovable: []simulator.UnremovableNode{
buildUnremovableNode("ng2-node-1", simulator.AtomicScaleDownFailed),
buildUnremovableNode("ng2-node-2", simulator.AtomicScaleDownFailed),
},
},
{
name: "No atomic groups",
nodeGroups: []struct {
nodeGroupName string
nodeGroupTargetSize int
zeroOrMaxNodeScaling bool
}{
{
nodeGroupName: "ng1",
nodeGroupTargetSize: 3,
zeroOrMaxNodeScaling: false,
},
{
nodeGroupName: "ng2",
nodeGroupTargetSize: 4,
zeroOrMaxNodeScaling: false,
},
},
removableCandidates: []struct {
candidate simulator.NodeToBeRemoved
nodeGroup string
}{
{
candidate: buildRemovableNode("ng1-node-1"),
nodeGroup: "ng1",
},
{
candidate: buildRemovableNode("ng1-node-2"),
nodeGroup: "ng1",
},
{
candidate: buildRemovableNode("ng1-node-3"),
nodeGroup: "ng1",
},
{
candidate: buildRemovableNode("ng2-node-1"),
nodeGroup: "ng2",
},
{
candidate: buildRemovableNode("ng2-node-2"),
nodeGroup: "ng2",
},
},
scaleDownContext: NewDefaultScaleDownContext(),
expectedToBeRemoved: []simulator.NodeToBeRemoved{
buildRemovableNode("ng1-node-1"),
buildRemovableNode("ng1-node-2"),
buildRemovableNode("ng1-node-3"),
buildRemovableNode("ng2-node-1"),
buildRemovableNode("ng2-node-2"),
},
expectedUnremovable: []simulator.UnremovableNode{},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
processor := NewAtomicResizeFilteringProcessor()
provider := testprovider.NewTestCloudProvider(nil, nil)
for _, ng := range tc.nodeGroups {
provider.AddNodeGroupWithCustomOptions(ng.nodeGroupName, 0, 100, ng.nodeGroupTargetSize, &config.NodeGroupAutoscalingOptions{
ZeroOrMaxNodeScaling: ng.zeroOrMaxNodeScaling,
})
}
candidates := []simulator.NodeToBeRemoved{}
for _, node := range tc.removableCandidates {
provider.AddNode(node.nodeGroup, node.candidate.Node)
candidates = append(candidates, node.candidate)
}
context, _ := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{},
}, &fake.Clientset{}, nil, provider, nil, nil)
toBeRemoved, unRemovable := processor.FilterUnremovableNodes(&context, tc.scaleDownContext, candidates)
assert.ElementsMatch(t, tc.expectedToBeRemoved, toBeRemoved)
assert.ElementsMatch(t, tc.expectedUnremovable, unRemovable)
})
}
}
func buildRemovableNode(name string) simulator.NodeToBeRemoved {
return simulator.NodeToBeRemoved{
Node: BuildTestNode(name, 1000, 10),
}
}
func buildUnremovableNode(name string, reason simulator.UnremovableReason) simulator.UnremovableNode {
return simulator.UnremovableNode{
Node: BuildTestNode(name, 1000, 10),
Reason: reason,
}
}


@ -18,7 +18,6 @@ package nodes
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@ -37,8 +36,10 @@ type ScaleDownNodeProcessor interface {
// ScaleDownSetProcessor contains a method to select nodes for deletion
type ScaleDownSetProcessor interface {
// GetNodesToRemove selects up to maxCount nodes for deletion
GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved
// FilterUnremovableNodes divides all candidates into removable nodes and unremovable nodes, the latter annotated with a reason.
// Note that len(removableNodes) + len(unremovableNodes) should equal len(candidates);
// in other words, each candidate should end up in exactly one of the resulting node lists.
FilterUnremovableNodes(ctx *context.AutoscalingContext, scaleDownCtx *ScaleDownContext, candidates []simulator.NodeToBeRemoved) ([]simulator.NodeToBeRemoved, []simulator.UnremovableNode)
// CleanUp is called at CA termination
CleanUp()
}
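
For illustration only: a minimal, hypothetical processor satisfying this contract rejects nothing, so the two result lists trivially add up to the input (assumes the same imports as this file):

// passThroughProcessor is a hypothetical ScaleDownSetProcessor that
// treats every candidate as removable.
type passThroughProcessor struct{}

func (p *passThroughProcessor) FilterUnremovableNodes(ctx *context.AutoscalingContext, scaleDownCtx *ScaleDownContext, candidates []simulator.NodeToBeRemoved) ([]simulator.NodeToBeRemoved, []simulator.UnremovableNode) {
	// All candidates are removable; none are unremovable.
	return candidates, []simulator.UnremovableNode{}
}

// CleanUp is called at CA termination.
func (p *passThroughProcessor) CleanUp() {}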


@ -87,14 +87,9 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors
MaxCapacityMemoryDifferenceRatio: config.DefaultMaxCapacityMemoryDifferenceRatio,
MaxFreeDifferenceRatio: config.DefaultMaxFreeDifferenceRatio,
}),
ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor(),
ScaleDownNodeProcessor: nodes.NewPreFilteringScaleDownNodeProcessor(),
ScaleDownSetProcessor: nodes.NewCompositeScaleDownSetProcessor(
[]nodes.ScaleDownSetProcessor{
nodes.NewMaxNodesProcessor(),
nodes.NewAtomicResizeFilteringProcessor(),
},
),
ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor(),
ScaleDownNodeProcessor: nodes.NewPreFilteringScaleDownNodeProcessor(),
ScaleDownSetProcessor: nodes.NewAtomicResizeFilteringProcessor(),
ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(),
AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(),
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),


@ -0,0 +1,60 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package test
import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/podlistprocessor"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster"
"k8s.io/autoscaler/cluster-autoscaler/processors/binpacking"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
)
// NewTestProcessors returns a set of simple processors for use in tests.
func NewTestProcessors(context *context.AutoscalingContext) *processors.AutoscalingProcessors {
return &processors.AutoscalingProcessors{
PodListProcessor: podlistprocessor.NewDefaultPodListProcessor(context.PredicateChecker, scheduling.ScheduleAnywhere),
NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{},
BinpackingLimiter: binpacking.NewTimeLimiter(context.MaxNodeGroupBinpackingDuration),
NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}, config.NodeGroupDifferenceRatios{}),
ScaleDownSetProcessor: nodes.NewAtomicResizeFilteringProcessor(),
// TODO(bskiba): change scale up test so that this can be a NoOpProcessor
ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{},
ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{},
AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(context.NodeGroupDefaults),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(),
AsyncNodeGroupStateChecker: asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(),
}
}


@ -26,7 +26,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/config"
@ -36,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/besteffortatomic"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
@ -486,7 +487,7 @@ func setupTest(t *testing.T, client *provreqclient.ProvisioningRequestClient, no
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingContext.ClusterSnapshot, nodes, nil)
processors := NewTestProcessors(&autoscalingContext)
processors := processorstest.NewTestProcessors(&autoscalingContext)
if autoprovisioning {
processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 2}


@ -73,6 +73,10 @@ const (
NotUnreadyLongEnough
// NodeGroupMinSizeReached - node can't be removed because its node group is at its minimal size already.
NodeGroupMinSizeReached
// NodeGroupMaxDeletionCountReached - node can't be removed because the maximum number of nodes to remove, as set in the planner, has been reached
NodeGroupMaxDeletionCountReached
// AtomicScaleDownFailed - node can't be removed because its node group has ZeroOrMaxNodeScaling enabled and the number of nodes to remove does not match the group's target size
AtomicScaleDownFailed
// MinimalResourceLimitExceeded - node can't be removed because it would violate cluster-wide minimal resource limits.
MinimalResourceLimitExceeded
// CurrentlyBeingDeleted - node can't be removed because it's already in the process of being deleted.