Move soft tainting logic to a separate package

Daniel Kłobuszewski 2022-04-15 12:58:02 +02:00
parent 7686a1f326
commit 5a78f49bc2
6 changed files with 412 additions and 311 deletions
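In short, the soft-taint bookkeeping that previously lived on the legacy ScaleDown object moves into the new core/scaledown/actuation package, and the caller now passes the unneeded/needed node split explicitly. A condensed view of the call-site change in the static autoscaler, assembled from the hunks below (not compilable on its own):

// Before: ScaleDown consulted its internal unneeded-nodes map to decide which taints to flip.
scaleDown.SoftTaintUnneededNodes(allNodes)

// After: the caller computes the split and hands it to the actuation package.
taintableNodes := a.scaleDown.UnneededNodes()
untaintableNodes := subtractNodes(allNodes, taintableNodes)
actuation.UpdateSoftDeletionTaints(a.AutoscalingContext, taintableNodes, untaintableNodes)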

View File

@@ -0,0 +1,97 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actuation
import (
"time"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
apiv1 "k8s.io/api/core/v1"
klog "k8s.io/klog/v2"
)
// UpdateSoftDeletionTaints manages soft taints of unneeded nodes.
func UpdateSoftDeletionTaints(context *context.AutoscalingContext, unneededNodes, neededNodes []*apiv1.Node) (errors []error) {
defer metrics.UpdateDurationFromStart(metrics.ScaleDownSoftTaintUnneeded, time.Now())
b := &budgetTracker{
apiCallBudget: context.AutoscalingOptions.MaxBulkSoftTaintCount,
timeBudget: context.AutoscalingOptions.MaxBulkSoftTaintTime,
startTime: now(),
}
for _, node := range neededNodes {
if deletetaint.HasToBeDeletedTaint(node) {
// Do not consider nodes that are scheduled to be deleted
continue
}
if !deletetaint.HasDeletionCandidateTaint(node) {
continue
}
b.processWithinBudget(func() {
_, err := deletetaint.CleanDeletionCandidate(node, context.ClientSet)
if err != nil {
errors = append(errors, err)
klog.Warningf("Soft taint on %s removal error %v", node.Name, err)
}
})
}
for _, node := range unneededNodes {
if deletetaint.HasToBeDeletedTaint(node) {
// Do not consider nodes that are scheduled to be deleted
continue
}
if deletetaint.HasDeletionCandidateTaint(node) {
continue
}
b.processWithinBudget(func() {
err := deletetaint.MarkDeletionCandidate(node, context.ClientSet)
if err != nil {
errors = append(errors, err)
klog.Warningf("Soft taint on %s adding error %v", node.Name, err)
}
})
}
b.reportExceededLimits()
return
}
// Get current time. Proxy for unit tests.
var now func() time.Time = time.Now
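// budgetTracker enforces the per-iteration limits on soft taint updates: at most apiCallBudget
// taint/untaint API calls, and no further work once timeBudget has elapsed since startTime;
// anything beyond that is counted as skipped.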
type budgetTracker struct {
apiCallBudget int
startTime time.Time
timeBudget time.Duration
skippedNodes int
}
func (b *budgetTracker) processWithinBudget(f func()) {
if b.apiCallBudget <= 0 || now().Sub(b.startTime) >= b.timeBudget {
b.skippedNodes++
return
}
b.apiCallBudget--
f()
}
func (b *budgetTracker) reportExceededLimits() {
if b.skippedNodes > 0 {
klog.V(4).Infof("Skipped adding/removing soft taints on %v nodes - API call or time limit exceeded", b.skippedNodes)
}
}
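The budget logic above is self-contained, so a minimal runnable sketch of the same pattern outside the autoscaler may help; the package main wrapper, the node counts, and the printed output are illustrative only, not part of this change:

package main

import (
	"fmt"
	"time"
)

// Trimmed-down copy of the budgetTracker pattern from the file above: each unit of work
// consumes one API call from the budget, and work stops once either the call budget or
// the time budget is exhausted (the real code uses the mockable now() instead of time.Since).
type budgetTracker struct {
	apiCallBudget int
	startTime     time.Time
	timeBudget    time.Duration
	skippedNodes  int
}

func (b *budgetTracker) processWithinBudget(f func()) {
	if b.apiCallBudget <= 0 || time.Since(b.startTime) >= b.timeBudget {
		b.skippedNodes++
		return
	}
	b.apiCallBudget--
	f()
}

func main() {
	b := &budgetTracker{apiCallBudget: 2, startTime: time.Now(), timeBudget: time.Second}
	for i := 0; i < 5; i++ {
		node := fmt.Sprintf("node-%d", i)
		b.processWithinBudget(func() { fmt.Println("soft-tainting", node) })
	}
	fmt.Println("skipped:", b.skippedNodes) // prints "skipped: 3" with the budget of 2
}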

View File

@@ -0,0 +1,241 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actuation
import (
"context"
"testing"
"time"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
klog "k8s.io/klog/v2"
)
func TestSoftTaintUpdate(t *testing.T) {
n1000 := BuildTestNode("n1000", 1000, 1000)
SetNodeReadyState(n1000, true, time.Time{})
n2000 := BuildTestNode("n2000", 2000, 1000)
SetNodeReadyState(n2000, true, time.Time{})
fakeClient := fake.NewSimpleClientset()
ctx := context.Background()
_, err := fakeClient.CoreV1().Nodes().Create(ctx, n1000, metav1.CreateOptions{})
assert.NoError(t, err)
_, err = fakeClient.CoreV1().Nodes().Create(ctx, n2000, metav1.CreateOptions{})
assert.NoError(t, err)
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
t.Fatalf("Unexpected deletion of %s", node)
return nil
})
provider.AddNodeGroup("ng1", 1, 10, 2)
provider.AddNode("ng1", n1000)
provider.AddNode("ng1", n2000)
assert.NotNil(t, provider)
options := config.AutoscalingOptions{
MaxBulkSoftTaintCount: 1,
MaxBulkSoftTaintTime: 3 * time.Second,
}
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
actx, err := test.NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
// Test no superfluous nodes
nodes := getAllNodes(t, fakeClient)
errs := UpdateSoftDeletionTaints(&actx, nil, nodes)
assert.Empty(t, errs)
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n1000.Name))
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2000.Name))
// Test one unneeded node
nodes = getAllNodes(t, fakeClient)
errs = UpdateSoftDeletionTaints(&actx, []*apiv1.Node{n1000}, []*apiv1.Node{n2000})
assert.Empty(t, errs)
assert.True(t, hasDeletionCandidateTaint(t, fakeClient, n1000.Name))
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2000.Name))
// Test remove soft taint
nodes = getAllNodes(t, fakeClient)
errs = UpdateSoftDeletionTaints(&actx, nil, nodes)
assert.Empty(t, errs)
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n1000.Name))
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2000.Name))
// Test bulk update taint limit
nodes = getAllNodes(t, fakeClient)
errs = UpdateSoftDeletionTaints(&actx, nodes, nil)
assert.Empty(t, errs)
assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient))
nodes = getAllNodes(t, fakeClient)
errs = UpdateSoftDeletionTaints(&actx, nodes, nil)
assert.Empty(t, errs)
assert.Equal(t, 2, countDeletionCandidateTaints(t, fakeClient))
// Test bulk update untaint limit
nodes = getAllNodes(t, fakeClient)
errs = UpdateSoftDeletionTaints(&actx, nil, nodes)
assert.Empty(t, errs)
assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient))
nodes = getAllNodes(t, fakeClient)
errs = UpdateSoftDeletionTaints(&actx, nil, nodes)
assert.Empty(t, errs)
assert.Equal(t, 0, countDeletionCandidateTaints(t, fakeClient))
}
func TestSoftTaintTimeLimit(t *testing.T) {
n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, time.Time{})
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Time{})
currentTime := time.Now()
updateTime := time.Millisecond
maxSoftTaintDuration := 1 * time.Second
unfreeze := freezeTime(&currentTime)
defer unfreeze()
fakeClient := fake.NewSimpleClientset()
ctx := context.Background()
_, err := fakeClient.CoreV1().Nodes().Create(ctx, n1, metav1.CreateOptions{})
assert.NoError(t, err)
_, err = fakeClient.CoreV1().Nodes().Create(ctx, n2, metav1.CreateOptions{})
assert.NoError(t, err)
// Move time forward when updating
fakeClient.Fake.PrependReactor("update", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) {
currentTime = currentTime.Add(updateTime)
klog.Infof("currentTime after update by %v is %v", updateTime, currentTime)
return false, nil, nil
})
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 2)
provider.AddNode("ng1", n1)
provider.AddNode("ng1", n2)
assert.NotNil(t, provider)
options := config.AutoscalingOptions{
MaxBulkSoftTaintCount: 10,
MaxBulkSoftTaintTime: maxSoftTaintDuration,
}
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
actx, err := test.NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
// Test bulk taint
nodes := getAllNodes(t, fakeClient)
errs := UpdateSoftDeletionTaints(&actx, nodes, nil)
assert.Empty(t, errs)
assert.Equal(t, 2, countDeletionCandidateTaints(t, fakeClient))
assert.True(t, hasDeletionCandidateTaint(t, fakeClient, n1.Name))
assert.True(t, hasDeletionCandidateTaint(t, fakeClient, n2.Name))
// Test bulk untaint
nodes = getAllNodes(t, fakeClient)
errs = UpdateSoftDeletionTaints(&actx, nil, nodes)
assert.Empty(t, errs)
assert.Equal(t, 0, countDeletionCandidateTaints(t, fakeClient))
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n1.Name))
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2.Name))
updateTime = maxSoftTaintDuration
// Test duration limit of bulk taint
nodes = getAllNodes(t, fakeClient)
errs = UpdateSoftDeletionTaints(&actx, nodes, nil)
assert.Empty(t, errs)
assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient))
nodes = getAllNodes(t, fakeClient)
errs = UpdateSoftDeletionTaints(&actx, nodes, nil)
assert.Empty(t, errs)
assert.Equal(t, 2, countDeletionCandidateTaints(t, fakeClient))
// Test duration limit of bulk untaint
nodes = getAllNodes(t, fakeClient)
errs = UpdateSoftDeletionTaints(&actx, nil, nodes)
assert.Empty(t, errs)
assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient))
nodes = getAllNodes(t, fakeClient)
errs = UpdateSoftDeletionTaints(&actx, nil, nodes)
assert.Empty(t, errs)
assert.Equal(t, 0, countDeletionCandidateTaints(t, fakeClient))
}
func countDeletionCandidateTaints(t *testing.T, client kubernetes.Interface) (total int) {
t.Helper()
for _, node := range getAllNodes(t, client) {
if deletetaint.HasDeletionCandidateTaint(node) {
total++
}
}
return total
}
func hasDeletionCandidateTaint(t *testing.T, client kubernetes.Interface, name string) bool {
t.Helper()
return deletetaint.HasDeletionCandidateTaint(getNode(t, client, name))
}
func getNode(t *testing.T, client kubernetes.Interface, name string) *apiv1.Node {
t.Helper()
node, err := client.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve node %v: %v", name, err)
}
return node
}
func getAllNodes(t *testing.T, client kubernetes.Interface) []*apiv1.Node {
t.Helper()
nodeList, err := client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
if err != nil {
t.Fatalf("Failed to retrieve list of nodes: %v", err)
}
result := make([]*apiv1.Node, 0, len(nodeList.Items))
for _, node := range nodeList.Items {
result = append(result, node.DeepCopy())
}
return result
}
func freezeTime(at *time.Time) (unfreeze func()) {
// Replace time tracking function
now = func() time.Time {
return *at
}
return func() { now = time.Now }
}

View File

@@ -81,9 +81,6 @@ const (
DeamonSetTimeBetweenEvictionRetries = 3 * time.Second
)
// Get current time. Proxy for unit tests.
var now func() time.Time = time.Now
type scaleDownResourcesLimits map[string]int64
type scaleDownResourcesDelta map[string]int64
@@ -317,6 +314,11 @@ func (sd *ScaleDown) CleanUpUnneededNodes() {
sd.unneededNodes = make(map[string]time.Time)
}
// UnneededNodes returns a list of nodes that can potentially be scaled down.
func (sd *ScaleDown) UnneededNodes() []*apiv1.Node {
return sd.unneededNodesList
}
func (sd *ScaleDown) checkNodeUtilization(timestamp time.Time, node *apiv1.Node, nodeInfo *schedulerframework.NodeInfo) (simulator.UnremovableReason, *utilization.Info) {
// Skip nodes that were recently checked.
if _, found := sd.unremovableNodes[node.Name]; found {
@@ -684,50 +686,6 @@ func (sd *ScaleDown) mapNodesToStatusScaleDownNodes(nodes []*apiv1.Node, nodeGro
return result
}
// SoftTaintUnneededNodes manage soft taints of unneeded nodes.
func (sd *ScaleDown) SoftTaintUnneededNodes(allNodes []*apiv1.Node) (errors []error) {
defer metrics.UpdateDurationFromStart(metrics.ScaleDownSoftTaintUnneeded, time.Now())
apiCallBudget := sd.context.AutoscalingOptions.MaxBulkSoftTaintCount
timeBudget := sd.context.AutoscalingOptions.MaxBulkSoftTaintTime
skippedNodes := 0
startTime := now()
for _, node := range allNodes {
if deletetaint.HasToBeDeletedTaint(node) {
// Do not consider nodes that are scheduled to be deleted
continue
}
alreadyTainted := deletetaint.HasDeletionCandidateTaint(node)
_, unneeded := sd.unneededNodes[node.Name]
// Check if expected taints match existing taints
if unneeded != alreadyTainted {
if apiCallBudget <= 0 || now().Sub(startTime) >= timeBudget {
skippedNodes++
continue
}
apiCallBudget--
if unneeded && !alreadyTainted {
err := deletetaint.MarkDeletionCandidate(node, sd.context.ClientSet)
if err != nil {
errors = append(errors, err)
klog.Warningf("Soft taint on %s adding error %v", node.Name, err)
}
}
if !unneeded && alreadyTainted {
_, err := deletetaint.CleanDeletionCandidate(node, sd.context.ClientSet)
if err != nil {
errors = append(errors, err)
klog.Warningf("Soft taint on %s removal error %v", node.Name, err)
}
}
}
}
if skippedNodes > 0 {
klog.V(4).Infof("Skipped adding/removing soft taints on %v nodes - API call limit exceeded", skippedNodes)
}
return
}
// TryToScaleDown tries to scale down the cluster. It returns a result inside a ScaleDownStatus indicating if any node was
// removed and error if such occurred.
func (sd *ScaleDown) TryToScaleDown(

View File

@@ -17,7 +17,6 @@ limitations under the License.
package legacy
import (
ctx "context"
"fmt"
"sort"
"strconv"
@@ -47,7 +46,6 @@ import (
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
klog "k8s.io/klog/v2"
@@ -1975,267 +1973,6 @@ func TestCheckScaleDownDeltaWithinLimits(t *testing.T) {
}
}
func getNode(t *testing.T, client kube_client.Interface, name string) *apiv1.Node {
t.Helper()
node, err := client.CoreV1().Nodes().Get(ctx.TODO(), name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve node %v: %v", name, err)
}
return node
}
func hasDeletionCandidateTaint(t *testing.T, client kube_client.Interface, name string) bool {
t.Helper()
return deletetaint.HasDeletionCandidateTaint(getNode(t, client, name))
}
func getAllNodes(t *testing.T, client kube_client.Interface) []*apiv1.Node {
t.Helper()
nodeList, err := client.CoreV1().Nodes().List(ctx.TODO(), metav1.ListOptions{})
if err != nil {
t.Fatalf("Failed to retrieve list of nodes: %v", err)
}
result := make([]*apiv1.Node, 0, nodeList.Size())
for _, node := range nodeList.Items {
result = append(result, node.DeepCopy())
}
return result
}
func countDeletionCandidateTaints(t *testing.T, client kube_client.Interface) (total int) {
t.Helper()
for _, node := range getAllNodes(t, client) {
if deletetaint.HasDeletionCandidateTaint(node) {
total++
}
}
return total
}
func TestSoftTaint(t *testing.T) {
var err error
var autoscalererr autoscaler_errors.AutoscalerError
job := batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
SelfLink: "/apivs/batch/v1/namespaces/default/jobs/job",
},
}
n1000 := BuildTestNode("n1000", 1000, 1000)
SetNodeReadyState(n1000, true, time.Time{})
n2000 := BuildTestNode("n2000", 2000, 1000)
SetNodeReadyState(n2000, true, time.Time{})
p500 := BuildTestPod("p500", 500, 0)
p700 := BuildTestPod("p700", 700, 0)
p1200 := BuildTestPod("p1200", 1200, 0)
p500.Spec.NodeName = "n2000"
p700.Spec.NodeName = "n1000"
p1200.Spec.NodeName = "n2000"
fakeClient := fake.NewSimpleClientset()
_, err = fakeClient.CoreV1().Nodes().Create(ctx.TODO(), n1000, metav1.CreateOptions{})
assert.NoError(t, err)
_, err = fakeClient.CoreV1().Nodes().Create(ctx.TODO(), n2000, metav1.CreateOptions{})
assert.NoError(t, err)
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
t.Fatalf("Unexpected deletion of %s", node)
return nil
})
provider.AddNodeGroup("ng1", 1, 10, 2)
provider.AddNode("ng1", n1000)
provider.AddNode("ng1", n2000)
assert.NotNil(t, provider)
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: 10 * time.Minute,
ScaleDownUtilizationThreshold: 0.5,
},
MaxGracefulTerminationSec: 60,
MaxBulkSoftTaintCount: 1,
MaxBulkSoftTaintTime: 3 * time.Second,
}
jobLister, err := kube_util.NewTestJobLister([]*batchv1.Job{&job})
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
scaleDown := newScaleDownForTesting(&context, clusterStateRegistry)
// Test no superfluous nodes
nodes := []*apiv1.Node{n1000, n2000}
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{p500, p700, p1200})
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
assert.NoError(t, autoscalererr)
errs := scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n1000.Name))
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2000.Name))
// Test one unneeded node
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{p500, p1200})
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
assert.NoError(t, autoscalererr)
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.True(t, hasDeletionCandidateTaint(t, fakeClient, n1000.Name))
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2000.Name))
// Test remove soft taint
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{p500, p700, p1200})
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
assert.NoError(t, autoscalererr)
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n1000.Name))
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2000.Name))
// Test bulk update taint limit
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{})
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
assert.NoError(t, autoscalererr)
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient))
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 2, countDeletionCandidateTaints(t, fakeClient))
// Test bulk update untaint limit
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{p500, p700, p1200})
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
assert.NoError(t, autoscalererr)
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient))
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 0, countDeletionCandidateTaints(t, fakeClient))
}
func TestSoftTaintTimeLimit(t *testing.T) {
var autoscalererr autoscaler_errors.AutoscalerError
job := batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
SelfLink: "/apivs/batch/v1/namespaces/default/jobs/job",
},
}
n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, time.Time{})
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Time{})
p1 := BuildTestPod("p1", 1000, 0)
p2 := BuildTestPod("p2", 1000, 0)
p1.Spec.NodeName = "n1"
p2.Spec.NodeName = "n2"
currentTime := time.Now()
updateTime := time.Millisecond
maxSoftTaintDuration := 1 * time.Second
// Replace time tracking function
now = func() time.Time {
return currentTime
}
defer func() {
now = time.Now
return
}()
fakeClient := fake.NewSimpleClientset()
_, err := fakeClient.CoreV1().Nodes().Create(ctx.TODO(), n1, metav1.CreateOptions{})
assert.NoError(t, err)
_, err = fakeClient.CoreV1().Nodes().Create(ctx.TODO(), n2, metav1.CreateOptions{})
assert.NoError(t, err)
// Move time forward when updating
fakeClient.Fake.PrependReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
currentTime = currentTime.Add(updateTime)
return false, nil, nil
})
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 2)
provider.AddNode("ng1", n1)
provider.AddNode("ng1", n2)
assert.NotNil(t, provider)
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: 10 * time.Minute,
ScaleDownUtilizationThreshold: 0.5,
},
MaxGracefulTerminationSec: 60,
MaxBulkSoftTaintCount: 10,
MaxBulkSoftTaintTime: maxSoftTaintDuration,
}
jobLister, err := kube_util.NewTestJobLister([]*batchv1.Job{&job})
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil)
context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil, nil)
assert.NoError(t, err)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
scaleDown := newScaleDownForTesting(&context, clusterStateRegistry)
// Test bulk taint
nodes := []*apiv1.Node{n1, n2}
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{})
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
assert.NoError(t, autoscalererr)
errs := scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 2, countDeletionCandidateTaints(t, fakeClient))
assert.True(t, hasDeletionCandidateTaint(t, fakeClient, n1.Name))
assert.True(t, hasDeletionCandidateTaint(t, fakeClient, n2.Name))
// Test bulk untaint
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{p1, p2})
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
assert.NoError(t, autoscalererr)
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 0, countDeletionCandidateTaints(t, fakeClient))
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n1.Name))
assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2.Name))
updateTime = maxSoftTaintDuration
// Test duration limit of bulk taint
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{})
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
assert.NoError(t, autoscalererr)
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient))
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 2, countDeletionCandidateTaints(t, fakeClient))
// Test duration limit of bulk untaint
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{p1, p2})
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
assert.NoError(t, autoscalererr)
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient))
errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient))
assert.Empty(t, errs)
assert.Equal(t, 0, countDeletionCandidateTaints(t, fakeClient))
}
func TestWaitForDelayDeletion(t *testing.T) {
type testcase struct {
name string

View File

@@ -31,6 +31,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/legacy"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
@@ -563,7 +564,9 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
if (scaleDownStatus.Result == status.ScaleDownNoNodeDeleted ||
scaleDownStatus.Result == status.ScaleDownNoUnneeded) &&
a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount != 0 {
scaleDown.SoftTaintUnneededNodes(allNodes)
taintableNodes := a.scaleDown.UnneededNodes()
untaintableNodes := subtractNodes(allNodes, taintableNodes)
actuation.UpdateSoftDeletionTaints(a.AutoscalingContext, taintableNodes, untaintableNodes)
}
if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
@@ -816,3 +819,18 @@ func calculateCoresMemoryTotal(nodes []*apiv1.Node, timestamp time.Time) (int64,
return coresTotal, memoryTotal
}
func subtractNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node {
var c []*apiv1.Node
namesToDrop := make(map[string]bool)
for _, n := range b {
namesToDrop[n.Name] = true
}
for _, n := range a {
if namesToDrop[n.Name] {
continue
}
c = append(c, n)
}
return c
}

View File

@@ -1271,6 +1271,56 @@ func TestRemoveOldUnregisteredNodes(t *testing.T) {
assert.Equal(t, "ng1/ng1-2", deletedNode)
}
func TestSubtractNodes(t *testing.T) {
ns := make([]*apiv1.Node, 5)
for i := 0; i < len(ns); i++ {
ns[i] = BuildTestNode(fmt.Sprintf("n%d", i), 1000, 1000)
}
testCases := []struct {
a []*apiv1.Node
b []*apiv1.Node
c []*apiv1.Node
}{
{
a: ns,
b: nil,
c: ns,
},
{
a: nil,
b: ns,
c: nil,
},
{
a: ns,
b: []*apiv1.Node{ns[3]},
c: []*apiv1.Node{ns[0], ns[1], ns[2], ns[4]},
},
{
a: ns,
b: []*apiv1.Node{ns[0], ns[1], ns[2], ns[4]},
c: []*apiv1.Node{ns[3]},
},
{
a: []*apiv1.Node{ns[3]},
b: []*apiv1.Node{ns[0], ns[1], ns[2], ns[4]},
c: []*apiv1.Node{ns[3]},
},
}
for _, tc := range testCases {
got := subtractNodes(tc.a, tc.b)
assert.Equal(t, nodeNames(got), nodeNames(tc.c))
}
}
func nodeNames(ns []*apiv1.Node) []string {
names := make([]string, len(ns))
for i, node := range ns {
names[i] = node.Name
}
return names
}
func waitForDeleteToFinish(t *testing.T, sd *legacy.ScaleDown) {
for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
if !sd.IsNonEmptyNodeDeleteInProgress() {