Extract drainability rules into packages

Artem Minyaylov 2023-09-27 06:08:26 +00:00
parent e461782e27
commit af638733e1
14 changed files with 165 additions and 91 deletions

View File

@@ -21,7 +21,6 @@ import (
 	"time"
 	apiv1 "k8s.io/api/core/v1"
-	policyv1 "k8s.io/api/policy/v1"
 	"k8s.io/klog/v2"
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
@@ -30,6 +29,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
 	"k8s.io/autoscaler/cluster-autoscaler/core/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
@@ -223,7 +223,7 @@ func (a *Actuator) deleteAsyncDrain(NodeGroupViews []*budgets.NodeGroupView) (re
 }
 func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool, batchSize int) {
-	var pdbs []*policyv1.PodDisruptionBudget
+	var remainingPdbTracker pdb.RemainingPdbTracker
 	var registry kube_util.ListerRegistry
 	if len(nodes) == 0 {
@@ -246,7 +246,7 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
 	}
 	if drain {
-		pdbs, err = a.ctx.PodDisruptionBudgetLister().List()
+		pdbs, err := a.ctx.PodDisruptionBudgetLister().List()
 		if err != nil {
 			klog.Errorf("Scale-down: couldn't fetch pod disruption budgets, err: %v", err)
 			nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "podDisruptionBudgetLister.List returned error %v", err)}
@@ -255,7 +255,8 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
 			}
 			return
 		}
+		remainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
+		remainingPdbTracker.SetPdbs(pdbs)
 		registry = a.ctx.ListerRegistry
 	}
@@ -272,7 +273,7 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
 			continue
 		}
-		podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, a.deleteOptions, registry, pdbs, time.Now())
+		podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, a.deleteOptions, registry, remainingPdbTracker, time.Now())
 		if err != nil {
 			klog.Errorf("Scale-down: couldn't delete node %q, err: %v", node.Name, err)
 			nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "GetPodsToMove for %q returned error: %v", node.Name, err)}
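
For reference, a minimal sketch (not part of this commit) of the call pattern the actuator now follows: list the PodDisruptionBudgets once, seed a BasicRemainingPdbTracker with them, and hand the tracker to GetPodsToMove in place of the old PDB slice. The wrapper function name is illustrative; it assumes the surrounding cluster-autoscaler packages at this revision.

package example

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
	"k8s.io/autoscaler/cluster-autoscaler/simulator"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// podsToMoveWithPdbTracking mirrors the actuator change: PDBs are loaded once,
// wrapped in a RemainingPdbTracker, and the tracker (not a raw slice) flows
// into GetPodsToMove.
func podsToMoveWithPdbTracking(ctx *context.AutoscalingContext, nodeInfo *schedulerframework.NodeInfo, opts simulator.NodeDeleteOptions) ([]*apiv1.Pod, error) {
	pdbs, err := ctx.PodDisruptionBudgetLister().List()
	if err != nil {
		return nil, err
	}
	tracker := pdb.NewBasicRemainingPdbTracker()
	tracker.SetPdbs(pdbs)
	pods, _, _, err := simulator.GetPodsToMove(nodeInfo, opts, ctx.ListerRegistry, tracker, time.Now())
	return pods, err
}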

View File

@@ -23,7 +23,6 @@ import (
 	"time"
 	apiv1 "k8s.io/api/core/v1"
-	policyv1 "k8s.io/api/policy/v1"
 	policyv1beta1 "k8s.io/api/policy/v1beta1"
 	kube_errors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -177,7 +176,7 @@ func (e Evictor) DrainNodeWithPods(ctx *acontext.AutoscalingContext, node *apiv1
 // EvictDaemonSetPods creates eviction objects for all DaemonSet pods on the node.
 func (e Evictor) EvictDaemonSetPods(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo, timeNow time.Time) error {
 	nodeToDelete := nodeInfo.Node()
-	_, daemonSetPods, _, err := simulator.GetPodsToMove(nodeInfo, e.deleteOptions, nil, []*policyv1.PodDisruptionBudget{}, timeNow)
+	_, daemonSetPods, _, err := simulator.GetPodsToMove(nodeInfo, e.deleteOptions, nil, nil, timeNow)
 	if err != nil {
 		return fmt.Errorf("failed to get DaemonSet pods for %s (error: %v)", nodeToDelete.Name, err)
 	}

View File

@@ -147,7 +147,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
 		currentCandidates,
 		destinations,
 		timestamp,
-		sd.context.RemainingPdbTracker.GetPdbs())
+		sd.context.RemainingPdbTracker)
 	additionalCandidatesCount := sd.context.ScaleDownNonEmptyCandidatesCount - len(nodesToRemove)
 	if additionalCandidatesCount > len(currentNonCandidates) {
@@ -169,7 +169,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
 			currentNonCandidates[:additionalCandidatesPoolSize],
 			destinations,
 			timestamp,
-			sd.context.RemainingPdbTracker.GetPdbs())
+			sd.context.RemainingPdbTracker)
 		if len(additionalNodesToRemove) > additionalCandidatesCount {
 			additionalNodesToRemove = additionalNodesToRemove[:additionalCandidatesCount]
 		}
@@ -317,7 +317,7 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time) (_, drain []*apiv1.Nod
 		candidateNames,
 		allNodeNames,
 		time.Now(),
-		sd.context.RemainingPdbTracker.GetPdbs())
+		sd.context.RemainingPdbTracker)
 	findNodesToRemoveDuration = time.Now().Sub(findNodesToRemoveStart)
 	for _, unremovableNode := range unremovable {

View File

@@ -22,11 +22,11 @@ import (
 	"time"
 	apiv1 "k8s.io/api/core/v1"
-	policyv1 "k8s.io/api/policy/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
@@ -47,7 +47,7 @@ type eligibilityChecker interface {
 type removalSimulator interface {
 	DropOldHints()
-	SimulateNodeRemoval(node string, podDestinations map[string]bool, timestamp time.Time, pdbs []*policyv1.PodDisruptionBudget) (*simulator.NodeToBeRemoved, *simulator.UnremovableNode)
+	SimulateNodeRemoval(node string, podDestinations map[string]bool, timestamp time.Time, remainingPdbTracker pdb.RemainingPdbTracker) (*simulator.NodeToBeRemoved, *simulator.UnremovableNode)
 }
 // controllerReplicasCalculator calculates a number of target and expected replicas for a given controller.
@@ -276,7 +276,7 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
 			klog.V(4).Infof("%d out of %d nodes skipped in scale down simulation: there are already %d unneeded nodes so no point in looking for more.", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames), len(removableList))
 			break
 		}
-		removable, unremovable := p.rs.SimulateNodeRemoval(node, podDestinations, p.latestUpdate, p.context.RemainingPdbTracker.GetPdbs())
+		removable, unremovable := p.rs.SimulateNodeRemoval(node, podDestinations, p.latestUpdate, p.context.RemainingPdbTracker)
 		if removable != nil {
 			_, inParallel, _ := p.context.RemainingPdbTracker.CanRemovePods(removable.PodsToReschedule)
 			if !inParallel {

View File

@@ -24,7 +24,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	appsv1 "k8s.io/api/apps/v1"
 	apiv1 "k8s.io/api/core/v1"
-	policyv1 "k8s.io/api/policy/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
@@ -32,6 +31,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/config"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
 	. "k8s.io/autoscaler/cluster-autoscaler/core/test"
@@ -901,7 +901,7 @@ type fakeRemovalSimulator struct {
 func (r *fakeRemovalSimulator) DropOldHints() {}
-func (r *fakeRemovalSimulator) SimulateNodeRemoval(name string, _ map[string]bool, _ time.Time, _ []*policyv1.PodDisruptionBudget) (*simulator.NodeToBeRemoved, *simulator.UnremovableNode) {
+func (r *fakeRemovalSimulator) SimulateNodeRemoval(name string, _ map[string]bool, _ time.Time, _ pdb.RemainingPdbTracker) (*simulator.NodeToBeRemoved, *simulator.UnremovableNode) {
 	time.Sleep(r.sleep)
 	node := &apiv1.Node{}
 	for _, n := range r.nodes {

View File

@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"time"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
@@ -29,7 +30,6 @@ import (
 	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 	apiv1 "k8s.io/api/core/v1"
-	policyv1 "k8s.io/api/policy/v1"
 	klog "k8s.io/klog/v2"
 )
@@ -117,7 +117,7 @@ func (r *RemovalSimulator) FindNodesToRemove(
 	candidates []string,
 	destinations []string,
 	timestamp time.Time,
-	pdbs []*policyv1.PodDisruptionBudget,
+	remainingPdbTracker pdb.RemainingPdbTracker,
 ) (nodesToRemove []NodeToBeRemoved, unremovableNodes []*UnremovableNode) {
 	result := make([]NodeToBeRemoved, 0)
 	unremovable := make([]*UnremovableNode, 0)
@@ -128,7 +128,7 @@ func (r *RemovalSimulator) FindNodesToRemove(
 	}
 	for _, nodeName := range candidates {
-		rn, urn := r.SimulateNodeRemoval(nodeName, destinationMap, timestamp, pdbs)
+		rn, urn := r.SimulateNodeRemoval(nodeName, destinationMap, timestamp, remainingPdbTracker)
 		if rn != nil {
 			result = append(result, *rn)
 		} else if urn != nil {
@@ -146,7 +146,7 @@ func (r *RemovalSimulator) SimulateNodeRemoval(
 	nodeName string,
 	destinationMap map[string]bool,
 	timestamp time.Time,
-	pdbs []*policyv1.PodDisruptionBudget,
+	remainingPdbTracker pdb.RemainingPdbTracker,
 ) (*NodeToBeRemoved, *UnremovableNode) {
 	nodeInfo, err := r.clusterSnapshot.NodeInfos().Get(nodeName)
 	if err != nil {
@@ -159,7 +159,7 @@ func (r *RemovalSimulator) SimulateNodeRemoval(
 		return nil, &UnremovableNode{Node: nodeInfo.Node(), Reason: UnexpectedError}
 	}
-	podsToRemove, daemonSetPods, blockingPod, err := GetPodsToMove(nodeInfo, r.deleteOptions, r.listers, pdbs, timestamp)
+	podsToRemove, daemonSetPods, blockingPod, err := GetPodsToMove(nodeInfo, r.deleteOptions, r.listers, remainingPdbTracker, timestamp)
 	if err != nil {
 		klog.V(2).Infof("node %s cannot be removed: %v", nodeName, err)
 		if blockingPod != nil {

View File

@@ -30,7 +30,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	appsv1 "k8s.io/api/apps/v1"
 	apiv1 "k8s.io/api/core/v1"
-	policyv1 "k8s.io/api/policy/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/kubernetes/pkg/kubelet/types"
 	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
@@ -207,7 +206,7 @@ func TestFindNodesToRemove(t *testing.T) {
 			}
 			clustersnapshot.InitializeClusterSnapshotOrDie(t, clusterSnapshot, test.allNodes, test.pods)
 			r := NewRemovalSimulator(registry, clusterSnapshot, predicateChecker, tracker, testDeleteOptions(), false)
-			toRemove, unremovable := r.FindNodesToRemove(test.candidates, destinations, time.Now(), []*policyv1.PodDisruptionBudget{})
+			toRemove, unremovable := r.FindNodesToRemove(test.candidates, destinations, time.Now(), nil)
 			fmt.Printf("Test scenario: %s, found len(toRemove)=%v, expected len(test.toRemove)=%v\n", test.name, len(toRemove), len(test.toRemove))
 			assert.Equal(t, toRemove, test.toRemove)
 			assert.Equal(t, unremovable, test.unremovable)

View File

@@ -25,7 +25,9 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/autoscaler/cluster-autoscaler/config"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
 	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
 	pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
@@ -44,7 +46,7 @@ type NodeDeleteOptions struct {
 	// to allow their pods deletion in scale down
 	MinReplicaCount int
 	// DrainabilityRules contain a list of checks that are used to verify whether a pod can be drained from node.
-	DrainabilityRules []drainability.Rule
+	DrainabilityRules rules.Rules
 }
 // NewNodeDeleteOptions returns new node delete options extracted from autoscaling options
@@ -54,7 +56,7 @@ func NewNodeDeleteOptions(opts config.AutoscalingOptions) NodeDeleteOptions {
 		SkipNodesWithLocalStorage:         opts.SkipNodesWithLocalStorage,
 		MinReplicaCount:                   opts.MinReplicaCount,
 		SkipNodesWithCustomControllerPods: opts.SkipNodesWithCustomControllerPods,
-		DrainabilityRules:                 drainability.DefaultRules(),
+		DrainabilityRules:                 rules.Default(),
 	}
 }
@@ -67,16 +69,22 @@ func NewNodeDeleteOptions(opts config.AutoscalingOptions) NodeDeleteOptions {
 // still exist.
 // TODO(x13n): Rewrite GetPodsForDeletionOnNodeDrain into a set of DrainabilityRules.
 func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDeleteOptions, listers kube_util.ListerRegistry,
-	pdbs []*policyv1.PodDisruptionBudget, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) {
+	remainingPdbTracker pdb.RemainingPdbTracker, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) {
 	var drainPods, drainDs []*apiv1.Pod
 	drainabilityRules := deleteOptions.DrainabilityRules
 	if drainabilityRules == nil {
-		drainabilityRules = drainability.DefaultRules()
+		drainabilityRules = rules.Default()
+	}
+	if remainingPdbTracker == nil {
+		remainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
+	}
+	drainCtx := &drainability.DrainContext{
+		RemainingPdbTracker: remainingPdbTracker,
 	}
 	for _, podInfo := range nodeInfo.Pods {
 		pod := podInfo.Pod
-		d := drainabilityStatus(pod, drainabilityRules)
-		switch d.Outcome {
+		status := drainabilityRules.Drainable(drainCtx, pod)
+		switch status.Outcome {
 		case drainability.UndefinedOutcome:
 			pods = append(pods, podInfo.Pod)
 		case drainability.DrainOk:
@@ -86,15 +94,18 @@ func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDele
 				drainPods = append(drainPods, pod)
 			}
 		case drainability.BlockDrain:
-			blockingPod = &drain.BlockingPod{pod, d.BlockingReason}
-			err = d.Error
+			blockingPod = &drain.BlockingPod{
+				Pod:    pod,
+				Reason: status.BlockingReason,
+			}
+			err = status.Error
 			return
+		case drainability.SkipDrain:
 		}
 	}
 	pods, daemonSetPods, blockingPod, err = drain.GetPodsForDeletionOnNodeDrain(
 		pods,
-		pdbs,
+		remainingPdbTracker.GetPdbs(),
 		deleteOptions.SkipNodesWithSystemPods,
 		deleteOptions.SkipNodesWithLocalStorage,
 		deleteOptions.SkipNodesWithCustomControllerPods,
@@ -106,7 +117,7 @@ func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDele
 	if err != nil {
 		return pods, daemonSetPods, blockingPod, err
 	}
-	if pdbBlockingPod, err := checkPdbs(pods, pdbs); err != nil {
+	if pdbBlockingPod, err := checkPdbs(pods, remainingPdbTracker.GetPdbs()); err != nil {
 		return []*apiv1.Pod{}, []*apiv1.Pod{}, pdbBlockingPod, err
 	}
@@ -131,14 +142,3 @@ func checkPdbs(pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (*drain.
 	}
 	return nil, nil
 }
-func drainabilityStatus(pod *apiv1.Pod, dr []drainability.Rule) drainability.Status {
-	for _, f := range dr {
-		if d := f.Drainable(pod); d.Outcome != drainability.UndefinedOutcome {
-			return d
-		}
-	}
-	return drainability.Status{
-		Outcome: drainability.UndefinedOutcome,
-	}
-}
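
A small sketch (not part of the commit) of what the nil-handling above buys callers: since a nil tracker now falls back to a fresh BasicRemainingPdbTracker and nil DrainabilityRules fall back to rules.Default(), a call site with no PDB state, such as the DaemonSet eviction path earlier in this commit, can simply pass nil for both the lister registry and the tracker. The function name below is illustrative.

package example

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// daemonSetPodsOnly relies on the new defaults inside GetPodsToMove: nil
// tracker and nil rules are replaced internally, so no PDB wiring is needed.
func daemonSetPodsOnly(nodeInfo *schedulerframework.NodeInfo, opts simulator.NodeDeleteOptions) ([]*apiv1.Pod, error) {
	_, daemonSetPods, _, err := simulator.GetPodsToMove(nodeInfo, opts, nil, nil, time.Now())
	return daemonSetPods, err
}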

View File

@@ -25,7 +25,9 @@ import (
 	policyv1 "k8s.io/api/policy/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
 	. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
 	"k8s.io/kubernetes/pkg/kubelet/types"
@@ -179,7 +181,7 @@ func TestGetPodsToMove(t *testing.T) {
 		desc         string
 		pods         []*apiv1.Pod
 		pdbs         []*policyv1.PodDisruptionBudget
-		rules        []drainability.Rule
+		rules        []rules.Rule
 		wantPods     []*apiv1.Pod
 		wantDs       []*apiv1.Pod
 		wantBlocking *drain.BlockingPod
@@ -256,19 +258,19 @@ func TestGetPodsToMove(t *testing.T) {
 		{
 			desc:     "Rule allows",
 			pods:     []*apiv1.Pod{unreplicatedPod},
-			rules:    []drainability.Rule{alwaysDrain{}},
+			rules:    []rules.Rule{alwaysDrain{}},
 			wantPods: []*apiv1.Pod{unreplicatedPod},
 		},
 		{
 			desc:     "Second rule allows",
 			pods:     []*apiv1.Pod{unreplicatedPod},
-			rules:    []drainability.Rule{cantDecide{}, alwaysDrain{}},
+			rules:    []rules.Rule{cantDecide{}, alwaysDrain{}},
 			wantPods: []*apiv1.Pod{unreplicatedPod},
 		},
 		{
 			desc:    "Rule blocks",
 			pods:    []*apiv1.Pod{rsPod},
-			rules:   []drainability.Rule{neverDrain{}},
+			rules:   []rules.Rule{neverDrain{}},
 			wantErr: true,
 			wantBlocking: &drain.BlockingPod{
 				Pod: rsPod,
@@ -278,7 +280,7 @@ func TestGetPodsToMove(t *testing.T) {
 		{
 			desc:    "Second rule blocks",
 			pods:    []*apiv1.Pod{rsPod},
-			rules:   []drainability.Rule{cantDecide{}, neverDrain{}},
+			rules:   []rules.Rule{cantDecide{}, neverDrain{}},
 			wantErr: true,
 			wantBlocking: &drain.BlockingPod{
 				Pod: rsPod,
@@ -288,7 +290,7 @@ func TestGetPodsToMove(t *testing.T) {
 		{
 			desc:    "Undecisive rule fallback to default logic: Unreplicated pod",
 			pods:    []*apiv1.Pod{unreplicatedPod},
-			rules:   []drainability.Rule{cantDecide{}},
+			rules:   []rules.Rule{cantDecide{}},
 			wantErr: true,
 			wantBlocking: &drain.BlockingPod{
 				Pod: unreplicatedPod,
@@ -298,7 +300,7 @@ func TestGetPodsToMove(t *testing.T) {
 		{
 			desc:     "Undecisive rule fallback to default logic: Replicated pod",
 			pods:     []*apiv1.Pod{rsPod},
-			rules:    []drainability.Rule{cantDecide{}},
+			rules:    []rules.Rule{cantDecide{}},
 			wantPods: []*apiv1.Pod{rsPod},
 		},
 	}
@@ -311,7 +313,9 @@ func TestGetPodsToMove(t *testing.T) {
 				SkipNodesWithCustomControllerPods: true,
 				DrainabilityRules:                 tc.rules,
 			}
-			p, d, b, err := GetPodsToMove(schedulerframework.NewNodeInfo(tc.pods...), deleteOptions, nil, tc.pdbs, testTime)
+			tracker := pdb.NewBasicRemainingPdbTracker()
+			tracker.SetPdbs(tc.pdbs)
+			p, d, b, err := GetPodsToMove(schedulerframework.NewNodeInfo(tc.pods...), deleteOptions, nil, tracker, testTime)
 			if tc.wantErr {
 				assert.Error(t, err)
 			} else {
@@ -326,18 +330,18 @@ func TestGetPodsToMove(t *testing.T) {
 type alwaysDrain struct{}
-func (a alwaysDrain) Drainable(*apiv1.Pod) drainability.Status {
+func (a alwaysDrain) Drainable(*drainability.DrainContext, *apiv1.Pod) drainability.Status {
 	return drainability.NewDrainableStatus()
 }
 type neverDrain struct{}
-func (n neverDrain) Drainable(*apiv1.Pod) drainability.Status {
+func (n neverDrain) Drainable(*drainability.DrainContext, *apiv1.Pod) drainability.Status {
 	return drainability.NewBlockedStatus(drain.UnexpectedError, fmt.Errorf("nope"))
 }
 type cantDecide struct{}
-func (c cantDecide) Drainable(*apiv1.Pod) drainability.Status {
+func (c cantDecide) Drainable(*drainability.DrainContext, *apiv1.Pod) drainability.Status {
 	return drainability.NewUndefinedStatus()
 }

View File

@@ -0,0 +1,26 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package drainability
import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
)
// DrainContext contains parameters for drainability rules.
type DrainContext struct {
RemainingPdbTracker pdb.RemainingPdbTracker
}

View File

@@ -14,26 +14,26 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
-package drainability
+package mirror
 import (
-	"k8s.io/autoscaler/cluster-autoscaler/utils/pod"
 	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
+	pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
 )
-// MirrorPodRule is a drainability rule on how to handle mirror pods.
-type MirrorPodRule struct{}
+// Rule is a drainability rule on how to handle mirror pods.
+type Rule struct{}
-// NewMirrorPodRule creates a new MirrorPodRule.
-func NewMirrorPodRule() *MirrorPodRule {
-	return &MirrorPodRule{}
+// New creates a new Rule.
+func New() *Rule {
+	return &Rule{}
 }
 // Drainable decides what to do with mirror pods on node drain.
-func (m *MirrorPodRule) Drainable(p *apiv1.Pod) Status {
-	if pod.IsMirrorPod(p) {
-		return NewSkipStatus()
+func (Rule) Drainable(drainCtx *drainability.DrainContext, pod *apiv1.Pod) drainability.Status {
+	if pod_util.IsMirrorPod(pod) {
+		return drainability.NewSkipStatus()
 	}
-	return NewUndefinedStatus()
+	return drainability.NewUndefinedStatus()
 }
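
The renamed package shows the shape a rule now takes. As an illustration only (not part of this commit), a hypothetical rule in its own package would look the same, differing only in the predicate it applies; the package name and annotation key below are made up.

package annotation

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
)

// Rule skips pods that opt out of drain via a (hypothetical) annotation.
type Rule struct{}

// New creates a new Rule.
func New() *Rule {
	return &Rule{}
}

// Drainable skips annotated pods and defers to later rules otherwise.
func (Rule) Drainable(drainCtx *drainability.DrainContext, pod *apiv1.Pod) drainability.Status {
	if pod.Annotations["example.com/skip-drain"] == "true" {
		return drainability.NewSkipStatus()
	}
	return drainability.NewUndefinedStatus()
}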

View File

@@ -14,21 +14,22 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
-package drainability
+package mirror
 import (
 	"testing"
 	apiv1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
 	"k8s.io/kubernetes/pkg/kubelet/types"
 )
-func TestMirrorPodRule(t *testing.T) {
+func TestRule(t *testing.T) {
 	testCases := []struct {
 		desc string
 		pod  *apiv1.Pod
-		want Status
+		want drainability.Status
 	}{
 		{
 			desc: "non mirror pod",
@@ -38,7 +39,7 @@ func TestMirrorPodRule(t *testing.T) {
 					Namespace: "ns",
 				},
 			},
-			want: NewUndefinedStatus(),
+			want: drainability.NewUndefinedStatus(),
 		},
 		{
 			desc: "mirror pod",
@@ -51,15 +52,14 @@ func TestMirrorPodRule(t *testing.T) {
 					},
 				},
 			},
-			want: NewSkipStatus(),
+			want: drainability.NewSkipStatus(),
 		},
 	}
 	for _, tc := range testCases {
 		t.Run(tc.desc, func(t *testing.T) {
-			m := NewMirrorPodRule()
-			got := m.Drainable(tc.pod)
+			got := New().Drainable(nil, tc.pod)
 			if tc.want != got {
-				t.Errorf("MirrorPodRule.Drainable(%v) = %v, want %v", tc.pod.Name, got, tc.want)
+				t.Errorf("Rule.Drainable(%v) = %v, want %v", tc.pod.Name, got, tc.want)
 			}
 		})
 	}

View File

@@ -0,0 +1,61 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rules
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules/mirror"
)
// Rule determines whether a given pod can be drained or not.
type Rule interface {
// Drainable determines whether a given pod is drainable according to
// the specific Rule.
//
// DrainContext cannot be nil.
Drainable(*drainability.DrainContext, *apiv1.Pod) drainability.Status
}
// Default returns the default list of Rules.
func Default() Rules {
return []Rule{
mirror.New(),
}
}
// Rules defines operations on a collections of rules.
type Rules []Rule
// Drainable determines whether a given pod is drainable according to the
// specified set of rules.
func (rs Rules) Drainable(drainCtx *drainability.DrainContext, pod *apiv1.Pod) drainability.Status {
if drainCtx == nil {
drainCtx = &drainability.DrainContext{}
}
if drainCtx.RemainingPdbTracker == nil {
drainCtx.RemainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
}
for _, r := range rs {
if d := r.Drainable(drainCtx, pod); d.Outcome != drainability.UndefinedOutcome {
return d
}
}
return drainability.NewUndefinedStatus()
}
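
As a usage sketch (not part of the commit): the Rules slice can be extended with a custom Rule and evaluated against a pod, with the first rule returning a non-undefined outcome deciding the result. The customRule parameter stands in for any Rule implementation, such as the one sketched after the mirror rule above.

package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
)

// evaluate composes the default rule set with one extra rule and runs the
// combined list over a single pod.
func evaluate(customRule rules.Rule, pod *apiv1.Pod) drainability.Status {
	rs := append(rules.Default(), customRule)
	drainCtx := &drainability.DrainContext{
		RemainingPdbTracker: pdb.NewBasicRemainingPdbTracker(),
	}
	// Rules are consulted in order; the first non-undefined outcome wins.
	return rs.Drainable(drainCtx, pod)
}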

View File

@@ -18,8 +18,6 @@ package drainability
 import (
 	"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
-	apiv1 "k8s.io/api/core/v1"
 )
 // OutcomeType identifies the action that should be taken when it comes to
@@ -79,17 +77,3 @@ func NewSkipStatus() Status {
 func NewUndefinedStatus() Status {
 	return Status{}
 }
-// Rule determines whether a given pod can be drained or not.
-type Rule interface {
-	// Drainable determines whether a given pod is drainable according to
-	// the specific Rule.
-	Drainable(*apiv1.Pod) Status
-}
-// DefaultRules returns the default list of Rules.
-func DefaultRules() []Rule {
-	return []Rule{
-		NewMirrorPodRule(),
-	}
-}