Merge pull request #5672 from vadasambar/feat/5399/ignore-daemonsets-utilization-per-nodegroup

feat: set `IgnoreDaemonSetsUtilization` per nodegroup for AWS
Commit c6893e9e28 by Kubernetes Prow Robot, 2023-07-12 07:43:12 -07:00, committed by GitHub (GPG Key ID: 4AEE18F83AFDEB23)
16 changed files with 623 additions and 322 deletions

View File

@ -246,6 +246,8 @@ as string). Currently supported autoscaling options (and example values) are:
(overrides `--scale-down-unneeded-time` value for that specific ASG)
* `k8s.io/cluster-autoscaler/node-template/autoscaling-options/scaledownunreadytime`: `20m0s`
(overrides `--scale-down-unready-time` value for that specific ASG)
* `k8s.io/cluster-autoscaler/node-template/autoscaling-options/ignoredaemonsetsutilization`: `true`
(overrides `--ignore-daemonsets-utilization` value for that specific ASG)
**NOTE:** It is your responsibility to ensure such labels and/or taints are
applied via the node's kubelet configuration at startup. Cluster Autoscaler will not set the node taints for you.
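
For illustration, this override is an ordinary tag on the ASG itself. Below is a minimal sketch of applying it with aws-sdk-go; the ASG name `my-asg` is a placeholder, and in practice the tag can equally be set via the console, CLI, or infrastructure-as-code:

```go
package main

import (
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/autoscaling"
)

func main() {
	svc := autoscaling.New(session.Must(session.NewSession()))
	// Tag the ASG so Cluster Autoscaler ignores DaemonSet utilization for this group only.
	_, err := svc.CreateOrUpdateTags(&autoscaling.CreateOrUpdateTagsInput{
		Tags: []*autoscaling.Tag{{
			ResourceId:        aws.String("my-asg"), // placeholder ASG name
			ResourceType:      aws.String("auto-scaling-group"),
			Key:               aws.String("k8s.io/cluster-autoscaler/node-template/autoscaling-options/ignoredaemonsetsutilization"),
			Value:             aws.String("true"),
			PropagateAtLaunch: aws.Bool(false),
		}},
	})
	if err != nil {
		log.Fatalf("failed to tag ASG: %v", err)
	}
}
```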

View File

@ -245,6 +245,15 @@ func (m *AwsManager) GetAsgOptions(asg asg, defaults config.NodeGroupAutoscaling
}
}
if stringOpt, found := options[config.DefaultIgnoreDaemonSetsUtilizationKey]; found {
if opt, err := strconv.ParseBool(stringOpt); err != nil {
klog.Warningf("failed to convert asg %s %s tag to bool: %v",
asg.Name, config.DefaultIgnoreDaemonSetsUtilizationKey, err)
} else {
defaults.IgnoreDaemonSetsUtilization = opt
}
}
return &defaults
}

View File

@ -130,6 +130,7 @@ func TestGetAsgOptions(t *testing.T) {
ScaleDownGpuUtilizationThreshold: 0.2,
ScaleDownUnneededTime: time.Second,
ScaleDownUnreadyTime: time.Minute,
IgnoreDaemonSetsUtilization: false,
}
tests := []struct {
@ -145,39 +146,60 @@ func TestGetAsgOptions(t *testing.T) {
{
description: "keep defaults on invalid tags values",
tags: map[string]string{
"scaledownutilizationthreshold": "not-a-float",
"scaledownunneededtime": "not-a-duration",
"ScaleDownUnreadyTime": "",
config.DefaultScaleDownUtilizationThresholdKey: "not-a-float",
config.DefaultScaleDownUnneededTimeKey: "not-a-duration",
"ScaleDownUnreadyTime": "",
config.DefaultIgnoreDaemonSetsUtilizationKey: "not-a-bool",
},
expected: &defaultOptions,
},
{
description: "use provided tags and fill missing with defaults",
tags: map[string]string{
"scaledownutilizationthreshold": "0.42",
"scaledownunneededtime": "1h",
config.DefaultScaleDownUtilizationThresholdKey: "0.42",
config.DefaultScaleDownUnneededTimeKey: "1h",
config.DefaultIgnoreDaemonSetsUtilizationKey: "true",
},
expected: &config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: 0.42,
ScaleDownGpuUtilizationThreshold: defaultOptions.ScaleDownGpuUtilizationThreshold,
ScaleDownUnneededTime: time.Hour,
ScaleDownUnreadyTime: defaultOptions.ScaleDownUnreadyTime,
IgnoreDaemonSetsUtilization: true,
},
},
{
description: "use provided tags (happy path)",
tags: map[string]string{
config.DefaultScaleDownUtilizationThresholdKey: "0.42",
config.DefaultScaleDownUnneededTimeKey: "1h",
config.DefaultScaleDownGpuUtilizationThresholdKey: "0.7",
config.DefaultScaleDownUnreadyTimeKey: "25m",
config.DefaultIgnoreDaemonSetsUtilizationKey: "true",
},
expected: &config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: 0.42,
ScaleDownGpuUtilizationThreshold: 0.7,
ScaleDownUnneededTime: time.Hour,
ScaleDownUnreadyTime: 25 * time.Minute,
IgnoreDaemonSetsUtilization: true,
},
},
{
description: "ignore unknown tags",
tags: map[string]string{
"scaledownutilizationthreshold": "0.6",
"scaledowngpuutilizationthreshold": "0.7",
"scaledownunneededtime": "1m",
"scaledownunreadytime": "1h",
"notyetspecified": "42",
config.DefaultScaleDownUtilizationThresholdKey: "0.6",
config.DefaultScaleDownGpuUtilizationThresholdKey: "0.7",
config.DefaultScaleDownUnneededTimeKey: "1m",
config.DefaultScaleDownUnreadyTimeKey: "1h",
"notyetspecified": "42",
},
expected: &config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: 0.6,
ScaleDownGpuUtilizationThreshold: 0.7,
ScaleDownUnneededTime: time.Minute,
ScaleDownUnreadyTime: time.Hour,
IgnoreDaemonSetsUtilization: false,
},
},
}

View File

@ -48,6 +48,8 @@ type NodeGroupAutoscalingOptions struct {
MaxNodeProvisionTime time.Duration
// ZeroOrMaxNodeScaling means that a node group should be scaled up to maximum size or down to zero nodes all at once instead of one-by-one.
ZeroOrMaxNodeScaling bool
// IgnoreDaemonSetsUtilization determines whether DaemonSet pods are ignored when calculating resource utilization for node scale-down
IgnoreDaemonSetsUtilization bool
}
// GCEOptions contain autoscaling options specific to GCE cloud provider.
@ -117,8 +119,6 @@ type AutoscalingOptions struct {
GRPCExpanderCert string
// GRPCExpanderURL is the url of the gRPC server when using the gRPC expander
GRPCExpanderURL string
// IgnoreDaemonSetsUtilization is whether CA will ignore DaemonSet pods when calculating resource utilization for scaling down
IgnoreDaemonSetsUtilization bool
// IgnoreMirrorPodsUtilization is whether CA will ignore Mirror pods when calculating resource utilization for scaling down
IgnoreMirrorPodsUtilization bool
// MaxGracefulTerminationSec is maximum number of seconds scale down waits for pods to terminate before

View File

@ -16,6 +16,8 @@ limitations under the License.
package config
import "time"
const (
// DefaultMaxClusterCores is the default maximum number of cores in the cluster.
DefaultMaxClusterCores = 5000 * 64
@ -32,4 +34,14 @@ const (
DefaultScaleDownUnreadyTimeKey = "scaledownunreadytime"
// DefaultMaxNodeProvisionTimeKey identifies MaxNodeProvisionTime autoscaling option
DefaultMaxNodeProvisionTimeKey = "maxnodeprovisiontime"
// DefaultIgnoreDaemonSetsUtilizationKey identifies IgnoreDaemonSetsUtilization autoscaling option
DefaultIgnoreDaemonSetsUtilizationKey = "ignoredaemonsetsutilization"
// DefaultScaleDownUnneededTime is the default value for the ScaleDownUnneededTime autoscaling option
DefaultScaleDownUnneededTime = 10 * time.Minute
// DefaultScaleDownUnreadyTime is the default value for the ScaleDownUnreadyTime autoscaling option
DefaultScaleDownUnreadyTime = 20 * time.Minute
// DefaultScaleDownUtilizationThreshold is the default value for the ScaleDownUtilizationThreshold autoscaling option
DefaultScaleDownUtilizationThreshold = 0.5
// DefaultScaleDownGpuUtilizationThreshold is the default value for the ScaleDownGpuUtilizationThreshold autoscaling option
DefaultScaleDownGpuUtilizationThreshold = 0.5
)

View File

@ -52,10 +52,18 @@ type Actuator struct {
// This is a larger change to the code structure which impacts some existing actuator unit tests
// as well as Cluster Autoscaler implementations that may override ScaleDownSetProcessor
budgetProcessor *budgets.ScaleDownBudgetProcessor
configGetter actuatorNodeGroupConfigGetter
}
// actuatorNodeGroupConfigGetter is an interface that limits the functions that can be used
// from the NodeGroupConfigProcessor interface
type actuatorNodeGroupConfigGetter interface {
// GetIgnoreDaemonSetsUtilization returns IgnoreDaemonSetsUtilization value that should be used for a given NodeGroup.
GetIgnoreDaemonSetsUtilization(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (bool, error)
}
// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions) *Actuator {
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval)
return &Actuator{
ctx: ctx,
@ -64,6 +72,7 @@ func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterState
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
configGetter: configGetter,
}
}
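
Since actuatorNodeGroupConfigGetter has a single method, any one-method type satisfies it. As a hypothetical aside (not part of this PR), a stub that actuator unit tests could pass to NewActuator to pin the value might look like the sketch below, assuming it sits alongside the Actuator in the actuation package:

```go
package actuation

import (
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/context"
)

// fixedConfigGetter is a hypothetical stub that returns the same
// IgnoreDaemonSetsUtilization answer for every node group, which is all
// actuatorNodeGroupConfigGetter requires.
type fixedConfigGetter struct {
	ignoreDaemonSetsUtilization bool
}

func (f *fixedConfigGetter) GetIgnoreDaemonSetsUtilization(_ *context.AutoscalingContext, _ cloudprovider.NodeGroup) (bool, error) {
	return f.ignoreDaemonSetsUtilization, nil
}
```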
@ -263,8 +272,14 @@ func (a *Actuator) scaleDownNodeToReport(node *apiv1.Node, drain bool) (*status.
if err != nil {
return nil, err
}
ignoreDaemonSetsUtilization, err := a.configGetter.GetIgnoreDaemonSetsUtilization(a.ctx, nodeGroup)
if err != nil {
return nil, err
}
gpuConfig := a.ctx.CloudProvider.GetNodeGpuConfig(node)
utilInfo, err := utilization.Calculate(nodeInfo, a.ctx.IgnoreDaemonSetsUtilization, a.ctx.IgnoreMirrorPodsUtilization, gpuConfig, time.Now())
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, a.ctx.IgnoreMirrorPodsUtilization, gpuConfig, time.Now())
if err != nil {
return nil, err
}

View File

@ -43,32 +43,40 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
func TestStartDeletion(t *testing.T) {
testNg := testprovider.NewTestNodeGroup("test", 0, 100, 3, true, false, "n1-standard-2", nil, nil)
type startDeletionTestCase struct {
emptyNodes []*budgets.NodeGroupView
drainNodes []*budgets.NodeGroupView
pods map[string][]*apiv1.Pod
failedPodDrain map[string]bool
failedNodeDeletion map[string]bool
failedNodeTaint map[string]bool
wantStatus *status.ScaleDownStatus
wantErr error
wantDeletedPods []string
wantDeletedNodes []string
wantTaintUpdates map[string][][]apiv1.Taint
wantNodeDeleteResults map[string]status.NodeDeleteResult
}
func getStartDeletionTestCases(testNg *testprovider.TestNodeGroup, ignoreDaemonSetsUtilization bool, suffix string) map[string]startDeletionTestCase {
toBeDeletedTaint := apiv1.Taint{Key: taints.ToBeDeletedTaint, Effect: apiv1.TaintEffectNoSchedule}
dsUtilInfo := generateUtilInfo(2./8., 2./8.)
if ignoreDaemonSetsUtilization {
dsUtilInfo = generateUtilInfo(0./8., 0./8.)
}
atomic2 := sizedNodeGroup("atomic-2", 2, true)
atomic4 := sizedNodeGroup("atomic-4", 4, true)
toBeDeletedTaint := apiv1.Taint{Key: taints.ToBeDeletedTaint, Effect: apiv1.TaintEffectNoSchedule}
for tn, tc := range map[string]struct {
emptyNodes []*budgets.NodeGroupView
drainNodes []*budgets.NodeGroupView
pods map[string][]*apiv1.Pod
failedPodDrain map[string]bool
failedNodeDeletion map[string]bool
failedNodeTaint map[string]bool
wantStatus *status.ScaleDownStatus
wantErr error
wantDeletedPods []string
wantDeletedNodes []string
wantTaintUpdates map[string][][]apiv1.Taint
wantNodeDeleteResults map[string]status.NodeDeleteResult
}{
testCases := map[string]startDeletionTestCase{
"nothing to delete": {
emptyNodes: nil,
drainNodes: nil,
@ -528,8 +536,8 @@ func TestStartDeletion(t *testing.T) {
"DS pods are evicted from empty nodes, but don't block deletion on error": {
emptyNodes: generateNodeGroupViewList(testNg, 0, 2),
pods: map[string][]*apiv1.Pod{
"test-node-0": {generateDsPod("test-node-0-ds-pod-0", "test-node-0"), generateDsPod("test-node-0-ds-pod-1", "test-node-0")},
"test-node-1": {generateDsPod("test-node-1-ds-pod-0", "test-node-1"), generateDsPod("test-node-1-ds-pod-1", "test-node-1")},
"test-node-0": generateDsPods(2, "test-node-0"),
"test-node-1": generateDsPods(2, "test-node-1"),
},
failedPodDrain: map[string]bool{"test-node-1-ds-pod-0": true},
wantStatus: &status.ScaleDownStatus{
@ -539,13 +547,13 @@ func TestStartDeletion(t *testing.T) {
Node: generateNode("test-node-0"),
NodeGroup: testNg,
EvictedPods: nil,
UtilInfo: generateUtilInfo(2./8., 2./8.),
UtilInfo: dsUtilInfo,
},
{
Node: generateNode("test-node-1"),
NodeGroup: testNg,
EvictedPods: nil,
UtilInfo: generateUtilInfo(2./8., 2./8.),
UtilInfo: dsUtilInfo,
},
},
},
@ -564,6 +572,111 @@ func TestStartDeletion(t *testing.T) {
"test-node-1": {ResultType: status.NodeDeleteOk},
},
},
"DS pods and deletion with drain": {
drainNodes: generateNodeGroupViewList(testNg, 0, 2),
pods: map[string][]*apiv1.Pod{
"test-node-0": generateDsPods(2, "test-node-0"),
"test-node-1": generateDsPods(2, "test-node-1"),
},
wantStatus: &status.ScaleDownStatus{
Result: status.ScaleDownNodeDeleteStarted,
ScaledDownNodes: []*status.ScaleDownNode{
{
Node: generateNode("test-node-0"),
NodeGroup: testNg,
// this is nil because DaemonSetEvictionForOccupiedNodes is
// not enabled for drained nodes in this test suite
EvictedPods: nil,
UtilInfo: dsUtilInfo,
},
{
Node: generateNode("test-node-1"),
NodeGroup: testNg,
// this is nil because DaemonSetEvictionForOccupiedNodes is
// not enabled for drained nodes in this test suite
EvictedPods: nil,
UtilInfo: dsUtilInfo,
},
},
},
wantDeletedNodes: []string{"test-node-0", "test-node-1"},
// same as evicted pods
wantDeletedPods: nil,
wantTaintUpdates: map[string][][]apiv1.Taint{
"test-node-0": {
{toBeDeletedTaint},
},
"test-node-1": {
{toBeDeletedTaint},
},
},
wantNodeDeleteResults: map[string]status.NodeDeleteResult{
"test-node-0": {ResultType: status.NodeDeleteOk},
"test-node-1": {ResultType: status.NodeDeleteOk},
},
},
"DS pods and empty and drain deletion work correctly together": {
emptyNodes: generateNodeGroupViewList(testNg, 0, 2),
drainNodes: generateNodeGroupViewList(testNg, 2, 4),
pods: map[string][]*apiv1.Pod{
"test-node-2": removablePods(2, "test-node-2"),
"test-node-3": generateDsPods(2, "test-node-3"),
},
wantStatus: &status.ScaleDownStatus{
Result: status.ScaleDownNodeDeleteStarted,
ScaledDownNodes: []*status.ScaleDownNode{
{
Node: generateNode("test-node-0"),
NodeGroup: testNg,
EvictedPods: nil,
UtilInfo: generateUtilInfo(0, 0),
},
{
Node: generateNode("test-node-1"),
NodeGroup: testNg,
EvictedPods: nil,
UtilInfo: generateUtilInfo(0, 0),
},
{
Node: generateNode("test-node-2"),
NodeGroup: testNg,
EvictedPods: removablePods(2, "test-node-2"),
UtilInfo: generateUtilInfo(2./8., 2./8.),
},
{
Node: generateNode("test-node-3"),
NodeGroup: testNg,
// this is nil because DaemonSetEvictionForOccupiedNodes is
// not enabled for drained nodes in this test suite
EvictedPods: nil,
UtilInfo: dsUtilInfo,
},
},
},
wantDeletedNodes: []string{"test-node-0", "test-node-1", "test-node-2", "test-node-3"},
// same as evicted pods
wantDeletedPods: nil,
wantTaintUpdates: map[string][][]apiv1.Taint{
"test-node-0": {
{toBeDeletedTaint},
},
"test-node-1": {
{toBeDeletedTaint},
},
"test-node-2": {
{toBeDeletedTaint},
},
"test-node-3": {
{toBeDeletedTaint},
},
},
wantNodeDeleteResults: map[string]status.NodeDeleteResult{
"test-node-0": {ResultType: status.NodeDeleteOk},
"test-node-1": {ResultType: status.NodeDeleteOk},
"test-node-2": {ResultType: status.NodeDeleteOk},
"test-node-3": {ResultType: status.NodeDeleteOk},
},
},
"nodes with pods are not deleted if the node is passed as empty": {
emptyNodes: generateNodeGroupViewList(testNg, 0, 2),
pods: map[string][]*apiv1.Pod{
@ -668,262 +781,294 @@ func TestStartDeletion(t *testing.T) {
"atomic-2-node-1": {ResultType: status.NodeDeleteErrorInternal, Err: cmpopts.AnyError},
},
},
} {
t.Run(tn, func(t *testing.T) {
// This is needed because the tested code starts goroutines that can technically live longer than the execution
// of a single test case, and the goroutines eventually access tc in fakeClient hooks below.
tc := tc
// Insert all nodes into a map to support live node updates and GETs.
allEmptyNodes, allDrainNodes := []*apiv1.Node{}, []*apiv1.Node{}
nodesByName := make(map[string]*apiv1.Node)
nodesLock := sync.Mutex{}
for _, bucket := range tc.emptyNodes {
allEmptyNodes = append(allEmptyNodes, bucket.Nodes...)
for _, node := range allEmptyNodes {
nodesByName[node.Name] = node
}
}
for _, bucket := range tc.drainNodes {
allDrainNodes = append(allDrainNodes, bucket.Nodes...)
for _, node := range bucket.Nodes {
nodesByName[node.Name] = node
}
}
}
// Set up a fake k8s client to hook and verify certain actions.
fakeClient := &fake.Clientset{}
type nodeTaints struct {
nodeName string
taints []apiv1.Taint
}
taintUpdates := make(chan nodeTaints, 10)
deletedNodes := make(chan string, 10)
deletedPods := make(chan string, 10)
testCasesWithNGNames := map[string]startDeletionTestCase{}
for k, v := range testCases {
testCasesWithNGNames[k+" "+suffix] = v
}
ds := generateDaemonSet()
return testCasesWithNGNames
}
// We're faking the whole k8s client, and some of the code needs to get live nodes and pods, so GET on nodes and pods has to be set up.
fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
nodesLock.Lock()
defer nodesLock.Unlock()
getAction := action.(core.GetAction)
node, found := nodesByName[getAction.GetName()]
if !found {
return true, nil, fmt.Errorf("node %q not found", getAction.GetName())
func TestStartDeletion(t *testing.T) {
testNg1 := testprovider.NewTestNodeGroup("test", 100, 0, 3, true, false, "n1-standard-2", nil, nil)
opts1 := &config.NodeGroupAutoscalingOptions{
IgnoreDaemonSetsUtilization: false,
}
testNg1.SetOptions(opts1)
testNg2 := testprovider.NewTestNodeGroup("test", 100, 0, 3, true, false, "n1-standard-2", nil, nil)
opts2 := &config.NodeGroupAutoscalingOptions{
IgnoreDaemonSetsUtilization: true,
}
testNg2.SetOptions(opts2)
testSets := []map[string]startDeletionTestCase{
// IgnoreDaemonSetsUtilization is false
getStartDeletionTestCases(testNg1, opts1.IgnoreDaemonSetsUtilization, "testNg1"),
// IgnoreDaemonSetsUtilization is true
getStartDeletionTestCases(testNg2, opts2.IgnoreDaemonSetsUtilization, "testNg2"),
}
for _, testSet := range testSets {
for tn, tc := range testSet {
t.Run(tn, func(t *testing.T) {
// This is needed because the tested code starts goroutines that can technically live longer than the execution
// of a single test case, and the goroutines eventually access tc in fakeClient hooks below.
tc := tc
// Insert all nodes into a map to support live node updates and GETs.
allEmptyNodes, allDrainNodes := []*apiv1.Node{}, []*apiv1.Node{}
nodesByName := make(map[string]*apiv1.Node)
nodesLock := sync.Mutex{}
for _, bucket := range tc.emptyNodes {
allEmptyNodes = append(allEmptyNodes, bucket.Nodes...)
for _, node := range allEmptyNodes {
nodesByName[node.Name] = node
}
}
return true, node, nil
})
fakeClient.Fake.AddReactor("get", "pods",
func(action core.Action) (bool, runtime.Object, error) {
return true, nil, errors.NewNotFound(apiv1.Resource("pod"), "whatever")
})
// Hook node update to gather all taint updates, and to fail the update for certain nodes to simulate errors.
fakeClient.Fake.AddReactor("update", "nodes",
func(action core.Action) (bool, runtime.Object, error) {
for _, bucket := range tc.drainNodes {
allDrainNodes = append(allDrainNodes, bucket.Nodes...)
for _, node := range bucket.Nodes {
nodesByName[node.Name] = node
}
}
// Set up a fake k8s client to hook and verify certain actions.
fakeClient := &fake.Clientset{}
type nodeTaints struct {
nodeName string
taints []apiv1.Taint
}
taintUpdates := make(chan nodeTaints, 10)
deletedNodes := make(chan string, 10)
deletedPods := make(chan string, 10)
ds := generateDaemonSet()
// We're faking the whole k8s client, and some of the code needs to get live nodes and pods, so GET on nodes and pods has to be set up.
fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
nodesLock.Lock()
defer nodesLock.Unlock()
update := action.(core.UpdateAction)
obj := update.GetObject().(*apiv1.Node)
if tc.failedNodeTaint[obj.Name] {
return true, nil, fmt.Errorf("SIMULATED ERROR: won't taint")
}
nt := nodeTaints{
nodeName: obj.Name,
}
for _, taint := range obj.Spec.Taints {
nt.taints = append(nt.taints, taint)
}
taintUpdates <- nt
nodesByName[obj.Name] = obj.DeepCopy()
return true, obj, nil
})
// Hook eviction creation to gather which pods were evicted, and to fail the eviction for certain pods to simulate errors.
fakeClient.Fake.AddReactor("create", "pods",
func(action core.Action) (bool, runtime.Object, error) {
createAction := action.(core.CreateAction)
if createAction == nil {
return false, nil, nil
}
eviction := createAction.GetObject().(*policyv1beta1.Eviction)
if eviction == nil {
return false, nil, nil
}
if tc.failedPodDrain[eviction.Name] {
return true, nil, fmt.Errorf("SIMULATED ERROR: won't evict")
}
deletedPods <- eviction.Name
return true, nil, nil
})
// Hook node deletion at the level of cloud provider, to gather which nodes were deleted, and to fail the deletion for
// certain nodes to simulate errors.
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
if tc.failedNodeDeletion[node] {
return fmt.Errorf("SIMULATED ERROR: won't remove node")
}
deletedNodes <- node
return nil
})
for _, bucket := range tc.emptyNodes {
bucket.Group.(*testprovider.TestNodeGroup).SetCloudProvider(provider)
provider.InsertNodeGroup(bucket.Group)
for _, node := range bucket.Nodes {
provider.AddNode(bucket.Group.Id(), node)
}
}
for _, bucket := range tc.drainNodes {
bucket.Group.(*testprovider.TestNodeGroup).SetCloudProvider(provider)
provider.InsertNodeGroup(bucket.Group)
for _, node := range bucket.Nodes {
provider.AddNode(bucket.Group.Id(), node)
}
}
// Set up other needed structures and options.
opts := config.AutoscalingOptions{
MaxScaleDownParallelism: 10,
MaxDrainParallelism: 5,
MaxPodEvictionTime: 0,
DaemonSetEvictionForEmptyNodes: true,
}
allPods := []*apiv1.Pod{}
for _, pods := range tc.pods {
allPods = append(allPods, pods...)
}
podLister := kube_util.NewTestPodLister(allPods)
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
dsLister, err := kube_util.NewTestDaemonSetLister([]*appsv1.DaemonSet{ds})
if err != nil {
t.Fatalf("Couldn't create daemonset lister")
}
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, dsLister, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil)
if err != nil {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
}
csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff())
csr.RegisterProviders(clusterstate.NewMockMaxNodeProvisionTimeProvider(15 * time.Minute))
for _, bucket := range tc.emptyNodes {
for _, node := range bucket.Nodes {
err := ctx.ClusterSnapshot.AddNodeWithPods(node, tc.pods[node.Name])
if err != nil {
t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
}
}
}
for _, bucket := range tc.drainNodes {
for _, node := range bucket.Nodes {
pods, found := tc.pods[node.Name]
getAction := action.(core.GetAction)
node, found := nodesByName[getAction.GetName()]
if !found {
t.Fatalf("Drain node %q doesn't have pods defined in the test case.", node.Name)
return true, nil, fmt.Errorf("node %q not found", getAction.GetName())
}
err := ctx.ClusterSnapshot.AddNodeWithPods(node, pods)
if err != nil {
t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
return true, node, nil
})
fakeClient.Fake.AddReactor("get", "pods",
func(action core.Action) (bool, runtime.Object, error) {
return true, nil, errors.NewNotFound(apiv1.Resource("pod"), "whatever")
})
// Hook node update to gather all taint updates, and to fail the update for certain nodes to simulate errors.
fakeClient.Fake.AddReactor("update", "nodes",
func(action core.Action) (bool, runtime.Object, error) {
nodesLock.Lock()
defer nodesLock.Unlock()
update := action.(core.UpdateAction)
obj := update.GetObject().(*apiv1.Node)
if tc.failedNodeTaint[obj.Name] {
return true, nil, fmt.Errorf("SIMULATED ERROR: won't taint")
}
nt := nodeTaints{
nodeName: obj.Name,
}
for _, taint := range obj.Spec.Taints {
nt.taints = append(nt.taints, taint)
}
taintUpdates <- nt
nodesByName[obj.Name] = obj.DeepCopy()
return true, obj, nil
})
// Hook eviction creation to gather which pods were evicted, and to fail the eviction for certain pods to simulate errors.
fakeClient.Fake.AddReactor("create", "pods",
func(action core.Action) (bool, runtime.Object, error) {
createAction := action.(core.CreateAction)
if createAction == nil {
return false, nil, nil
}
eviction := createAction.GetObject().(*policyv1beta1.Eviction)
if eviction == nil {
return false, nil, nil
}
if tc.failedPodDrain[eviction.Name] {
return true, nil, fmt.Errorf("SIMULATED ERROR: won't evict")
}
deletedPods <- eviction.Name
return true, nil, nil
})
// Hook node deletion at the level of cloud provider, to gather which nodes were deleted, and to fail the deletion for
// certain nodes to simulate errors.
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
if tc.failedNodeDeletion[node] {
return fmt.Errorf("SIMULATED ERROR: won't remove node")
}
deletedNodes <- node
return nil
})
for _, bucket := range tc.emptyNodes {
bucket.Group.(*testprovider.TestNodeGroup).SetCloudProvider(provider)
provider.InsertNodeGroup(bucket.Group)
for _, node := range bucket.Nodes {
provider.AddNode(bucket.Group.Id(), node)
}
}
}
// Create Actuator, run StartDeletion, and verify the error.
ndt := deletiontracker.NewNodeDeletionTracker(0)
ndb := NewNodeDeletionBatcher(&ctx, csr, ndt, 0*time.Second)
evictor := Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
}
gotStatus, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes)
if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
t.Errorf("StartDeletion error diff (-want +got):\n%s", diff)
}
// Verify ScaleDownStatus looks as expected.
ignoreSdNodeOrder := cmpopts.SortSlices(func(a, b *status.ScaleDownNode) bool { return a.Node.Name < b.Node.Name })
ignoreTimestamps := cmpopts.IgnoreFields(status.ScaleDownStatus{}, "NodeDeleteResultsAsOf")
cmpNg := cmp.Comparer(func(a, b *testprovider.TestNodeGroup) bool { return a.Id() == b.Id() })
statusCmpOpts := cmp.Options{ignoreSdNodeOrder, ignoreTimestamps, cmpNg, cmpopts.EquateEmpty()}
if diff := cmp.Diff(tc.wantStatus, gotStatus, statusCmpOpts); diff != "" {
t.Errorf("StartDeletion status diff (-want +got):\n%s", diff)
}
// Verify that all expected nodes were deleted using the cloud provider hook.
var gotDeletedNodes []string
nodesLoop:
for i := 0; i < len(tc.wantDeletedNodes); i++ {
select {
case deletedNode := <-deletedNodes:
gotDeletedNodes = append(gotDeletedNodes, deletedNode)
case <-time.After(3 * time.Second):
t.Errorf("Timeout while waiting for deleted nodes.")
break nodesLoop
for _, bucket := range tc.drainNodes {
bucket.Group.(*testprovider.TestNodeGroup).SetCloudProvider(provider)
provider.InsertNodeGroup(bucket.Group)
for _, node := range bucket.Nodes {
provider.AddNode(bucket.Group.Id(), node)
}
}
}
ignoreStrOrder := cmpopts.SortSlices(func(a, b string) bool { return a < b })
if diff := cmp.Diff(tc.wantDeletedNodes, gotDeletedNodes, ignoreStrOrder); diff != "" {
t.Errorf("deletedNodes diff (-want +got):\n%s", diff)
}
// Verify that all expected pods were deleted using the fake k8s client hook.
var gotDeletedPods []string
podsLoop:
for i := 0; i < len(tc.wantDeletedPods); i++ {
select {
case deletedPod := <-deletedPods:
gotDeletedPods = append(gotDeletedPods, deletedPod)
case <-time.After(3 * time.Second):
t.Errorf("Timeout while waiting for deleted pods.")
break podsLoop
// Set up other needed structures and options.
opts := config.AutoscalingOptions{
MaxScaleDownParallelism: 10,
MaxDrainParallelism: 5,
MaxPodEvictionTime: 0,
DaemonSetEvictionForEmptyNodes: true,
}
}
if diff := cmp.Diff(tc.wantDeletedPods, gotDeletedPods, ignoreStrOrder); diff != "" {
t.Errorf("deletedPods diff (-want +got):\n%s", diff)
}
// Verify that all expected taint updates happened using the fake k8s client hook.
allUpdatesCount := 0
for _, updates := range tc.wantTaintUpdates {
allUpdatesCount += len(updates)
}
gotTaintUpdates := make(map[string][][]apiv1.Taint)
taintsLoop:
for i := 0; i < allUpdatesCount; i++ {
select {
case taintUpdate := <-taintUpdates:
gotTaintUpdates[taintUpdate.nodeName] = append(gotTaintUpdates[taintUpdate.nodeName], taintUpdate.taints)
case <-time.After(3 * time.Second):
t.Errorf("Timeout while waiting for taint updates.")
break taintsLoop
allPods := []*apiv1.Pod{}
for _, pods := range tc.pods {
allPods = append(allPods, pods...)
}
}
ignoreTaintValue := cmpopts.IgnoreFields(apiv1.Taint{}, "Value")
if diff := cmp.Diff(tc.wantTaintUpdates, gotTaintUpdates, ignoreTaintValue, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("taintUpdates diff (-want +got):\n%s", diff)
}
// Wait for all expected deletions to be reported in NodeDeletionTracker. Reporting happens shortly after the deletion
// in cloud provider we sync to above and so this will usually not wait at all. However, it can still happen
// that there is a delay between cloud provider deletion and reporting, in which case the results are not there yet
// and we need to wait for them before asserting.
err = waitForDeletionResultsCount(actuator.nodeDeletionTracker, len(tc.wantNodeDeleteResults), 3*time.Second, 200*time.Millisecond)
if err != nil {
t.Errorf("Timeout while waiting for node deletion results")
}
podLister := kube_util.NewTestPodLister(allPods)
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
dsLister, err := kube_util.NewTestDaemonSetLister([]*appsv1.DaemonSet{ds})
if err != nil {
t.Fatalf("Couldn't create daemonset lister")
}
// Run StartDeletion again to gather node deletion results for deletions started in the previous call, and verify
// that they look as expected.
gotNextStatus, gotNextErr := actuator.StartDeletion(nil, nil)
if gotNextErr != nil {
t.Errorf("StartDeletion unexpected error: %v", gotNextErr)
}
if diff := cmp.Diff(tc.wantNodeDeleteResults, gotNextStatus.NodeDeleteResults, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" {
t.Errorf("NodeDeleteResults diff (-want +got):\n%s", diff)
}
})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, dsLister, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil)
if err != nil {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
}
csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff())
csr.RegisterProviders(clusterstate.NewMockMaxNodeProvisionTimeProvider(15 * time.Minute))
for _, bucket := range tc.emptyNodes {
for _, node := range bucket.Nodes {
err := ctx.ClusterSnapshot.AddNodeWithPods(node, tc.pods[node.Name])
if err != nil {
t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
}
}
}
for _, bucket := range tc.drainNodes {
for _, node := range bucket.Nodes {
pods, found := tc.pods[node.Name]
if !found {
t.Fatalf("Drain node %q doesn't have pods defined in the test case.", node.Name)
}
err := ctx.ClusterSnapshot.AddNodeWithPods(node, pods)
if err != nil {
t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
}
}
}
// Create Actuator, run StartDeletion, and verify the error.
ndt := deletiontracker.NewNodeDeletionTracker(0)
ndb := NewNodeDeletionBatcher(&ctx, csr, ndt, 0*time.Second)
evictor := Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
}
gotStatus, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes)
if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
t.Errorf("StartDeletion error diff (-want +got):\n%s", diff)
}
// Verify ScaleDownStatus looks as expected.
ignoreSdNodeOrder := cmpopts.SortSlices(func(a, b *status.ScaleDownNode) bool { return a.Node.Name < b.Node.Name })
ignoreTimestamps := cmpopts.IgnoreFields(status.ScaleDownStatus{}, "NodeDeleteResultsAsOf")
cmpNg := cmp.Comparer(func(a, b *testprovider.TestNodeGroup) bool { return a.Id() == b.Id() })
statusCmpOpts := cmp.Options{ignoreSdNodeOrder, ignoreTimestamps, cmpNg, cmpopts.EquateEmpty()}
if diff := cmp.Diff(tc.wantStatus, gotStatus, statusCmpOpts); diff != "" {
t.Errorf("StartDeletion status diff (-want +got):\n%s", diff)
}
// Verify that all expected nodes were deleted using the cloud provider hook.
var gotDeletedNodes []string
nodesLoop:
for i := 0; i < len(tc.wantDeletedNodes); i++ {
select {
case deletedNode := <-deletedNodes:
gotDeletedNodes = append(gotDeletedNodes, deletedNode)
case <-time.After(3 * time.Second):
t.Errorf("Timeout while waiting for deleted nodes.")
break nodesLoop
}
}
ignoreStrOrder := cmpopts.SortSlices(func(a, b string) bool { return a < b })
if diff := cmp.Diff(tc.wantDeletedNodes, gotDeletedNodes, ignoreStrOrder); diff != "" {
t.Errorf("deletedNodes diff (-want +got):\n%s", diff)
}
// Verify that all expected pods were deleted using the fake k8s client hook.
var gotDeletedPods []string
podsLoop:
for i := 0; i < len(tc.wantDeletedPods); i++ {
select {
case deletedPod := <-deletedPods:
gotDeletedPods = append(gotDeletedPods, deletedPod)
case <-time.After(3 * time.Second):
t.Errorf("Timeout while waiting for deleted pods.")
break podsLoop
}
}
if diff := cmp.Diff(tc.wantDeletedPods, gotDeletedPods, ignoreStrOrder); diff != "" {
t.Errorf("deletedPods diff (-want +got):\n%s", diff)
}
// Verify that all expected taint updates happened using the fake k8s client hook.
allUpdatesCount := 0
for _, updates := range tc.wantTaintUpdates {
allUpdatesCount += len(updates)
}
gotTaintUpdates := make(map[string][][]apiv1.Taint)
taintsLoop:
for i := 0; i < allUpdatesCount; i++ {
select {
case taintUpdate := <-taintUpdates:
gotTaintUpdates[taintUpdate.nodeName] = append(gotTaintUpdates[taintUpdate.nodeName], taintUpdate.taints)
case <-time.After(3 * time.Second):
t.Errorf("Timeout while waiting for taint updates.")
break taintsLoop
}
}
ignoreTaintValue := cmpopts.IgnoreFields(apiv1.Taint{}, "Value")
if diff := cmp.Diff(tc.wantTaintUpdates, gotTaintUpdates, ignoreTaintValue, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("taintUpdates diff (-want +got):\n%s", diff)
}
// Wait for all expected deletions to be reported in NodeDeletionTracker. Reporting happens shortly after the deletion
// in cloud provider we sync to above and so this will usually not wait at all. However, it can still happen
// that there is a delay between cloud provider deletion and reporting, in which case the results are not there yet
// and we need to wait for them before asserting.
err = waitForDeletionResultsCount(actuator.nodeDeletionTracker, len(tc.wantNodeDeleteResults), 3*time.Second, 200*time.Millisecond)
if err != nil {
t.Errorf("Timeout while waiting for node deletion results")
}
// Run StartDeletion again to gather node deletion results for deletions started in the previous call, and verify
// that they look as expected.
gotNextStatus, gotNextErr := actuator.StartDeletion(nil, nil)
if gotNextErr != nil {
t.Errorf("StartDeletion unexpected error: %v", gotNextErr)
}
if diff := cmp.Diff(tc.wantNodeDeleteResults, gotNextStatus.NodeDeleteResults, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" {
t.Errorf("NodeDeleteResults diff (-want +got):\n%s", diff)
}
})
}
}
}
@ -1181,8 +1326,18 @@ func removablePod(name string, node string) *apiv1.Pod {
}
}
func generateDsPods(count int, node string) []*apiv1.Pod {
var result []*apiv1.Pod
for i := 0; i < count; i++ {
name := fmt.Sprintf("ds-pod-%d", i)
result = append(result, generateDsPod(name, node))
}
return result
}
func generateDsPod(name string, node string) *apiv1.Pod {
pod := removablePod(name, node)
pod := removablePod(fmt.Sprintf("%s-%s", node, name), node)
pod.OwnerReferences = GenerateOwnerReferences("ds", "DaemonSet", "apps/v1", "some-uid")
return pod
}

View File

@ -41,20 +41,22 @@ const (
// Checker is responsible for deciding which nodes pass the criteria for scale down.
type Checker struct {
thresholdGetter utilizationThresholdGetter
configGetter nodeGroupConfigGetter
}
type utilizationThresholdGetter interface {
type nodeGroupConfigGetter interface {
// GetScaleDownUtilizationThreshold returns ScaleDownUtilizationThreshold value that should be used for a given NodeGroup.
GetScaleDownUtilizationThreshold(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (float64, error)
// GetScaleDownGpuUtilizationThreshold returns ScaleDownGpuUtilizationThreshold value that should be used for a given NodeGroup.
GetScaleDownGpuUtilizationThreshold(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (float64, error)
// GetIgnoreDaemonSetsUtilization returns IgnoreDaemonSetsUtilization value that should be used for a given NodeGroup.
GetIgnoreDaemonSetsUtilization(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (bool, error)
}
// NewChecker creates a new Checker object.
func NewChecker(thresholdGetter utilizationThresholdGetter) *Checker {
func NewChecker(configGetter nodeGroupConfigGetter) *Checker {
return &Checker{
thresholdGetter: thresholdGetter,
configGetter: configGetter,
}
}
@ -118,12 +120,6 @@ func (c *Checker) unremovableReasonAndNodeUtilization(context *context.Autoscali
return simulator.ScaleDownDisabledAnnotation, nil
}
gpuConfig := context.CloudProvider.GetNodeGpuConfig(node)
utilInfo, err := utilization.Calculate(nodeInfo, context.IgnoreDaemonSetsUtilization, context.IgnoreMirrorPodsUtilization, gpuConfig, timestamp)
if err != nil {
klog.Warningf("Failed to calculate utilization for %s: %v", node.Name, err)
}
nodeGroup, err := context.CloudProvider.NodeGroupForNode(node)
if err != nil {
klog.Warning("Node group not found for node %v: %v", node.Name, err)
@ -136,6 +132,18 @@ func (c *Checker) unremovableReasonAndNodeUtilization(context *context.Autoscali
return simulator.NotAutoscaled, nil
}
ignoreDaemonSetsUtilization, err := c.configGetter.GetIgnoreDaemonSetsUtilization(context, nodeGroup)
if err != nil {
klog.Warningf("Couldn't retrieve `IgnoreDaemonSetsUtilization` option for node %v: %v", node.Name, err)
return simulator.UnexpectedError, nil
}
gpuConfig := context.CloudProvider.GetNodeGpuConfig(node)
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, context.IgnoreMirrorPodsUtilization, gpuConfig, timestamp)
if err != nil {
klog.Warningf("Failed to calculate utilization for %s: %v", node.Name, err)
}
// If scale down of unready nodes is disabled, skip the node if it is unready
if !context.ScaleDownUnreadyEnabled {
ready, _, _ := kube_util.GetReadinessState(node)
@ -166,12 +174,12 @@ func (c *Checker) isNodeBelowUtilizationThreshold(context *context.AutoscalingCo
var err error
gpuConfig := context.CloudProvider.GetNodeGpuConfig(node)
if gpuConfig != nil {
threshold, err = c.thresholdGetter.GetScaleDownGpuUtilizationThreshold(context, nodeGroup)
threshold, err = c.configGetter.GetScaleDownGpuUtilizationThreshold(context, nodeGroup)
if err != nil {
return false, err
}
} else {
threshold, err = c.thresholdGetter.GetScaleDownUtilizationThreshold(context, nodeGroup)
threshold, err = c.configGetter.GetScaleDownUtilizationThreshold(context, nodeGroup)
if err != nil {
return false, err
}

View File

@ -21,12 +21,11 @@ import (
"testing"
"time"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@ -36,8 +35,16 @@ import (
"k8s.io/client-go/kubernetes/fake"
)
func TestFilterOutUnremovable(t *testing.T) {
now := time.Now()
type testCase struct {
desc string
nodes []*apiv1.Node
pods []*apiv1.Pod
want []string
scaleDownUnready bool
ignoreDaemonSetsUtilization bool
}
func getTestCases(ignoreDaemonSetsUtilization bool, suffix string, now time.Time) []testCase {
regularNode := BuildTestNode("regular", 1000, 10)
SetNodeReadyState(regularNode, true, time.Time{})
@ -59,13 +66,10 @@ func TestFilterOutUnremovable(t *testing.T) {
smallPod := BuildTestPod("smallPod", 100, 0)
smallPod.Spec.NodeName = "regular"
testCases := []struct {
desc string
nodes []*apiv1.Node
pods []*apiv1.Pod
want []string
scaleDownUnready bool
}{
dsPod := BuildDSTestPod("dsPod", 500, 0)
dsPod.Spec.NodeName = "regular"
testCases := []testCase{
{
desc: "regular node stays",
nodes: []*apiv1.Node{regularNode},
@ -111,14 +115,57 @@ func TestFilterOutUnremovable(t *testing.T) {
scaleDownUnready: false,
},
}
finalTestCases := []testCase{}
for _, tc := range testCases {
tc.desc = tc.desc + " " + suffix
if ignoreDaemonSetsUtilization {
tc.ignoreDaemonSetsUtilization = true
}
finalTestCases = append(finalTestCases, tc)
}
if ignoreDaemonSetsUtilization {
finalTestCases = append(testCases, testCase{
desc: "high utilization daemonsets node is filtered out",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{smallPod, dsPod},
want: []string{},
scaleDownUnready: true,
ignoreDaemonSetsUtilization: false,
},
testCase{
desc: "high utilization daemonsets node stays",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{smallPod, dsPod},
want: []string{"regular"},
scaleDownUnready: true,
ignoreDaemonSetsUtilization: true,
})
}
return finalTestCases
}
func TestFilterOutUnremovable(t *testing.T) {
now := time.Now()
for _, tc := range append(getTestCases(false, "IgnoreDaemonSetUtilization=false", now),
getTestCases(true, "IgnoreDaemonsetUtilization=true", now)...) {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
c := NewChecker(&staticThresholdGetter{0.5})
s := nodegroupconfig.DelegatingNodeGroupConfigProcessor{}
c := NewChecker(&s)
options := config.AutoscalingOptions{
UnremovableNodeRecheckTimeout: 5 * time.Minute,
ScaleDownUnreadyEnabled: tc.scaleDownUnready,
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: config.DefaultScaleDownUtilizationThreshold,
ScaleDownGpuUtilizationThreshold: config.DefaultScaleDownGpuUtilizationThreshold,
ScaleDownUnneededTime: config.DefaultScaleDownUnneededTime,
ScaleDownUnreadyTime: config.DefaultScaleDownUnreadyTime,
IgnoreDaemonSetsUtilization: tc.ignoreDaemonSetsUtilization,
},
}
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 2)
@ -136,15 +183,3 @@ func TestFilterOutUnremovable(t *testing.T) {
})
}
}
type staticThresholdGetter struct {
threshold float64
}
func (s *staticThresholdGetter) GetScaleDownUtilizationThreshold(_ *context.AutoscalingContext, _ cloudprovider.NodeGroup) (float64, error) {
return s.threshold, nil
}
func (s *staticThresholdGetter) GetScaleDownGpuUtilizationThreshold(_ *context.AutoscalingContext, _ cloudprovider.NodeGroup) (float64, error) {
return s.threshold, nil
}

View File

@ -1303,7 +1303,8 @@ func newWrapperForTesting(ctx *context.AutoscalingContext, clusterStateRegistry
MinReplicaCount: 0,
SkipNodesWithCustomControllerPods: true,
}
sd := NewScaleDown(ctx, NewTestProcessors(ctx), ndt, deleteOptions)
actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, deleteOptions)
processors := NewTestProcessors(ctx)
sd := NewScaleDown(ctx, processors, ndt, deleteOptions)
actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, deleteOptions, processors.NodeGroupConfigProcessor)
return NewScaleDownWrapper(sd, actuator)
}

View File

@ -171,7 +171,7 @@ func NewStaticAutoscaler(
// during the struct creation rather than here.
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
scaleDown := legacy.NewScaleDown(autoscalingContext, processors, ndt, deleteOptions)
actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, deleteOptions)
actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, deleteOptions, processors.NodeGroupConfigProcessor)
autoscalingContext.ScaleDownActuator = actuator
var scaleDownPlanner scaledown.Planner

View File

@ -159,7 +159,7 @@ func (m *onNodeGroupDeleteMock) Delete(id string) error {
func setUpScaleDownActuator(ctx *context.AutoscalingContext, options config.AutoscalingOptions) {
deleteOptions := simulator.NewNodeDeleteOptions(options)
ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions)
ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, NewTestProcessors(ctx).NodeGroupConfigProcessor)
}
func TestStaticAutoscalerRunOnce(t *testing.T) {
@ -1433,7 +1433,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
csr.RegisterProviders(clusterstate.NewMockMaxNodeProvisionTimeProvider(15 * time.Minute))
// Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), simulator.NodeDeleteOptions{})
actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), simulator.NodeDeleteOptions{}, NewTestProcessors(&ctx).NodeGroupConfigProcessor)
ctx.ScaleDownActuator = actuator
// Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
@ -1761,7 +1761,7 @@ func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContex
}
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
sd := legacy.NewScaleDown(ctx, p, ndt, deleteOptions)
actuator := actuation.NewActuator(ctx, cs, ndt, deleteOptions)
actuator := actuation.NewActuator(ctx, cs, ndt, deleteOptions, p.NodeGroupConfigProcessor)
wrapper := legacy.NewScaleDownWrapper(sd, actuator)
return wrapper, wrapper
}

View File

@ -108,13 +108,13 @@ var (
"How long after node deletion that scale down evaluation resumes, defaults to scanInterval")
scaleDownDelayAfterFailure = flag.Duration("scale-down-delay-after-failure", 3*time.Minute,
"How long after scale down failure that scale down evaluation resumes")
scaleDownUnneededTime = flag.Duration("scale-down-unneeded-time", 10*time.Minute,
scaleDownUnneededTime = flag.Duration("scale-down-unneeded-time", config.DefaultScaleDownUnneededTime,
"How long a node should be unneeded before it is eligible for scale down")
scaleDownUnreadyTime = flag.Duration("scale-down-unready-time", 20*time.Minute,
scaleDownUnreadyTime = flag.Duration("scale-down-unready-time", config.DefaultScaleDownUnreadyTime,
"How long an unready node should be unneeded before it is eligible for scale down")
scaleDownUtilizationThreshold = flag.Float64("scale-down-utilization-threshold", 0.5,
scaleDownUtilizationThreshold = flag.Float64("scale-down-utilization-threshold", config.DefaultScaleDownUtilizationThreshold,
"Sum of cpu or memory of all pods running on the node divided by node's corresponding allocatable resource, below which a node can be considered for scale down")
scaleDownGpuUtilizationThreshold = flag.Float64("scale-down-gpu-utilization-threshold", 0.5,
scaleDownGpuUtilizationThreshold = flag.Float64("scale-down-gpu-utilization-threshold", config.DefaultScaleDownGpuUtilizationThreshold,
"Sum of gpu requests of all pods running on the node divided by node's allocatable resource, below which a node can be considered for scale down."+
"Utilization calculation only cares about gpu resource for accelerator node. cpu and memory utilization will be ignored.")
scaleDownNonEmptyCandidatesCount = flag.Int("scale-down-non-empty-candidates-count", 30,
@ -259,6 +259,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
ScaleDownGpuUtilizationThreshold: *scaleDownGpuUtilizationThreshold,
ScaleDownUnneededTime: *scaleDownUnneededTime,
ScaleDownUnreadyTime: *scaleDownUnreadyTime,
IgnoreDaemonSetsUtilization: *ignoreDaemonSetsUtilization,
MaxNodeProvisionTime: *maxNodeProvisionTime,
},
CloudConfig: *cloudConfig,
@ -272,7 +273,6 @@ func createAutoscalingOptions() config.AutoscalingOptions {
ExpanderNames: *expanderFlag,
GRPCExpanderCert: *grpcExpanderCert,
GRPCExpanderURL: *grpcExpanderURL,
IgnoreDaemonSetsUtilization: *ignoreDaemonSetsUtilization,
IgnoreMirrorPodsUtilization: *ignoreMirrorPodsUtilization,
MaxBulkSoftTaintCount: *maxBulkSoftTaintCount,
MaxBulkSoftTaintTime: *maxBulkSoftTaintTime,

View File

@ -35,6 +35,8 @@ type NodeGroupConfigProcessor interface {
GetScaleDownGpuUtilizationThreshold(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (float64, error)
// GetMaxNodeProvisionTime return MaxNodeProvisionTime value that should be used for a given NodeGroup.
GetMaxNodeProvisionTime(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (time.Duration, error)
// GetIgnoreDaemonSetsUtilization returns IgnoreDaemonSetsUtilization value that should be used for a given NodeGroup.
GetIgnoreDaemonSetsUtilization(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (bool, error)
// CleanUp cleans up processor's internal structures.
CleanUp()
}
@ -105,6 +107,18 @@ func (p *DelegatingNodeGroupConfigProcessor) GetMaxNodeProvisionTime(context *co
return ngConfig.MaxNodeProvisionTime, nil
}
// GetIgnoreDaemonSetsUtilization returns IgnoreDaemonSetsUtilization value that should be used for a given NodeGroup.
func (p *DelegatingNodeGroupConfigProcessor) GetIgnoreDaemonSetsUtilization(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (bool, error) {
ngConfig, err := nodeGroup.GetOptions(context.NodeGroupDefaults)
if err != nil && err != cloudprovider.ErrNotImplemented {
return false, err
}
if ngConfig == nil || err == cloudprovider.ErrNotImplemented {
return context.NodeGroupDefaults.IgnoreDaemonSetsUtilization, nil
}
return ngConfig.IgnoreDaemonSetsUtilization, nil
}
// CleanUp cleans up processor's internal structures.
func (p *DelegatingNodeGroupConfigProcessor) CleanUp() {
}
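
A minimal sketch (not part of the PR) of the resulting precedence, using the test cloud provider; it assumes TestNodeGroup.GetOptions returns whatever was supplied via SetOptions and nil otherwise, so the nil case falls back to NodeGroupDefaults:

```go
package sketch

import (
	"fmt"

	testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
	"k8s.io/autoscaler/cluster-autoscaler/config"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
)

func demoIgnoreDSUtilizationPrecedence() {
	p := &nodegroupconfig.DelegatingNodeGroupConfigProcessor{}
	ctx := &context.AutoscalingContext{
		AutoscalingOptions: config.AutoscalingOptions{
			NodeGroupDefaults: config.NodeGroupAutoscalingOptions{IgnoreDaemonSetsUtilization: false},
		},
	}

	// Node group with an explicit per-group option: the override wins.
	ngOverride := testprovider.NewTestNodeGroup("ng-override", 10, 1, 3, true, false, "n1-standard-2", nil, nil)
	ngOverride.SetOptions(&config.NodeGroupAutoscalingOptions{IgnoreDaemonSetsUtilization: true})
	got, _ := p.GetIgnoreDaemonSetsUtilization(ctx, ngOverride)
	fmt.Println(got) // true

	// Node group without per-group options: the global NodeGroupDefaults value is used.
	ngDefault := testprovider.NewTestNodeGroup("ng-default", 10, 1, 3, true, false, "n1-standard-2", nil, nil)
	got, _ = p.GetIgnoreDaemonSetsUtilization(ctx, ngDefault)
	fmt.Println(got) // false
}
```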

View File

@ -49,6 +49,7 @@ func TestDelegatingNodeGroupConfigProcessor(t *testing.T) {
ScaleDownGpuUtilizationThreshold: 0.6,
ScaleDownUtilizationThreshold: 0.5,
MaxNodeProvisionTime: 15 * time.Minute,
IgnoreDaemonSetsUtilization: true,
}
ngOpts := &config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: 10 * time.Minute,
@ -56,6 +57,7 @@ func TestDelegatingNodeGroupConfigProcessor(t *testing.T) {
ScaleDownGpuUtilizationThreshold: 0.85,
ScaleDownUtilizationThreshold: 0.75,
MaxNodeProvisionTime: 60 * time.Minute,
IgnoreDaemonSetsUtilization: false,
}
testUnneededTime := func(t *testing.T, p DelegatingNodeGroupConfigProcessor, c *context.AutoscalingContext, ng cloudprovider.NodeGroup, w Want, we error) {
@ -109,18 +111,32 @@ func TestDelegatingNodeGroupConfigProcessor(t *testing.T) {
assert.Equal(t, res, results[w])
}
// for IgnoreDaemonSetsUtilization
testIgnoreDSUtilization := func(t *testing.T, p DelegatingNodeGroupConfigProcessor, c *context.AutoscalingContext, ng cloudprovider.NodeGroup, w Want, we error) {
res, err := p.GetIgnoreDaemonSetsUtilization(c, ng)
assert.Equal(t, err, we)
results := map[Want]bool{
NIL: false,
GLOBAL: true,
NG: false,
}
assert.Equal(t, res, results[w])
}
funcs := map[string]func(*testing.T, DelegatingNodeGroupConfigProcessor, *context.AutoscalingContext, cloudprovider.NodeGroup, Want, error){
"ScaleDownUnneededTime": testUnneededTime,
"ScaleDownUnreadyTime": testUnreadyTime,
"ScaleDownUtilizationThreshold": testUtilizationThreshold,
"ScaleDownGpuUtilizationThreshold": testGpuThreshold,
"MaxNodeProvisionTime": testMaxNodeProvisionTime,
"IgnoreDaemonSetsUtilization": testIgnoreDSUtilization,
"MultipleOptions": func(t *testing.T, p DelegatingNodeGroupConfigProcessor, c *context.AutoscalingContext, ng cloudprovider.NodeGroup, w Want, we error) {
testUnneededTime(t, p, c, ng, w, we)
testUnreadyTime(t, p, c, ng, w, we)
testUtilizationThreshold(t, p, c, ng, w, we)
testGpuThreshold(t, p, c, ng, w, we)
testMaxNodeProvisionTime(t, p, c, ng, w, we)
testIgnoreDSUtilization(t, p, c, ng, w, we)
},
"RepeatingTheSameCallGivesConsistentResults": func(t *testing.T, p DelegatingNodeGroupConfigProcessor, c *context.AutoscalingContext, ng cloudprovider.NodeGroup, w Want, we error) {
testUnneededTime(t, p, c, ng, w, we)
@ -128,6 +144,9 @@ func TestDelegatingNodeGroupConfigProcessor(t *testing.T) {
// throw in a different call
testGpuThreshold(t, p, c, ng, w, we)
testUnneededTime(t, p, c, ng, w, we)
// throw in another different call
testIgnoreDSUtilization(t, p, c, ng, w, we)
testUnneededTime(t, p, c, ng, w, we)
},
}

View File

@ -67,6 +67,15 @@ func BuildTestPod(name string, cpu int64, mem int64) *apiv1.Pod {
return pod
}
// BuildDSTestPod creates a DaemonSet pod with cpu and memory.
func BuildDSTestPod(name string, cpu int64, mem int64) *apiv1.Pod {
pod := BuildTestPod(name, cpu, mem)
pod.OwnerReferences = GenerateOwnerReferences("ds", "DaemonSet", "apps/v1", "some-uid")
return pod
}
// BuildTestPodWithEphemeralStorage creates a pod with cpu, memory and ephemeral storage resources.
func BuildTestPodWithEphemeralStorage(name string, cpu, mem, ephemeralStorage int64) *apiv1.Pod {
startTime := metav1.Unix(0, 0)