Merge pull request #6139 from damikag/priority-evictor
Implement priority-based evictor
commit fc48d5c052
@@ -19,6 +19,7 @@ package config
import (
    "time"

    kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
    scheduler_config "k8s.io/kubernetes/pkg/scheduler/apis/config"
)
@@ -125,7 +126,12 @@ type AutoscalingOptions struct {
    IgnoreMirrorPodsUtilization bool
    // MaxGracefulTerminationSec is the maximum number of seconds scale down waits for pods to terminate before
    // removing the node from the cloud provider.
    // DrainPriorityConfig takes precedence, and MaxGracefulTerminationSec is not applicable when DrainPriorityConfig is set.
    MaxGracefulTerminationSec int
    // DrainPriorityConfig is a list of ShutdownGracePeriodByPodPriority.
    // This field is optional and may be nil.
    // DrainPriorityConfig takes precedence, and MaxGracefulTerminationSec is not applicable when DrainPriorityConfig is set.
    DrainPriorityConfig []kubelet_config.ShutdownGracePeriodByPodPriority
    // MaxTotalUnreadyPercentage is the maximum percentage of unready nodes after which CA halts operations
    MaxTotalUnreadyPercentage float64
    // OkTotalUnreadyCount is the number of allowed unready nodes, irrespective of max-total-unready-percentage
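For context, a minimal sketch of how the two options interact (the package paths are the ones imported above; the concrete priorities and durations are illustrative, not taken from this PR):

    package main

    import (
        "fmt"

        "k8s.io/autoscaler/cluster-autoscaler/config"
        kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
    )

    func main() {
        // When DrainPriorityConfig is non-empty, the priority-based evictor is used and
        // MaxGracefulTerminationSec is ignored during drain; otherwise the legacy single-rule path applies.
        opts := config.AutoscalingOptions{
            MaxGracefulTerminationSec: 600, // only used when DrainPriorityConfig is empty
            DrainPriorityConfig: []kubelet_config.ShutdownGracePeriodByPodPriority{
                {Priority: 2000000000, ShutdownGracePeriodSeconds: 60}, // system-critical pods: 60s
                {Priority: 0, ShutdownGracePeriodSeconds: 300},         // everything else: 300s
            },
        }
        fmt.Println(len(opts.DrainPriorityConfig) > 0) // true => priority-based drain path
    }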
@@ -21,8 +21,6 @@ import (
    "time"

    apiv1 "k8s.io/api/core/v1"
    "k8s.io/klog/v2"

    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
    "k8s.io/autoscaler/cluster-autoscaler/clusterstate"
    "k8s.io/autoscaler/cluster-autoscaler/context"
@@ -41,6 +39,7 @@ import (
    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
    kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
    "k8s.io/autoscaler/cluster-autoscaler/utils/taints"
    "k8s.io/klog/v2"
)

// Actuator is responsible for draining and deleting nodes.
@@ -69,11 +68,18 @@ type actuatorNodeGroupConfigGetter interface {
// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
    ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval)
    legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
    var evictor Evictor
    if len(ctx.DrainPriorityConfig) > 0 {
        evictor = NewEvictor(ndt, ctx.DrainPriorityConfig, true)
    } else {
        evictor = NewEvictor(ndt, legacyFlagDrainConfig, false)
    }
    return &Actuator{
        ctx:                 ctx,
        clusterState:        csr,
        nodeDeletionTracker: ndt,
        nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, drainabilityRules, ndt)),
        nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, evictor),
        budgetProcessor:       budgets.NewScaleDownBudgetProcessor(ctx),
        deleteOptions:         deleteOptions,
        drainabilityRules:     drainabilityRules,
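The selection above reads as: any non-empty DrainPriorityConfig switches the actuator to the priority-based evictor with full DaemonSet eviction; otherwise MaxGracefulTerminationSec is wrapped into a single catch-all rule. A minimal sketch of equivalent wiring, assuming it lives in the same actuation package alongside the code above (the helper name and flag string are hypothetical, imports omitted):

    // buildEvictor is a hypothetical helper, not part of this PR.
    func buildEvictor(ndt *deletiontracker.NodeDeletionTracker, drainPriorityFlag string, maxGracefulSec int) Evictor {
        if cfg := ParseShutdownGracePeriodsAndPriorities(drainPriorityFlag); len(cfg) > 0 {
            // Priority-based draining: each priority band gets its own shutdown grace period.
            return NewEvictor(ndt, cfg, true)
        }
        // Legacy behaviour: one rule for all priorities, bounded by MaxGracefulTerminationSec.
        return NewEvictor(ndt, SingleRuleDrainConfig(maxGracefulSec), false)
    }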
@@ -318,7 +324,7 @@ func (a *Actuator) scaleDownNodeToReport(node *apiv1.Node, drain bool) (*status.
    }
    var evictedPods []*apiv1.Pod
    if drain {
        _, nonDsPodsToEvict := podsToEvict(a.ctx, nodeInfo)
        _, nonDsPodsToEvict := podsToEvict(nodeInfo, a.ctx.DaemonSetEvictionForOccupiedNodes)
        evictedPods = nonDsPodsToEvict
    }
    return &status.ScaleDownNode{
@@ -32,9 +32,6 @@ import (
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/kubernetes/fake"
    core "k8s.io/client-go/testing"

    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
    testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
    "k8s.io/autoscaler/cluster-autoscaler/clusterstate"
@@ -48,6 +45,8 @@ import (
    kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
    "k8s.io/autoscaler/cluster-autoscaler/utils/taints"
    . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
    "k8s.io/client-go/kubernetes/fake"
    core "k8s.io/client-go/testing"
)

type nodeGroupViewInfo struct {
@@ -1190,7 +1189,8 @@ func TestStartDeletion(t *testing.T) {
            // Create Actuator, run StartDeletion, and verify the error.
            ndt := deletiontracker.NewNodeDeletionTracker(0)
            ndb := NewNodeDeletionBatcher(&ctx, csr, ndt, 0*time.Second)
            evictor := Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}
            legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
            evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom, shutdownGracePeriodByPodPriority: legacyFlagDrainConfig, fullDsEviction: false}
            actuator := Actuator{
                ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
                nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
@@ -1426,7 +1426,8 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
            csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))
            ndt := deletiontracker.NewNodeDeletionTracker(0)
            ndb := NewNodeDeletionBatcher(&ctx, csr, ndt, deleteInterval)
            evictor := Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}
            legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
            evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom, shutdownGracePeriodByPodPriority: legacyFlagDrainConfig}
            actuator := Actuator{
                ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
                nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
@@ -19,21 +19,19 @@ package actuation
import (
    "context"
    "fmt"
    "strings"
    "sort"
    "time"

    apiv1 "k8s.io/api/core/v1"
    policyv1beta1 "k8s.io/api/policy/v1beta1"
    kube_errors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/autoscaler/cluster-autoscaler/metrics"
    "k8s.io/klog/v2"
    kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"

    acontext "k8s.io/autoscaler/cluster-autoscaler/context"
    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
    "k8s.io/autoscaler/cluster-autoscaler/metrics"
    "k8s.io/autoscaler/cluster-autoscaler/simulator"
    "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
    "k8s.io/autoscaler/cluster-autoscaler/simulator/options"
    "k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
    pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
@@ -44,108 +42,104 @@ const (
    // DefaultEvictionRetryTime is the time after CA retries failed pod eviction.
    DefaultEvictionRetryTime = 10 * time.Second
    // DefaultPodEvictionHeadroom is the extra time we wait to catch situations when the pod is ignoring SIGTERM and
    // is killed with SIGKILL after MaxGracefulTerminationTime
    // is killed with SIGKILL after GracePeriodSeconds elapses
    DefaultPodEvictionHeadroom = 30 * time.Second
    // DefaultDsEvictionEmptyNodeTimeout is the time to evict all DaemonSet pods on an empty node
    DefaultDsEvictionEmptyNodeTimeout = 10 * time.Second
    // DefaultDsEvictionRetryTime is the time between retries to create evictions, used for DaemonSet eviction on empty nodes
    DefaultDsEvictionRetryTime = 3 * time.Second
)

type evictionRegister interface {
    RegisterEviction(*apiv1.Pod)
}

// Evictor can be used to evict pods from nodes.
// Evictor keeps the configuration of pod eviction.
type Evictor struct {
    EvictionRetryTime          time.Duration
    DsEvictionRetryTime        time.Duration
    DsEvictionEmptyNodeTimeout time.Duration
    PodEvictionHeadroom        time.Duration
    evictionRegister           evictionRegister
    deleteOptions              options.NodeDeleteOptions
    drainabilityRules          rules.Rules
    EvictionRetryTime                time.Duration
    PodEvictionHeadroom              time.Duration
    evictionRegister                 evictionRegister
    shutdownGracePeriodByPodPriority []kubelet_config.ShutdownGracePeriodByPodPriority
    fullDsEviction                   bool
}

// NewDefaultEvictor returns an instance of Evictor using the default parameters.
func NewDefaultEvictor(deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, evictionRegister evictionRegister) Evictor {
// NewEvictor returns an instance of Evictor.
func NewEvictor(evictionRegister evictionRegister, shutdownGracePeriodByPodPriority []kubelet_config.ShutdownGracePeriodByPodPriority, fullDsEviction bool) Evictor {
    sort.Slice(shutdownGracePeriodByPodPriority, func(i, j int) bool {
        return shutdownGracePeriodByPodPriority[i].Priority < shutdownGracePeriodByPodPriority[j].Priority
    })

    return Evictor{
        EvictionRetryTime:          DefaultEvictionRetryTime,
        DsEvictionRetryTime:        DefaultDsEvictionRetryTime,
        DsEvictionEmptyNodeTimeout: DefaultDsEvictionEmptyNodeTimeout,
        PodEvictionHeadroom:        DefaultPodEvictionHeadroom,
        evictionRegister:           evictionRegister,
        deleteOptions:              deleteOptions,
        drainabilityRules:          drainabilityRules,
        EvictionRetryTime:                DefaultEvictionRetryTime,
        PodEvictionHeadroom:              DefaultPodEvictionHeadroom,
        evictionRegister:                 evictionRegister,
        shutdownGracePeriodByPodPriority: shutdownGracePeriodByPodPriority,
        fullDsEviction:                   fullDsEviction,
    }
}

// DrainNode works like DrainNodeWithPods, but lists of pods to evict don't have to be provided. All non-mirror, non-DS pods on the
// node are evicted. Mirror pods are not evicted. DaemonSet pods are evicted if DaemonSetEvictionForOccupiedNodes is enabled, or
// if they have the EnableDsEvictionKey annotation.
// DrainNode groups the pods on the node into priority groups and evicts them in ascending order of priority.
// If the priority evictor is not enabled, eviction of DaemonSet pods is best-effort.
func (e Evictor) DrainNode(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo) (map[string]status.PodEvictionResult, error) {
    dsPodsToEvict, nonDsPodsToEvict := podsToEvict(ctx, nodeInfo)
    return e.DrainNodeWithPods(ctx, nodeInfo.Node(), nonDsPodsToEvict, dsPodsToEvict)
    node := nodeInfo.Node()
    dsPods, pods := podsToEvict(nodeInfo, ctx.DaemonSetEvictionForOccupiedNodes)
    if e.fullDsEviction {
        return e.drainNodeWithPodsBasedOnPodPriority(ctx, node, append(pods, dsPods...), nil)
    }
    return e.drainNodeWithPodsBasedOnPodPriority(ctx, node, pods, dsPods)
}

// DrainNodeWithPods performs drain logic on the node. Marks the node as unschedulable and later removes all pods, giving
// them up to MaxGracefulTerminationTime to finish. The list of pods to evict has to be provided.
func (e Evictor) DrainNodeWithPods(ctx *acontext.AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod) (map[string]status.PodEvictionResult, error) {
// EvictDaemonSetPods groups the DaemonSet pods on the node into priority groups and evicts them in ascending order of priority.
// If the priority evictor is not enabled, eviction of DaemonSet pods is best-effort.
func (e Evictor) EvictDaemonSetPods(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo) (map[string]status.PodEvictionResult, error) {
    node := nodeInfo.Node()
    dsPods, _ := podsToEvict(nodeInfo, ctx.DaemonSetEvictionForEmptyNodes)
    if e.fullDsEviction {
        return e.drainNodeWithPodsBasedOnPodPriority(ctx, node, dsPods, nil)
    }
    return e.drainNodeWithPodsBasedOnPodPriority(ctx, node, nil, dsPods)
}

// drainNodeWithPodsBasedOnPodPriority performs drain logic on the node based on pod priorities.
// Removes all pods, giving each pod group up to ShutdownGracePeriodSeconds to finish. The list of pods to evict has to be provided.
func (e Evictor) drainNodeWithPodsBasedOnPodPriority(ctx *acontext.AutoscalingContext, node *apiv1.Node, fullEvictionPods, bestEffortEvictionPods []*apiv1.Pod) (map[string]status.PodEvictionResult, error) {
    evictionResults := make(map[string]status.PodEvictionResult)
    retryUntil := time.Now().Add(ctx.MaxPodEvictionTime)
    confirmations := make(chan status.PodEvictionResult, len(pods))
    daemonSetConfirmations := make(chan status.PodEvictionResult, len(daemonSetPods))
    for _, pod := range pods {
        evictionResults[pod.Name] = status.PodEvictionResult{Pod: pod, TimedOut: true, Err: nil}
        go func(podToEvict *apiv1.Pod) {
            confirmations <- evictPod(ctx, podToEvict, false, retryUntil, e.EvictionRetryTime, e.evictionRegister)
        }(pod)
    }

    // Perform eviction of DaemonSet pods. We don't want to raise an error if a DaemonSet pod wasn't evicted properly.
    for _, daemonSetPod := range daemonSetPods {
        go func(podToEvict *apiv1.Pod) {
            daemonSetConfirmations <- evictPod(ctx, podToEvict, true, retryUntil, e.EvictionRetryTime, e.evictionRegister)
        }(daemonSetPod)
    }

    podsEvictionCounter := 0
    for i := 0; i < len(pods)+len(daemonSetPods); i++ {
        select {
        case evictionResult := <-confirmations:
            podsEvictionCounter++
            evictionResults[evictionResult.Pod.Name] = evictionResult
            if evictionResult.WasEvictionSuccessful() {
                metrics.RegisterEvictions(1)
            }
        case <-daemonSetConfirmations:
        case <-time.After(retryUntil.Sub(time.Now()) + 5*time.Second):
            if podsEvictionCounter < len(pods) {
                // All pods initially had results with TimedOut set to true, so the ones that didn't receive an actual result are correctly marked as timed out.
                return evictionResults, errors.NewAutoscalerError(errors.ApiCallError, "Failed to drain node %s/%s: timeout when waiting for creating evictions", node.Namespace, node.Name)
            }
            klog.Infof("Timeout when waiting for creating daemonSetPods eviction")
    groups := groupByPriority(e.shutdownGracePeriodByPodPriority, fullEvictionPods, bestEffortEvictionPods)
    for _, group := range groups {
        for _, pod := range group.FullEvictionPods {
            evictionResults[pod.Name] = status.PodEvictionResult{Pod: pod, TimedOut: false,
                Err: errors.NewAutoscalerError(errors.UnexpectedScaleDownStateError, "Eviction was not attempted for pod %s because some of the previous evictions failed", pod.Name)}
        }
    }

    evictionErrs := make([]error, 0)
    for _, result := range evictionResults {
        if !result.WasEvictionSuccessful() {
            evictionErrs = append(evictionErrs, result.Err)
    for _, group := range groups {
        // If there are no pods in a particular range,
        // then do not wait for pods in that priority range.
        if len(group.FullEvictionPods) == 0 && len(group.BestEffortEvictionPods) == 0 {
            continue
        }

        var err error
        evictionResults, err = e.initiateEviction(ctx, node, group.FullEvictionPods, group.BestEffortEvictionPods, evictionResults, group.ShutdownGracePeriodSeconds)
        if err != nil {
            return evictionResults, err
        }

        // Evictions created successfully; wait ShutdownGracePeriodSeconds + podEvictionHeadroom to see if the fullEviction pods really disappeared.
        evictionResults, err = e.waitPodsToDisappear(ctx, node, group.FullEvictionPods, evictionResults, group.ShutdownGracePeriodSeconds)
        if err != nil {
            return evictionResults, err
        }
    }
    if len(evictionErrs) != 0 {
        return evictionResults, errors.NewAutoscalerError(errors.ApiCallError, "Failed to drain node %s/%s, due to following errors: %v", node.Namespace, node.Name, evictionErrs)
    }
    klog.V(1).Infof("All pods removed from %s", node.Name)
    return evictionResults, nil
}

// Evictions created successfully, wait maxGracefulTerminationSec + podEvictionHeadroom to see if pods really disappeared.
func (e Evictor) waitPodsToDisappear(ctx *acontext.AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod, evictionResults map[string]status.PodEvictionResult,
    maxTermination int64) (map[string]status.PodEvictionResult, error) {
    var allGone bool
    for start := time.Now(); time.Now().Sub(start) < time.Duration(ctx.MaxGracefulTerminationSec)*time.Second+e.PodEvictionHeadroom; time.Sleep(5 * time.Second) {
    for start := time.Now(); time.Now().Sub(start) < time.Duration(maxTermination)*time.Second+e.PodEvictionHeadroom; time.Sleep(5 * time.Second) {
        allGone = true
        for _, pod := range pods {
            podreturned, err := ctx.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
            if err == nil && (podreturned == nil || podreturned.Spec.NodeName == node.Name) {
            podReturned, err := ctx.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
            if err == nil && (podReturned == nil || podReturned.Spec.NodeName == node.Name) {
                klog.V(1).Infof("Not deleted yet %s/%s", pod.Namespace, pod.Name)
                allGone = false
                break
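To make the grouping concrete, here is a small standalone sketch (the pod names, priorities, and grace periods are invented; only the kubelet config type is real) of how pods are bucketed into priority bands and then drained band by band, lowest priority first:

    package main

    import (
        "fmt"

        kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
    )

    // bucket returns the index of the rule a pod of the given priority falls into: the rule with the
    // largest Priority that is still <= the pod's priority (pods below the lowest rule fall into the
    // lowest one). Rules must already be sorted by ascending Priority, as NewEvictor ensures.
    func bucket(rules []kubelet_config.ShutdownGracePeriodByPodPriority, podPriority int32) int {
        idx := 0
        for i, r := range rules {
            if podPriority >= r.Priority {
                idx = i
            }
        }
        return idx
    }

    func main() {
        // Invented rules, sorted ascending by priority.
        rules := []kubelet_config.ShutdownGracePeriodByPodPriority{
            {Priority: 0, ShutdownGracePeriodSeconds: 300},
            {Priority: 1000, ShutdownGracePeriodSeconds: 60},
        }
        pods := []struct {
            name     string
            priority int32
        }{{"batch-job", 0}, {"web", 500}, {"critical", 2000}}

        // Groups are drained in ascending priority order, each with its own grace period.
        for i, r := range rules {
            for _, p := range pods {
                if bucket(rules, p.priority) == i {
                    fmt.Printf("group %d: evict %s with %ds grace\n", i, p.name, r.ShutdownGracePeriodSeconds)
                }
            }
        }
    }

With these invented rules, "batch-job" and "web" are evicted first with a 300s grace period, then "critical" with 60s, which mirrors the groupByPriority/groupIndex behaviour introduced later in this PR.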
@@ -157,8 +151,6 @@ func (e Evictor) DrainNodeWithPods(ctx *acontext.AutoscalingContext, node *apiv1
            }
        }
        if allGone {
            klog.V(1).Infof("All pods removed from %s", node.Name)
            // Let the deferred function know there is no need for cleanup
            return evictionResults, nil
        }
    }
@@ -177,58 +169,65 @@ func (e Evictor) DrainNodeWithPods(ctx *acontext.AutoscalingContext, node *apiv1
    return evictionResults, errors.NewAutoscalerError(errors.TransientError, "Failed to drain node %s/%s: pods remaining after timeout", node.Namespace, node.Name)
}

// EvictDaemonSetPods creates eviction objects for all DaemonSet pods on the node.
func (e Evictor) EvictDaemonSetPods(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo, timeNow time.Time) error {
    nodeToDelete := nodeInfo.Node()
    _, daemonSetPods, _, err := simulator.GetPodsToMove(nodeInfo, e.deleteOptions, e.drainabilityRules, nil, nil, timeNow)
    if err != nil {
        return fmt.Errorf("failed to get DaemonSet pods for %s (error: %v)", nodeToDelete.Name, err)
func (e Evictor) initiateEviction(ctx *acontext.AutoscalingContext, node *apiv1.Node, fullEvictionPods, bestEffortEvictionPods []*apiv1.Pod, evictionResults map[string]status.PodEvictionResult,
    maxTermination int64) (map[string]status.PodEvictionResult, error) {

    retryUntil := time.Now().Add(ctx.MaxPodEvictionTime)
    fullEvictionConfirmations := make(chan status.PodEvictionResult, len(fullEvictionPods))
    bestEffortEvictionConfirmations := make(chan status.PodEvictionResult, len(bestEffortEvictionPods))

    for _, pod := range fullEvictionPods {
        evictionResults[pod.Name] = status.PodEvictionResult{Pod: pod, TimedOut: true, Err: nil}
        go func(pod *apiv1.Pod) {
            fullEvictionConfirmations <- e.evictPod(ctx, pod, retryUntil, maxTermination, true)
        }(pod)
    }

    daemonSetPods = daemonset.PodsToEvict(daemonSetPods, ctx.DaemonSetEvictionForEmptyNodes)

    dsEviction := make(chan status.PodEvictionResult, len(daemonSetPods))

    // Perform eviction of DaemonSet pods
    for _, daemonSetPod := range daemonSetPods {
        go func(podToEvict *apiv1.Pod) {
            dsEviction <- evictPod(ctx, podToEvict, true, timeNow.Add(e.DsEvictionEmptyNodeTimeout), e.DsEvictionRetryTime, e.evictionRegister)
        }(daemonSetPod)
    for _, pod := range bestEffortEvictionPods {
        go func(pod *apiv1.Pod) {
            bestEffortEvictionConfirmations <- e.evictPod(ctx, pod, retryUntil, maxTermination, false)
        }(pod)
    }
    // Wait for the DaemonSet evictions to be created
    var failedPodErrors []string
    for range daemonSetPods {

    for i := 0; i < len(fullEvictionPods)+len(bestEffortEvictionPods); i++ {
        select {
        case status := <-dsEviction:
            if status.Err != nil {
                failedPodErrors = append(failedPodErrors, status.Err.Error())
        case evictionResult := <-fullEvictionConfirmations:
            evictionResults[evictionResult.Pod.Name] = evictionResult
            if evictionResult.WasEvictionSuccessful() {
                metrics.RegisterEvictions(1, metrics.PodEvictionSucceed)
            } else {
                metrics.RegisterEvictions(1, metrics.PodEvictionFailed)
            }
            // adding waitBetweenRetries in order to have a bigger time interval than evictPod()
        case <-time.After(e.DsEvictionEmptyNodeTimeout):
            return fmt.Errorf("failed to create DaemonSet eviction for %v seconds on the %s", e.DsEvictionEmptyNodeTimeout, nodeToDelete.Name)
        case <-bestEffortEvictionConfirmations:
        }
    }
    if len(failedPodErrors) > 0 {

        return fmt.Errorf("following DaemonSet pod failed to evict on the %s:\n%s", nodeToDelete.Name, fmt.Errorf(strings.Join(failedPodErrors, "\n")))
    evictionErrs := make([]error, 0)
    for _, pod := range fullEvictionPods {
        result := evictionResults[pod.Name]
        if !result.WasEvictionSuccessful() {
            evictionErrs = append(evictionErrs, result.Err)
        }
    }
    return nil
    if len(evictionErrs) != 0 {
        return evictionResults, errors.NewAutoscalerError(errors.ApiCallError, "Failed to drain node %s/%s, due to following errors: %v", node.Namespace, node.Name, evictionErrs)
    }
    return evictionResults, nil
}

func evictPod(ctx *acontext.AutoscalingContext, podToEvict *apiv1.Pod, isDaemonSetPod bool, retryUntil time.Time, waitBetweenRetries time.Duration, evictionRegister evictionRegister) status.PodEvictionResult {
func (e Evictor) evictPod(ctx *acontext.AutoscalingContext, podToEvict *apiv1.Pod, retryUntil time.Time, maxTermination int64, fullEvictionPod bool) status.PodEvictionResult {
    ctx.Recorder.Eventf(podToEvict, apiv1.EventTypeNormal, "ScaleDown", "deleting pod for node scale down")

    maxTermination := int64(apiv1.DefaultTerminationGracePeriodSeconds)
    termination := int64(apiv1.DefaultTerminationGracePeriodSeconds)
    if podToEvict.Spec.TerminationGracePeriodSeconds != nil {
        if *podToEvict.Spec.TerminationGracePeriodSeconds < int64(ctx.MaxGracefulTerminationSec) {
            maxTermination = *podToEvict.Spec.TerminationGracePeriodSeconds
        } else {
            maxTermination = int64(ctx.MaxGracefulTerminationSec)
        }
        termination = *podToEvict.Spec.TerminationGracePeriodSeconds
    }
    if maxTermination > 0 && termination > maxTermination {
        termination = maxTermination
    }

    var lastError error
    for first := true; first || time.Now().Before(retryUntil); time.Sleep(waitBetweenRetries) {
    for first := true; first || time.Now().Before(retryUntil); time.Sleep(e.EvictionRetryTime) {
        first = false
        eviction := &policyv1beta1.Eviction{
            ObjectMeta: metav1.ObjectMeta{
@@ -236,25 +235,25 @@ func evictPod(ctx *acontext.AutoscalingContext, podToEvict *apiv1.Pod, isDaemonS
                Name: podToEvict.Name,
            },
            DeleteOptions: &metav1.DeleteOptions{
                GracePeriodSeconds: &maxTermination,
                GracePeriodSeconds: &termination,
            },
        }
        lastError = ctx.ClientSet.CoreV1().Pods(podToEvict.Namespace).Evict(context.TODO(), eviction)
        if lastError == nil || kube_errors.IsNotFound(lastError) {
            if evictionRegister != nil {
                evictionRegister.RegisterEviction(podToEvict)
            if e.evictionRegister != nil {
                e.evictionRegister.RegisterEviction(podToEvict)
            }
            return status.PodEvictionResult{Pod: podToEvict, TimedOut: false, Err: nil}
        }
    }
    if !isDaemonSetPod {
    if fullEvictionPod {
        klog.Errorf("Failed to evict pod %s, error: %v", podToEvict.Name, lastError)
        ctx.Recorder.Eventf(podToEvict, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete pod for ScaleDown")
    }
    return status.PodEvictionResult{Pod: podToEvict, TimedOut: true, Err: fmt.Errorf("failed to evict pod %s/%s within allowed timeout (last error: %v)", podToEvict.Namespace, podToEvict.Name, lastError)}
}

func podsToEvict(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo) (dsPods, nonDsPods []*apiv1.Pod) {
func podsToEvict(nodeInfo *framework.NodeInfo, evictDsByDefault bool) (dsPods, nonDsPods []*apiv1.Pod) {
    for _, podInfo := range nodeInfo.Pods {
        if pod_util.IsMirrorPod(podInfo.Pod) {
            continue
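In other words, the grace period sent with each eviction is the pod's own terminationGracePeriodSeconds (defaulting to the Kubernetes default of 30s), capped by the group's ShutdownGracePeriodSeconds whenever that cap is positive. A standalone sketch of that capping logic (illustrative only, not part of the PR):

    // effectiveGracePeriod mirrors the capping done in evictPod above.
    func effectiveGracePeriod(podGracePeriod *int64, maxTermination int64) int64 {
        termination := int64(30) // apiv1.DefaultTerminationGracePeriodSeconds
        if podGracePeriod != nil {
            termination = *podGracePeriod
        }
        if maxTermination > 0 && termination > maxTermination {
            termination = maxTermination
        }
        return termination
    }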
@@ -264,6 +263,12 @@ func podsToEvict(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo)
            nonDsPods = append(nonDsPods, podInfo.Pod)
        }
    }
    dsPodsToEvict := daemonset.PodsToEvict(dsPods, ctx.DaemonSetEvictionForOccupiedNodes)
    dsPodsToEvict := daemonset.PodsToEvict(dsPods, evictDsByDefault)
    return dsPodsToEvict, nonDsPods
}

type podEvictionGroup struct {
    kubelet_config.ShutdownGracePeriodByPodPriority
    FullEvictionPods       []*apiv1.Pod
    BestEffortEvictionPods []*apiv1.Pod
}
@@ -31,10 +31,6 @@ import (
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/kubernetes/fake"
    core "k8s.io/client-go/testing"
    "k8s.io/kubernetes/pkg/kubelet/types"

    testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
    "k8s.io/autoscaler/cluster-autoscaler/config"
    acontext "k8s.io/autoscaler/cluster-autoscaler/context"
@@ -44,10 +40,13 @@ import (
    "k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
    kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
    . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
    "k8s.io/client-go/kubernetes/fake"
    core "k8s.io/client-go/testing"
    kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
    "k8s.io/kubernetes/pkg/kubelet/types"
)

func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
|
||||
timeNow := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
|
||||
testScenarios := []struct {
|
||||
name string
|
||||
dsPods []string
|
||||
|
@ -58,6 +57,8 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
|
|||
evictByDefault bool
|
||||
extraAnnotationValue map[string]string
|
||||
expectNotEvicted map[string]struct{}
|
||||
fullDsEviction bool
|
||||
podPriorities []int32
|
||||
}{
|
||||
{
|
||||
name: "Successful attempt to evict DaemonSet pods",
|
||||
|
@ -66,23 +67,6 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
|
|||
evictionSuccess: true,
|
||||
evictByDefault: true,
|
||||
},
|
||||
{
|
||||
name: "Failed to create DaemonSet eviction",
|
||||
dsPods: []string{"d1", "d2"},
|
||||
dsEvictionTimeout: 5000 * time.Millisecond,
|
||||
evictionSuccess: false,
|
||||
err: fmt.Errorf("following DaemonSet pod failed to evict on the"),
|
||||
evictByDefault: true,
|
||||
},
|
||||
{
|
||||
name: "Eviction timeout exceed",
|
||||
dsPods: []string{"d1", "d2", "d3"},
|
||||
evictionTimeoutExceed: true,
|
||||
dsEvictionTimeout: 100 * time.Millisecond,
|
||||
evictionSuccess: true,
|
||||
err: fmt.Errorf("failed to create DaemonSet eviction for"),
|
||||
evictByDefault: true,
|
||||
},
|
||||
{
|
||||
name: "Evict single pod due to annotation",
|
||||
dsPods: []string{"d1", "d2"},
|
||||
|
@ -100,6 +84,57 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
|
|||
extraAnnotationValue: map[string]string{"d1": "false"},
|
||||
expectNotEvicted: map[string]struct{}{"d1": {}},
|
||||
},
|
||||
{
|
||||
name: "Failed to create DaemonSet eviction",
|
||||
dsPods: []string{"d1", "d2"},
|
||||
dsEvictionTimeout: 5000 * time.Millisecond,
|
||||
evictionSuccess: false,
|
||||
err: fmt.Errorf("Failed to drain node /n1, due to following errors"),
|
||||
evictByDefault: true,
|
||||
fullDsEviction: true,
|
||||
podPriorities: []int32{0, 1000},
|
||||
},
|
||||
{
|
||||
name: "Eviction timeout exceed",
|
||||
dsPods: []string{"d1", "d2", "d3"},
|
||||
evictionTimeoutExceed: true,
|
||||
dsEvictionTimeout: 100 * time.Millisecond,
|
||||
evictionSuccess: false,
|
||||
err: fmt.Errorf("Failed to drain node /n1, due to following errors"),
|
||||
evictByDefault: true,
|
||||
fullDsEviction: true,
|
||||
podPriorities: []int32{0, 1000, 2000},
|
||||
},
|
||||
{
|
||||
name: "Successful attempt to evict DaemonSet pods",
|
||||
dsPods: []string{"d1", "d2"},
|
||||
dsEvictionTimeout: 5000 * time.Millisecond,
|
||||
evictionSuccess: true,
|
||||
evictByDefault: true,
|
||||
fullDsEviction: true,
|
||||
podPriorities: []int32{0, 1000},
|
||||
},
|
||||
{
|
||||
name: "Evict single pod due to annotation",
|
||||
dsPods: []string{"d1", "d2"},
|
||||
dsEvictionTimeout: 5000 * time.Millisecond,
|
||||
evictionSuccess: true,
|
||||
extraAnnotationValue: map[string]string{"d1": "true"},
|
||||
expectNotEvicted: map[string]struct{}{"d2": {}},
|
||||
fullDsEviction: true,
|
||||
podPriorities: []int32{0, 1000},
|
||||
},
|
||||
{
|
||||
name: "Don't evict single pod due to annotation",
|
||||
dsPods: []string{"d1", "d2"},
|
||||
dsEvictionTimeout: 5000 * time.Millisecond,
|
||||
evictionSuccess: true,
|
||||
evictByDefault: true,
|
||||
extraAnnotationValue: map[string]string{"d1": "false"},
|
||||
expectNotEvicted: map[string]struct{}{"d1": {}},
|
||||
fullDsEviction: true,
|
||||
podPriorities: []int32{0, 1000},
|
||||
},
|
||||
}
|
||||
|
||||
for _, scenario := range testScenarios {
|
||||
|
@ -113,6 +148,7 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
|
|||
},
|
||||
MaxGracefulTerminationSec: 1,
|
||||
DaemonSetEvictionForEmptyNodes: scenario.evictByDefault,
|
||||
MaxPodEvictionTime: scenario.dsEvictionTimeout,
|
||||
}
|
||||
deletedPods := make(chan string, len(scenario.dsPods)+2)
|
||||
waitBetweenRetries := 10 * time.Millisecond
|
||||
|
@ -122,9 +158,11 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
|
|||
SetNodeReadyState(n1, true, time.Time{})
|
||||
dsPods := make([]*apiv1.Pod, len(scenario.dsPods))
|
||||
for i, dsName := range scenario.dsPods {
|
||||
ds := BuildTestPod(dsName, 100, 0)
|
||||
ds := BuildTestPod(dsName, 100, 0, WithDSController())
|
||||
ds.Spec.NodeName = "n1"
|
||||
ds.OwnerReferences = GenerateOwnerReferences("", "DaemonSet", "", "")
|
||||
if scenario.fullDsEviction {
|
||||
ds.Spec.Priority = &scenario.podPriorities[i]
|
||||
}
|
||||
if v, ok := scenario.extraAnnotationValue[dsName]; ok {
|
||||
ds.Annotations[daemonset.EnableDsEvictionKey] = v
|
||||
}
|
||||
|
@ -159,13 +197,24 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
|
|||
|
||||
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, []*apiv1.Node{n1}, dsPods)
|
||||
|
||||
drainConfig := SingleRuleDrainConfig(context.MaxGracefulTerminationSec)
|
||||
if scenario.fullDsEviction {
|
||||
drainConfig = []kubelet_config.ShutdownGracePeriodByPodPriority{}
|
||||
for _, priority := range scenario.podPriorities {
|
||||
drainConfig = append(drainConfig, kubelet_config.ShutdownGracePeriodByPodPriority{
|
||||
Priority: priority,
|
||||
ShutdownGracePeriodSeconds: int64(context.MaxGracefulTerminationSec),
|
||||
})
|
||||
}
|
||||
}
|
||||
evictor := Evictor{
|
||||
DsEvictionEmptyNodeTimeout: scenario.dsEvictionTimeout,
|
||||
DsEvictionRetryTime: waitBetweenRetries,
|
||||
EvictionRetryTime: waitBetweenRetries,
|
||||
shutdownGracePeriodByPodPriority: drainConfig,
|
||||
fullDsEviction: scenario.fullDsEviction,
|
||||
}
|
||||
nodeInfo, err := context.ClusterSnapshot.NodeInfos().Get(n1.Name)
|
||||
assert.NoError(t, err)
|
||||
err = evictor.EvictDaemonSetPods(&context, nodeInfo, timeNow)
|
||||
_, err = evictor.EvictDaemonSetPods(&context, nodeInfo)
|
||||
if scenario.err != nil {
|
||||
assert.NotNil(t, err)
|
||||
assert.Contains(t, err.Error(), scenario.err.Error())
|
||||
|
@ -183,7 +232,11 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
|
|||
for i := 0; i < len(expectEvicted); i++ {
|
||||
deleted[i] = utils.GetStringFromChan(deletedPods)
|
||||
}
|
||||
assert.ElementsMatch(t, deleted, expectEvicted)
|
||||
if scenario.fullDsEviction {
|
||||
assert.Equal(t, expectEvicted, deleted)
|
||||
} else {
|
||||
assert.ElementsMatch(t, deleted, expectEvicted)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -192,10 +245,10 @@ func TestDrainNodeWithPods(t *testing.T) {
|
|||
deletedPods := make(chan string, 10)
|
||||
fakeClient := &fake.Clientset{}
|
||||
|
||||
p1 := BuildTestPod("p1", 100, 0)
|
||||
p2 := BuildTestPod("p2", 300, 0)
|
||||
d1 := BuildTestPod("d1", 150, 0)
|
||||
n1 := BuildTestNode("n1", 1000, 1000)
|
||||
p1 := BuildTestPod("p1", 100, 0, WithNodeName(n1.Name))
|
||||
p2 := BuildTestPod("p2", 300, 0, WithNodeName(n1.Name))
|
||||
d1 := BuildTestPod("d1", 150, 0, WithNodeName(n1.Name), WithDSController())
|
||||
|
||||
SetNodeReadyState(n1, true, time.Time{})
|
||||
|
||||
|
@ -216,14 +269,23 @@ func TestDrainNodeWithPods(t *testing.T) {
|
|||
})
|
||||
|
||||
options := config.AutoscalingOptions{
|
||||
MaxGracefulTerminationSec: 20,
|
||||
MaxPodEvictionTime: 5 * time.Second,
|
||||
MaxGracefulTerminationSec: 20,
|
||||
MaxPodEvictionTime: 5 * time.Second,
|
||||
DaemonSetEvictionForOccupiedNodes: true,
|
||||
}
|
||||
ctx, err := NewScaleTestAutoscalingContext(options, fakeClient, nil, nil, nil, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}
|
||||
_, err = evictor.DrainNodeWithPods(&ctx, n1, []*apiv1.Pod{p1, p2}, []*apiv1.Pod{d1})
|
||||
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
|
||||
evictor := Evictor{
|
||||
EvictionRetryTime: 0,
|
||||
PodEvictionHeadroom: DefaultPodEvictionHeadroom,
|
||||
shutdownGracePeriodByPodPriority: legacyFlagDrainConfig,
|
||||
}
|
||||
clustersnapshot.InitializeClusterSnapshotOrDie(t, ctx.ClusterSnapshot, []*apiv1.Node{n1}, []*apiv1.Pod{p1, p2, d1})
|
||||
nodeInfo, err := ctx.ClusterSnapshot.NodeInfos().Get(n1.Name)
|
||||
assert.NoError(t, err)
|
||||
_, err = evictor.DrainNode(&ctx, nodeInfo)
|
||||
assert.NoError(t, err)
|
||||
deleted := make([]string, 0)
|
||||
deleted = append(deleted, utils.GetStringFromChan(deletedPods))
|
||||
|
@ -240,11 +302,12 @@ func TestDrainNodeWithPodsWithRescheduled(t *testing.T) {
|
|||
deletedPods := make(chan string, 10)
|
||||
fakeClient := &fake.Clientset{}
|
||||
|
||||
p1 := BuildTestPod("p1", 100, 0)
|
||||
p2 := BuildTestPod("p2", 300, 0)
|
||||
n1 := BuildTestNode("n1", 1000, 1000)
|
||||
p1 := BuildTestPod("p1", 100, 0, WithNodeName(n1.Name))
|
||||
p2 := BuildTestPod("p2", 300, 0, WithNodeName(n1.Name))
|
||||
p2Rescheduled := BuildTestPod("p2", 300, 0)
|
||||
p2Rescheduled.Spec.NodeName = "n2"
|
||||
n1 := BuildTestNode("n1", 1000, 1000)
|
||||
|
||||
SetNodeReadyState(n1, true, time.Time{})
|
||||
|
||||
fakeClient.Fake.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
|
||||
|
@ -277,8 +340,16 @@ func TestDrainNodeWithPodsWithRescheduled(t *testing.T) {
|
|||
ctx, err := NewScaleTestAutoscalingContext(options, fakeClient, nil, nil, nil, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}
|
||||
_, err = evictor.DrainNodeWithPods(&ctx, n1, []*apiv1.Pod{p1, p2}, []*apiv1.Pod{})
|
||||
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
|
||||
evictor := Evictor{
|
||||
EvictionRetryTime: 0,
|
||||
PodEvictionHeadroom: DefaultPodEvictionHeadroom,
|
||||
shutdownGracePeriodByPodPriority: legacyFlagDrainConfig,
|
||||
}
|
||||
clustersnapshot.InitializeClusterSnapshotOrDie(t, ctx.ClusterSnapshot, []*apiv1.Node{n1}, []*apiv1.Pod{p1, p2})
|
||||
nodeInfo, err := ctx.ClusterSnapshot.NodeInfos().Get(n1.Name)
|
||||
assert.NoError(t, err)
|
||||
_, err = evictor.DrainNode(&ctx, nodeInfo)
|
||||
assert.NoError(t, err)
|
||||
deleted := make([]string, 0)
|
||||
deleted = append(deleted, utils.GetStringFromChan(deletedPods))
|
||||
|
@ -297,11 +368,12 @@ func TestDrainNodeWithPodsWithRetries(t *testing.T) {
|
|||
ticket := make(chan bool, 1)
|
||||
fakeClient := &fake.Clientset{}
|
||||
|
||||
p1 := BuildTestPod("p1", 100, 0)
|
||||
p2 := BuildTestPod("p2", 300, 0)
|
||||
p3 := BuildTestPod("p3", 300, 0)
|
||||
d1 := BuildTestPod("d1", 150, 0)
|
||||
n1 := BuildTestNode("n1", 1000, 1000)
|
||||
p1 := BuildTestPod("p1", 100, 0, WithNodeName(n1.Name))
|
||||
p2 := BuildTestPod("p2", 300, 0, WithNodeName(n1.Name))
|
||||
p3 := BuildTestPod("p3", 300, 0, WithNodeName(n1.Name))
|
||||
d1 := BuildTestPod("d1", 150, 0, WithDSController(), WithNodeName(n1.Name))
|
||||
|
||||
SetNodeReadyState(n1, true, time.Time{})
|
||||
|
||||
fakeClient.Fake.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
|
||||
|
@ -330,14 +402,23 @@ func TestDrainNodeWithPodsWithRetries(t *testing.T) {
|
|||
})
|
||||
|
||||
options := config.AutoscalingOptions{
|
||||
MaxGracefulTerminationSec: 20,
|
||||
MaxPodEvictionTime: 5 * time.Second,
|
||||
MaxGracefulTerminationSec: 20,
|
||||
MaxPodEvictionTime: 5 * time.Second,
|
||||
DaemonSetEvictionForOccupiedNodes: true,
|
||||
}
|
||||
ctx, err := NewScaleTestAutoscalingContext(options, fakeClient, nil, nil, nil, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}
|
||||
_, err = evictor.DrainNodeWithPods(&ctx, n1, []*apiv1.Pod{p1, p2, p3}, []*apiv1.Pod{d1})
|
||||
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
|
||||
evictor := Evictor{
|
||||
EvictionRetryTime: 0,
|
||||
PodEvictionHeadroom: DefaultPodEvictionHeadroom,
|
||||
shutdownGracePeriodByPodPriority: legacyFlagDrainConfig,
|
||||
}
|
||||
clustersnapshot.InitializeClusterSnapshotOrDie(t, ctx.ClusterSnapshot, []*apiv1.Node{n1}, []*apiv1.Pod{p1, p2, p3, d1})
|
||||
nodeInfo, err := ctx.ClusterSnapshot.NodeInfos().Get(n1.Name)
|
||||
assert.NoError(t, err)
|
||||
_, err = evictor.DrainNode(&ctx, nodeInfo)
|
||||
assert.NoError(t, err)
|
||||
deleted := make([]string, 0)
|
||||
deleted = append(deleted, utils.GetStringFromChan(deletedPods))
|
||||
|
@ -354,11 +435,12 @@ func TestDrainNodeWithPodsWithRetries(t *testing.T) {
|
|||
func TestDrainNodeWithPodsDaemonSetEvictionFailure(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
|
||||
p1 := BuildTestPod("p1", 100, 0)
|
||||
p2 := BuildTestPod("p2", 300, 0)
|
||||
d1 := BuildTestPod("d1", 150, 0)
|
||||
d2 := BuildTestPod("d2", 250, 0)
|
||||
n1 := BuildTestNode("n1", 1000, 1000)
|
||||
p1 := BuildTestPod("p1", 100, 0, WithNodeName(n1.Name))
|
||||
p2 := BuildTestPod("p2", 300, 0, WithNodeName(n1.Name))
|
||||
d1 := BuildTestPod("d1", 150, 0, WithDSController(), WithNodeName(n1.Name))
|
||||
d2 := BuildTestPod("d2", 250, 0, WithDSController(), WithNodeName(n1.Name))
|
||||
|
||||
e1 := fmt.Errorf("eviction_error: d1")
|
||||
e2 := fmt.Errorf("eviction_error: d2")
|
||||
|
||||
|
@ -390,8 +472,16 @@ func TestDrainNodeWithPodsDaemonSetEvictionFailure(t *testing.T) {
|
|||
ctx, err := NewScaleTestAutoscalingContext(options, fakeClient, nil, nil, nil, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}
|
||||
evictionResults, err := evictor.DrainNodeWithPods(&ctx, n1, []*apiv1.Pod{p1, p2}, []*apiv1.Pod{d1, d2})
|
||||
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
|
||||
evictor := Evictor{
|
||||
EvictionRetryTime: 0,
|
||||
PodEvictionHeadroom: DefaultPodEvictionHeadroom,
|
||||
shutdownGracePeriodByPodPriority: legacyFlagDrainConfig,
|
||||
}
|
||||
clustersnapshot.InitializeClusterSnapshotOrDie(t, ctx.ClusterSnapshot, []*apiv1.Node{n1}, []*apiv1.Pod{p1, p2, d1, d2})
|
||||
nodeInfo, err := ctx.ClusterSnapshot.NodeInfos().Get(n1.Name)
|
||||
assert.NoError(t, err)
|
||||
evictionResults, err := evictor.DrainNode(&ctx, nodeInfo)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, len(evictionResults))
|
||||
assert.Equal(t, p1, evictionResults["p1"].Pod)
|
||||
|
@ -407,11 +497,11 @@ func TestDrainNodeWithPodsDaemonSetEvictionFailure(t *testing.T) {
|
|||
func TestDrainNodeWithPodsEvictionFailure(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
|
||||
p1 := BuildTestPod("p1", 100, 0)
|
||||
p2 := BuildTestPod("p2", 100, 0)
|
||||
p3 := BuildTestPod("p3", 100, 0)
|
||||
p4 := BuildTestPod("p4", 100, 0)
|
||||
n1 := BuildTestNode("n1", 1000, 1000)
|
||||
p1 := BuildTestPod("p1", 100, 0, WithNodeName(n1.Name))
|
||||
p2 := BuildTestPod("p2", 100, 0, WithNodeName(n1.Name))
|
||||
p3 := BuildTestPod("p3", 100, 0, WithNodeName(n1.Name))
|
||||
p4 := BuildTestPod("p4", 100, 0, WithNodeName(n1.Name))
|
||||
e2 := fmt.Errorf("eviction_error: p2")
|
||||
e4 := fmt.Errorf("eviction_error: p4")
|
||||
SetNodeReadyState(n1, true, time.Time{})
|
||||
|
@ -442,8 +532,17 @@ func TestDrainNodeWithPodsEvictionFailure(t *testing.T) {
|
|||
ctx, err := NewScaleTestAutoscalingContext(options, fakeClient, nil, nil, nil, nil)
|
||||
assert.NoError(t, err)
|
||||
r := evRegister{}
|
||||
evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom, evictionRegister: &r}
|
||||
evictionResults, err := evictor.DrainNodeWithPods(&ctx, n1, []*apiv1.Pod{p1, p2, p3, p4}, []*apiv1.Pod{})
|
||||
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
|
||||
evictor := Evictor{
|
||||
EvictionRetryTime: 0,
|
||||
PodEvictionHeadroom: DefaultPodEvictionHeadroom,
|
||||
evictionRegister: &r,
|
||||
shutdownGracePeriodByPodPriority: legacyFlagDrainConfig,
|
||||
}
|
||||
clustersnapshot.InitializeClusterSnapshotOrDie(t, ctx.ClusterSnapshot, []*apiv1.Node{n1}, []*apiv1.Pod{p1, p2, p3, p4})
|
||||
nodeInfo, err := ctx.ClusterSnapshot.NodeInfos().Get(n1.Name)
|
||||
assert.NoError(t, err)
|
||||
evictionResults, err := evictor.DrainNode(&ctx, nodeInfo)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, 4, len(evictionResults))
|
||||
assert.Equal(t, *p1, *evictionResults["p1"].Pod)
|
||||
|
@ -468,12 +567,12 @@ func TestDrainNodeWithPodsEvictionFailure(t *testing.T) {
|
|||
func TestDrainWithPodsNodeDisappearanceFailure(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
|
||||
p1 := BuildTestPod("p1", 100, 0)
|
||||
p2 := BuildTestPod("p2", 100, 0)
|
||||
p3 := BuildTestPod("p3", 100, 0)
|
||||
p4 := BuildTestPod("p4", 100, 0)
|
||||
e2 := fmt.Errorf("disappearance_error: p2")
|
||||
n1 := BuildTestNode("n1", 1000, 1000)
|
||||
p1 := BuildTestPod("p1", 100, 0, WithNodeName(n1.Name))
|
||||
p2 := BuildTestPod("p2", 100, 0, WithNodeName(n1.Name))
|
||||
p3 := BuildTestPod("p3", 100, 0, WithNodeName(n1.Name))
|
||||
p4 := BuildTestPod("p4", 100, 0, WithNodeName(n1.Name))
|
||||
e2 := fmt.Errorf("disappearance_error: p2")
|
||||
SetNodeReadyState(n1, true, time.Time{})
|
||||
|
||||
fakeClient.Fake.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
|
||||
|
@ -500,8 +599,16 @@ func TestDrainWithPodsNodeDisappearanceFailure(t *testing.T) {
|
|||
ctx, err := NewScaleTestAutoscalingContext(options, fakeClient, nil, nil, nil, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: 0}
|
||||
evictionResults, err := evictor.DrainNodeWithPods(&ctx, n1, []*apiv1.Pod{p1, p2, p3, p4}, []*apiv1.Pod{})
|
||||
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
|
||||
evictor := Evictor{
|
||||
EvictionRetryTime: 0,
|
||||
PodEvictionHeadroom: 0,
|
||||
shutdownGracePeriodByPodPriority: legacyFlagDrainConfig,
|
||||
}
|
||||
clustersnapshot.InitializeClusterSnapshotOrDie(t, ctx.ClusterSnapshot, []*apiv1.Node{n1}, []*apiv1.Pod{p1, p2, p3, p4})
|
||||
nodeInfo, err := ctx.ClusterSnapshot.NodeInfos().Get(n1.Name)
|
||||
assert.NoError(t, err)
|
||||
evictionResults, err := evictor.DrainNode(&ctx, nodeInfo)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, 4, len(evictionResults))
|
||||
assert.Equal(t, *p1, *evictionResults["p1"].Pod)
|
||||
|
@@ -593,7 +700,7 @@ func TestPodsToEvict(t *testing.T) {
            if err != nil {
                t.Fatalf("NodeInfos().Get() unexpected error: %v", err)
            }
            gotDsPods, gotNonDsPods := podsToEvict(ctx, nodeInfo)
            gotDsPods, gotNonDsPods := podsToEvict(nodeInfo, ctx.DaemonSetEvictionForOccupiedNodes)
            if diff := cmp.Diff(tc.wantDsPods, gotDsPods, cmpopts.EquateEmpty()); diff != "" {
                t.Errorf("podsToEvict dsPods diff (-want +got):\n%s", diff)
            }
@@ -18,7 +18,6 @@ package actuation

import (
    "sync"
    "time"

    apiv1 "k8s.io/api/core/v1"
    "k8s.io/klog/v2"
@@ -104,7 +103,7 @@ func (ds *GroupDeletionScheduler) prepareNodeForDeletion(nodeInfo *framework.Nod
        return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToEvictPods, Err: err, PodEvictionResults: evictionResults}
    }
    } else {
        if err := ds.evictor.EvictDaemonSetPods(ds.ctx, nodeInfo, time.Now()); err != nil {
        if _, err := ds.evictor.EvictDaemonSetPods(ds.ctx, nodeInfo); err != nil {
            // Evicting DS pods is best-effort, so proceed with the deletion even if there are errors.
            klog.Warningf("Error while evicting DS pods from an empty node %q: %v", node.Name, err)
        }
@@ -126,7 +126,7 @@ func TestScheduleDeletion(t *testing.T) {
    if err != nil {
        t.Fatalf("Couldn't set up autoscaling context: %v", err)
    }
    scheduler := NewGroupDeletionScheduler(&ctx, tracker, batcher, Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom})
    scheduler := NewGroupDeletionScheduler(&ctx, tracker, batcher, Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom})

    if err := scheduleAll(tc.toSchedule, scheduler); err != nil {
        t.Fatal(err)
@@ -0,0 +1,117 @@
/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package actuation

import (
    "math"
    "sort"
    "strconv"
    "strings"

    apiv1 "k8s.io/api/core/v1"
    "k8s.io/klog/v2"
    kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
)

func groupByPriority(shutdownGracePeriodByPodPriority []kubelet_config.ShutdownGracePeriodByPodPriority, fullEvictionPods, bestEffortEvictionPods []*apiv1.Pod) []podEvictionGroup {
    groups := make([]podEvictionGroup, 0, len(shutdownGracePeriodByPodPriority))
    for _, period := range shutdownGracePeriodByPodPriority {
        groups = append(groups, podEvictionGroup{
            ShutdownGracePeriodByPodPriority: period,
        })
    }

    for _, pod := range fullEvictionPods {
        index := groupIndex(pod, groups)
        groups[index].FullEvictionPods = append(groups[index].FullEvictionPods, pod)
    }
    for _, pod := range bestEffortEvictionPods {
        index := groupIndex(pod, groups)
        groups[index].BestEffortEvictionPods = append(groups[index].BestEffortEvictionPods, pod)
    }
    return groups
}

func groupIndex(pod *apiv1.Pod, groups []podEvictionGroup) int {
    var priority int32
    if pod.Spec.Priority != nil {
        priority = *pod.Spec.Priority
    }

    // Find the group index according to the priority.
    index := sort.Search(len(groups), func(i int) bool {
        return (groups)[i].Priority >= priority
    })

    // 1. Pods with a priority higher than the highest group default to the highest group.
    // 2. Pods with a priority lower than the lowest group default to the lowest group.
    // 3. Pods whose priority falls between two groups default to the lower group, i.e. when
    //    groups[index-1].Priority <= pod priority < groups[index].Priority
    //    we pick the lower one (index-1).
    if index == len(groups) {
        index = len(groups) - 1
    } else if index < 0 {
        index = 0
    } else if index > 0 && (groups)[index].Priority > priority {
        index--
    }
    return index
}

// ParseShutdownGracePeriodsAndPriorities parses priorityGracePeriodStr and returns a slice of
// ShutdownGracePeriodByPodPriority if parsing succeeds. Otherwise, it returns an empty list.
func ParseShutdownGracePeriodsAndPriorities(priorityGracePeriodStr string) []kubelet_config.ShutdownGracePeriodByPodPriority {
    var priorityGracePeriodMap, emptyMap []kubelet_config.ShutdownGracePeriodByPodPriority

    if priorityGracePeriodStr == "" {
        return emptyMap
    }
    priorityGracePeriodStrArr := strings.Split(priorityGracePeriodStr, ",")
    for _, item := range priorityGracePeriodStrArr {
        priorityAndPeriod := strings.Split(item, ":")
        if len(priorityAndPeriod) != 2 {
            klog.Errorf("Parsing shutdown grace periods failed because '%s' is not a priority and grace period couple separated by ':'", item)
            return emptyMap
        }
        priority, err := strconv.Atoi(priorityAndPeriod[0])
        if err != nil {
            klog.Errorf("Parsing shutdown grace periods and priorities failed: %v", err)
            return emptyMap
        }
        shutDownGracePeriod, err := strconv.Atoi(priorityAndPeriod[1])
        if err != nil {
            klog.Errorf("Parsing shutdown grace periods and priorities failed: %v", err)
            return emptyMap
        }
        priorityGracePeriodMap = append(priorityGracePeriodMap, kubelet_config.ShutdownGracePeriodByPodPriority{
            Priority:                   int32(priority),
            ShutdownGracePeriodSeconds: int64(shutDownGracePeriod),
        })
    }
    return priorityGracePeriodMap
}

// SingleRuleDrainConfig returns a ShutdownGracePeriodByPodPriority list with a single rule that
// applies the given grace period to pods of every priority.
func SingleRuleDrainConfig(shutdownGracePeriodSeconds int) []kubelet_config.ShutdownGracePeriodByPodPriority {
    return []kubelet_config.ShutdownGracePeriodByPodPriority{
        {
            Priority:                   math.MaxInt32,
            ShutdownGracePeriodSeconds: int64(shutdownGracePeriodSeconds),
        },
    }
}
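A short usage sketch of the two exported helpers above, assuming it runs inside the actuation package (the flag string and durations are example values; results shown in comments):

    // Parse a "priority:seconds,priority:seconds" string into eviction rules.
    rules := ParseShutdownGracePeriodsAndPriorities("0:300,1000:120,2000000000:60")
    // rules == [{0, 300}, {1000, 120}, {2000000000, 60}]; NewEvictor sorts them by priority before use.

    // A malformed string yields an empty slice rather than an error, so callers can fall back
    // to the legacy single-rule configuration bounded by max-graceful-termination-sec.
    if len(ParseShutdownGracePeriodsAndPriorities("0:300,bad")) == 0 {
        rules = SingleRuleDrainConfig(600) // == [{math.MaxInt32, 600}]
    }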
@ -0,0 +1,239 @@
|
|||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package actuation
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
policyv1beta1 "k8s.io/api/policy/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/config"
|
||||
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
|
||||
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
core "k8s.io/client-go/testing"
|
||||
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
|
||||
)
|
||||
|
||||
func TestPriorityEvictor(t *testing.T) {
|
||||
deletedPods := make(chan string, 10)
|
||||
fakeClient := &fake.Clientset{}
|
||||
|
||||
n1 := BuildTestNode("n1", 1000, 1000)
|
||||
p1 := BuildTestPod("p1", 100, 0, WithNodeName(n1.Name))
|
||||
p2 := BuildTestPod("p2", 300, 0, WithNodeName(n1.Name))
|
||||
p3 := BuildTestPod("p3", 150, 0, WithNodeName(n1.Name))
|
||||
|
||||
priority100 := int32(100)
|
||||
priority2000 := int32(2000)
|
||||
priority2000000005 := int32(2000000005)
|
||||
p1.Spec.Priority = &priority2000000005
|
||||
p2.Spec.Priority = &priority2000
|
||||
p3.Spec.Priority = &priority100
|
||||
|
||||
SetNodeReadyState(n1, true, time.Time{})
|
||||
|
||||
fakeClient.Fake.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
|
||||
return true, nil, errors.NewNotFound(apiv1.Resource("pod"), "whatever")
|
||||
})
|
||||
fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
|
||||
createAction := action.(core.CreateAction)
|
||||
if createAction == nil {
|
||||
return false, nil, nil
|
||||
}
|
||||
eviction := createAction.GetObject().(*policyv1beta1.Eviction)
|
||||
if eviction == nil {
|
||||
return false, nil, nil
|
||||
}
|
||||
deletedPods <- eviction.Name
|
||||
return true, nil, nil
|
||||
})
|
||||
|
||||
options := config.AutoscalingOptions{
|
||||
MaxGracefulTerminationSec: 20,
|
||||
MaxPodEvictionTime: 5 * time.Second,
|
||||
}
|
||||
ctx, err := NewScaleTestAutoscalingContext(options, fakeClient, nil, nil, nil, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
evictor := Evictor{
|
||||
EvictionRetryTime: 0,
|
||||
PodEvictionHeadroom: DefaultPodEvictionHeadroom,
|
||||
shutdownGracePeriodByPodPriority: []kubelet_config.ShutdownGracePeriodByPodPriority{
|
||||
{
|
||||
Priority: 0,
|
||||
ShutdownGracePeriodSeconds: 3,
|
||||
},
|
||||
{
|
||||
Priority: 1000,
|
||||
ShutdownGracePeriodSeconds: 2,
|
||||
},
|
||||
{
|
||||
Priority: 2000000000,
|
||||
ShutdownGracePeriodSeconds: 1,
|
||||
},
|
||||
},
|
||||
fullDsEviction: true,
|
||||
}
|
||||
clustersnapshot.InitializeClusterSnapshotOrDie(t, ctx.ClusterSnapshot, []*apiv1.Node{n1}, []*apiv1.Pod{p1, p2, p3})
|
||||
nodeInfo, err := ctx.ClusterSnapshot.NodeInfos().Get(n1.Name)
|
||||
assert.NoError(t, err)
|
||||
_, err = evictor.DrainNode(&ctx, nodeInfo)
|
||||
assert.NoError(t, err)
|
||||
deleted := make([]string, 0)
|
||||
deleted = append(deleted, utils.GetStringFromChan(deletedPods))
|
||||
deleted = append(deleted, utils.GetStringFromChan(deletedPods))
|
||||
deleted = append(deleted, utils.GetStringFromChan(deletedPods))
|
||||
|
||||
assert.Equal(t, p3.Name, deleted[0])
|
||||
assert.Equal(t, p2.Name, deleted[1])
|
||||
assert.Equal(t, p1.Name, deleted[2])
|
||||
}
|
||||
|
||||
func TestGroupByPriority(t *testing.T) {
	p1 := BuildTestPod("p1", 100, 0)
	p2 := BuildTestPod("p2", 300, 0)
	p3 := BuildTestPod("p3", 150, 0)
	p4 := BuildTestPod("p4", 100, 0)
	p5 := BuildTestPod("p5", 300, 0)

	p6 := BuildTestPod("p6", 100, 0)
	p7 := BuildTestPod("p7", 300, 0)
	p8 := BuildTestPod("p8", 150, 0)
	p9 := BuildTestPod("p9", 100, 0)
	p10 := BuildTestPod("p10", 300, 0)

	priority0 := int32(0)
	priority100 := int32(100)
	priority500 := int32(500)
	priority1000 := int32(1000)
	priority2000000005 := int32(2000000005)
	p1.Spec.Priority = &priority2000000005
	p2.Spec.Priority = &priority500
	p3.Spec.Priority = &priority100
	p4.Spec.Priority = &priority0
	p5.Spec.Priority = &priority1000

	p6.Spec.Priority = &priority2000000005
	p7.Spec.Priority = &priority500
	p8.Spec.Priority = &priority100
	p9.Spec.Priority = &priority0
	p10.Spec.Priority = &priority1000

	shutdownGracePeriodByPodPriority := []kubelet_config.ShutdownGracePeriodByPodPriority{
		{
			Priority:                   10,
			ShutdownGracePeriodSeconds: 4,
		},
		{
			Priority:                   1000,
			ShutdownGracePeriodSeconds: 3,
		},
		{
			Priority:                   2000,
			ShutdownGracePeriodSeconds: 2,
		},
		{
			Priority:                   2000000000,
			ShutdownGracePeriodSeconds: 1,
		},
	}

	wantGroups := []podEvictionGroup{
		{
			ShutdownGracePeriodByPodPriority: shutdownGracePeriodByPodPriority[0],
			FullEvictionPods:                 []*apiv1.Pod{p2, p3, p4},
			BestEffortEvictionPods:           []*apiv1.Pod{p7, p8, p9},
		},
		{
			ShutdownGracePeriodByPodPriority: shutdownGracePeriodByPodPriority[1],
			FullEvictionPods:                 []*apiv1.Pod{p5},
			BestEffortEvictionPods:           []*apiv1.Pod{p10},
		},
		{
			ShutdownGracePeriodByPodPriority: shutdownGracePeriodByPodPriority[2],
		},
		{
			ShutdownGracePeriodByPodPriority: shutdownGracePeriodByPodPriority[3],
			FullEvictionPods:                 []*apiv1.Pod{p1},
			BestEffortEvictionPods:           []*apiv1.Pod{p6},
		},
	}

	groups := groupByPriority(shutdownGracePeriodByPodPriority, []*apiv1.Pod{p1, p2, p3, p4, p5}, []*apiv1.Pod{p6, p7, p8, p9, p10})
	assert.Equal(t, wantGroups, groups)
}

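For context on the wantGroups expectation above: each pod lands in the entry with the largest Priority threshold that does not exceed the pod's own priority, pods below every threshold fall into the lowest entry, and groups are then drained in ascending order of group priority. The sketch below is illustrative only, not code from this PR; bucketIndex is a made-up helper and it assumes the config is sorted by ascending Priority.

package main

import (
	"fmt"

	kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
)

// bucketIndex returns the index of the grace-period entry a pod falls into:
// the entry with the largest Priority not exceeding podPriority. Entries must
// be sorted by ascending Priority; pods below every threshold get index 0.
func bucketIndex(groups []kubelet_config.ShutdownGracePeriodByPodPriority, podPriority int32) int {
	idx := 0
	for i, g := range groups {
		if podPriority >= g.Priority {
			idx = i
		}
	}
	return idx
}

func main() {
	groups := []kubelet_config.ShutdownGracePeriodByPodPriority{
		{Priority: 10, ShutdownGracePeriodSeconds: 4},
		{Priority: 1000, ShutdownGracePeriodSeconds: 3},
		{Priority: 2000, ShutdownGracePeriodSeconds: 2},
		{Priority: 2000000000, ShutdownGracePeriodSeconds: 1},
	}
	fmt.Println(bucketIndex(groups, 0))          // 0: p4/p9 land in the Priority:10 group
	fmt.Println(bucketIndex(groups, 1000))       // 1: p5/p10 land in the Priority:1000 group
	fmt.Println(bucketIndex(groups, 2000000005)) // 3: p1/p6 land in the Priority:2000000000 group
}
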
func TestParseShutdownGracePeriodsAndPriorities(t *testing.T) {
	testCases := []struct {
		name  string
		input string
		want  []kubelet_config.ShutdownGracePeriodByPodPriority
	}{
		{
			name:  "empty input",
			input: "",
			want:  nil,
		},
		{
			name:  "Incorrect string - incorrect priority grace period pairs",
			input: "1:2,34",
			want:  nil,
		},
		{
			name:  "Incorrect string - trailing ,",
			input: "1:2, 3:4,",
			want:  nil,
		},
		{
			name:  "Incorrect string - trailing space",
			input: "1:2,3:4 ",
			want:  nil,
		},
		{
			name:  "Non integers - 1",
			input: "1:2,3:a",
			want:  nil,
		},
		{
			name:  "Non integers - 2",
			input: "1:2,3:23.2",
			want:  nil,
		},
		{
			name:  "parsable input",
			input: "1:2,3:4",
			want: []kubelet_config.ShutdownGracePeriodByPodPriority{
				{1, 2},
				{3, 4},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			shutdownGracePeriodByPodPriority := ParseShutdownGracePeriodsAndPriorities(tc.input)
			assert.Equal(t, tc.want, shutdownGracePeriodByPodPriority)
		})
	}
}

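The function under test lives in the actuation package and is not shown in this excerpt. As a rough reference for what these cases require, a strict parser consistent with them could look like the sketch below; parseDrainPriorityConfig is an illustrative name, and the real implementation may differ (for example, by logging parse errors).

package main

import (
	"fmt"
	"strconv"
	"strings"

	kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
)

// parseDrainPriorityConfig is an illustrative strict parser: any malformed pair,
// stray space or trailing separator invalidates the whole value and yields nil.
func parseDrainPriorityConfig(s string) []kubelet_config.ShutdownGracePeriodByPodPriority {
	if s == "" {
		return nil
	}
	var out []kubelet_config.ShutdownGracePeriodByPodPriority
	for _, pair := range strings.Split(s, ",") {
		parts := strings.Split(pair, ":")
		if len(parts) != 2 {
			return nil
		}
		priority, err1 := strconv.ParseInt(parts[0], 10, 32)
		seconds, err2 := strconv.ParseInt(parts[1], 10, 64)
		if err1 != nil || err2 != nil {
			return nil
		}
		out = append(out, kubelet_config.ShutdownGracePeriodByPodPriority{
			Priority:                   int32(priority),
			ShutdownGracePeriodSeconds: seconds,
		})
	}
	return out
}

func main() {
	fmt.Println(parseDrainPriorityConfig("1:2,3:4"))  // [{1 2} {3 4}]
	fmt.Println(parseDrainPriorityConfig("1:2,3:4 ")) // [] (nil): trailing space is rejected
}
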
@@ -66,7 +66,7 @@ func getTestCases(ignoreDaemonSetsUtilization bool, suffix string, now time.Time
	smallPod := BuildTestPod("smallPod", 100, 0)
	smallPod.Spec.NodeName = "regular"

	dsPod := BuildDSTestPod("dsPod", 500, 0)
	dsPod := BuildTestPod("dsPod", 500, 0, WithDSController())
	dsPod.Spec.NodeName = "regular"

	testCases := []testCase{

@@ -28,6 +28,11 @@ import (
	"syscall"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
	"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
	kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"

	"github.com/spf13/pflag"

	"k8s.io/apimachinery/pkg/api/meta"

@@ -40,7 +45,6 @@ import (
	"k8s.io/autoscaler/cluster-autoscaler/config"
	"k8s.io/autoscaler/cluster-autoscaler/core"
	"k8s.io/autoscaler/cluster-autoscaler/core/podlistprocessor"
	"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/estimator"
	"k8s.io/autoscaler/cluster-autoscaler/expander"
	"k8s.io/autoscaler/cluster-autoscaler/metrics"

@@ -53,7 +57,6 @@ import (
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
	scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"

@@ -151,14 +154,15 @@ var (
	maxBulkSoftTaintCount = flag.Int("max-bulk-soft-taint-count", 10, "Maximum number of nodes that can be tainted/untainted PreferNoSchedule at the same time. Set to 0 to turn off such tainting.")
	maxBulkSoftTaintTime = flag.Duration("max-bulk-soft-taint-time", 3*time.Second, "Maximum duration of tainting/untainting nodes as PreferNoSchedule at the same time.")
	maxEmptyBulkDeleteFlag = flag.Int("max-empty-bulk-delete", 10, "Maximum number of empty nodes that can be deleted at the same time.")
	maxGracefulTerminationFlag = flag.Int("max-graceful-termination-sec", 10*60, "Maximum number of seconds CA waits for pod termination when trying to scale down a node.")
	maxGracefulTerminationFlag = flag.Int("max-graceful-termination-sec", 10*60, "Maximum number of seconds CA waits for pod termination when trying to scale down a node. "+
		"This flag is mutually exclusive with the drain-priority-config flag, which allows more configuration options.")
	maxTotalUnreadyPercentage = flag.Float64("max-total-unready-percentage", 45, "Maximum percentage of unready nodes in the cluster. After this is exceeded, CA halts operations")
	okTotalUnreadyCount = flag.Int("ok-total-unready-count", 3, "Number of allowed unready nodes, irrespective of max-total-unready-percentage")
	scaleUpFromZero = flag.Bool("scale-up-from-zero", true, "Should CA scale up when there are 0 ready nodes.")
	parallelScaleUp = flag.Bool("parallel-scale-up", false, "Whether to allow parallel node groups scale up. Experimental: may not work on some cloud providers, enable at your own risk.")
	maxNodeProvisionTime = flag.Duration("max-node-provision-time", 15*time.Minute, "The default maximum time CA waits for node to be provisioned - the value can be overridden per node group")
	maxPodEvictionTime = flag.Duration("max-pod-eviction-time", 2*time.Minute, "Maximum time CA tries to evict a pod before giving up")
	nodeGroupsFlag = multiStringFlag(
		"nodes",
		"sets min,max size and other configuration data for a node group in a format accepted by cloud provider. Can be used multiple times. Format: <min>:<max>:<other...>")
	nodeGroupAutoDiscoveryFlag = multiStringFlag(

@@ -241,6 +245,11 @@ var (
	forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.")
	dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", false, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based of the latency between CA and api-server")
	bypassedSchedulers = pflag.StringSlice("bypassed-scheduler-names", []string{}, fmt.Sprintf("Names of schedulers to bypass. If set to non-empty value, CA will not wait for pods to reach a certain age before triggering a scale-up."))
	drainPriorityConfig = flag.String("drain-priority-config", "",
		"List of ',' separated pairs (priority:terminationGracePeriodSeconds) of integers separated by ':' which enables the priority evictor. The priority evictor groups pods into priority groups based on pod priority and evicts pods in ascending order of group priority. "+
			"The --max-graceful-termination-sec flag should not be set when this flag is set. Not setting this flag will use the unordered evictor by default. "+
			"The priority evictor reuses the concepts of the drain logic in kubelet (https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2712-pod-priority-based-graceful-node-shutdown#migration-from-the-node-graceful-shutdown-feature). "+
			"E.g. flag usage: '10000:20,1000:100,0:60'")
)

func isFlagPassed(name string) bool {
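For illustration only: the example value from the help text above, '10000:20,1000:100,0:60', corresponds to the configuration below, using the kubelet_config import added in this file. Entry order is shown as written in the flag; whether the implementation reorders entries is not visible in this diff.

var drainPriorityConfigExample = []kubelet_config.ShutdownGracePeriodByPodPriority{
	{Priority: 10000, ShutdownGracePeriodSeconds: 20},
	{Priority: 1000, ShutdownGracePeriodSeconds: 100},
	{Priority: 0, ShutdownGracePeriodSeconds: 60},
}
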
@@ -301,6 +310,18 @@ func createAutoscalingOptions() config.AutoscalingOptions {
		klog.Fatalf("Failed to get scheduler config: %v", err)
	}

	if isFlagPassed("drain-priority-config") && isFlagPassed("max-graceful-termination-sec") {
		klog.Fatalf("Invalid configuration, could not use --drain-priority-config together with --max-graceful-termination-sec")
	}

	var drainPriorityConfigMap []kubelet_config.ShutdownGracePeriodByPodPriority
	if isFlagPassed("drain-priority-config") {
		drainPriorityConfigMap = actuation.ParseShutdownGracePeriodsAndPriorities(*drainPriorityConfig)
		if len(drainPriorityConfigMap) == 0 {
			klog.Fatalf("Invalid configuration, parsing --drain-priority-config")
		}
	}

	return config.AutoscalingOptions{
		NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
			ScaleDownUtilizationThreshold: *scaleDownUtilizationThreshold,

@@ -343,6 +364,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
		ScaleDownNonEmptyCandidatesCount: *scaleDownNonEmptyCandidatesCount,
		ScaleDownCandidatesPoolRatio:     *scaleDownCandidatesPoolRatio,
		ScaleDownCandidatesPoolMinCount:  *scaleDownCandidatesPoolMinCount,
		DrainPriorityConfig:              drainPriorityConfigMap,
		SchedulerConfig:                  parsedSchedConfig,
		WriteStatusConfigMap:             *writeStatusConfigMapFlag,
		StatusConfigMapName:              *statusConfigMapName,

@@ -44,6 +44,9 @@ type FunctionLabel string
// NodeGroupType describes node group relation to CA
type NodeGroupType string

// PodEvictionResult describes the result of a pod eviction attempt
type PodEvictionResult string

const (
	caNamespace = "cluster_autoscaler"
	readyLabel  = "ready"

@@ -87,6 +90,10 @@ const (
	// This is meant to help find unexpectedly long function execution times for
	// debugging purposes.
	LogLongDurationThreshold = 5 * time.Second
	// PodEvictionSucceed means creation of the pod eviction object succeeded
	PodEvictionSucceed PodEvictionResult = "succeed"
	// PodEvictionFailed means creation of the pod eviction object failed
	PodEvictionFailed PodEvictionResult = "failed"
)

// Names of Cluster Autoscaler operations

@@ -300,12 +307,12 @@ var (
		}, []string{"reason", "gpu_resource_name", "gpu_name"},
	)

	evictionsCount = k8smetrics.NewCounter(
	evictionsCount = k8smetrics.NewCounterVec(
		&k8smetrics.CounterOpts{
			Namespace: caNamespace,
			Name:      "evicted_pods_total",
			Help:      "Number of pods evicted by CA",
		},
		}, []string{"eviction_result"},
	)

	unneededNodesCount = k8smetrics.NewGauge(

@@ -566,9 +573,9 @@ func RegisterScaleDown(nodesCount int, gpuResourceName, gpuType string, reason N
	}
}

// RegisterEvictions records number of evicted pods
func RegisterEvictions(podsCount int) {
	evictionsCount.Add(float64(podsCount))
// RegisterEvictions records the number of evicted pods, labelled by whether the eviction succeeded or failed
func RegisterEvictions(podsCount int, result PodEvictionResult) {
	evictionsCount.WithLabelValues(string(result)).Add(float64(podsCount))
}

// UpdateUnneededNodesCount records number of currently unneeded nodes

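The updated call sites are not part of this excerpt; a hedged sketch of how the new two-argument RegisterEvictions would typically be used from eviction code follows. recordEvictionResult is a hypothetical helper, not code from this PR.

package example

import "k8s.io/autoscaler/cluster-autoscaler/metrics"

// recordEvictionResult bumps the evicted_pods_total counter with the
// eviction_result label chosen from the eviction outcome.
func recordEvictionResult(evictErr error) {
	if evictErr == nil {
		metrics.RegisterEvictions(1, metrics.PodEvictionSucceed)
		return
	}
	metrics.RegisterEvictions(1, metrics.PodEvictionFailed)
}
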
@@ -89,13 +89,18 @@ func AddSchedulerName(schedulerName string) func(*apiv1.Pod) {
	}
}

// BuildDSTestPod creates a DaemonSet pod with cpu and memory.
func BuildDSTestPod(name string, cpu int64, mem int64) *apiv1.Pod {
// WithDSController creates a DaemonSet owner ref for the pod.
func WithDSController() func(*apiv1.Pod) {
	return func(pod *apiv1.Pod) {
		pod.OwnerReferences = GenerateOwnerReferences("ds", "DaemonSet", "apps/v1", "some-uid")
	}
}

	pod := BuildTestPod(name, cpu, mem)
	pod.OwnerReferences = GenerateOwnerReferences("ds", "DaemonSet", "apps/v1", "some-uid")

	return pod
// WithNodeName sets the node name on the pod.
func WithNodeName(nodeName string) func(*apiv1.Pod) {
	return func(pod *apiv1.Pod) {
		pod.Spec.NodeName = nodeName
	}
}

// BuildTestPodWithEphemeralStorage creates a pod with cpu, memory and ephemeral storage resources.
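The two helpers compose as pod options. Assuming BuildTestPod accepts multiple option funcs, as its option-style usage elsewhere in this PR suggests, a test could build a DaemonSet pod bound to a node like this (illustrative only):

	// Illustrative usage; mirrors how the new option helpers are used in the tests above.
	dsPod := BuildTestPod("dsPod", 500, 0, WithDSController(), WithNodeName("regular"))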