Allow custom checks on whether or not a pod can be drained.

Daniel Kłobuszewski 2023-04-07 21:24:18 +02:00
parent 88bc588ce2
commit 1df765754e
3 changed files with 288 additions and 104 deletions
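
For orientation, a minimal sketch of the kind of custom check this commit enables, written against the drainability.Rule and drainability.Status APIs introduced in the diff below. The rule type and the annotation key are hypothetical, not part of the commit:

package example

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
	"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
)

// pinnedPodRule blocks draining of any pod that carries a (hypothetical)
// "pinned" annotation and stays neutral about all other pods.
type pinnedPodRule struct{}

func (pinnedPodRule) Drainable(pod *apiv1.Pod) drainability.Status {
	if pod.Annotations["example.com/pinned"] == "true" {
		// drain.UnexpectedError stands in here; a real rule would likely
		// introduce a more specific BlockingPodReason.
		return drainability.NewBlockedStatus(drain.UnexpectedError,
			fmt.Errorf("pod %s/%s is pinned to its node", pod.Namespace, pod.Name))
	}
	// No opinion: defer to the remaining rules and the default drain logic.
	return drainability.NewUnmatchedStatus()
}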

View File

@ -25,8 +25,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
@ -41,6 +43,8 @@ type NodeDeleteOptions struct {
// MinReplicaCount controls the minimum number of replicas that a replica set or replication controller should have
// to allow deletion of their pods during scale down
MinReplicaCount int
// DrainabilityRules contain a list of checks that are used to verify whether a pod can be drained from a node.
DrainabilityRules []drainability.Rule
}

// NewNodeDeleteOptions returns new node delete options extracted from autoscaling options
@ -60,9 +64,28 @@ func NewNodeDeleteOptions(opts config.AutoscalingOptions) NodeDeleteOptions {
// along with their pods (no abandoned pods with dangling created-by annotation).
// If listers is not nil it checks whether RC, DS, Jobs and RS that created these pods
// still exist.
// TODO(x13n): Rewrite GetPodsForDeletionOnNodeDrain into a set of DrainabilityRules.
func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDeleteOptions, listers kube_util.ListerRegistry,
pdbs []*policyv1.PodDisruptionBudget, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) {
var drainPods, drainDs []*apiv1.Pod
for _, podInfo := range nodeInfo.Pods {
pod := podInfo.Pod
d := drainabilityStatus(pod, deleteOptions.DrainabilityRules)
if d.Matched {
switch d.Reason {
case drain.NoReason:
if pod_util.IsDaemonSetPod(pod) {
drainDs = append(drainDs, pod)
} else {
drainPods = append(drainPods, pod)
}
continue
default:
blockingPod = &drain.BlockingPod{Pod: pod, Reason: d.Reason}
err = d.Error
return
}
}
pods = append(pods, podInfo.Pod)
}
pods, daemonSetPods, blockingPod, err = drain.GetPodsForDeletionOnNodeDrain(
@ -74,6 +97,8 @@ func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDele
listers,
int32(deleteOptions.MinReplicaCount),
timestamp)
pods = append(pods, drainPods...)
daemonSetPods = append(daemonSetPods, drainDs...)
if err != nil {
return pods, daemonSetPods, blockingPod, err
}
@ -102,3 +127,12 @@ func checkPdbs(pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (*drain.
}
return nil, nil
}

// drainabilityStatus returns the status reported by the first rule that
// matches the pod, or an unmatched status if no rule applies.
func drainabilityStatus(pod *apiv1.Pod, dr []drainability.Rule) drainability.Status {
for _, f := range dr {
if d := f.Drainable(pod); d.Matched {
return d
}
}
return drainability.Status{}
}
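
The net effect: drainability rules take precedence over the built-in per-pod checks. For each pod, the first rule that returns a matched Status decides the outcome (drain.NoReason marks the pod as movable, any other reason aborts the drain and reports a BlockingPod), while pods that no rule matched fall through to the pre-existing GetPodsForDeletionOnNodeDrain logic. A sketch of how a caller might wire this up, assuming hypothetical ruleA and ruleB implementing drainability.Rule, with nodeInfo, listers and pdbs in scope:

opts := NodeDeleteOptions{
	SkipNodesWithSystemPods:           true,
	SkipNodesWithLocalStorage:         true,
	SkipNodesWithCustomControllerPods: true,
	// Earlier rules win, so order them from most to least specific.
	DrainabilityRules: []drainability.Rule{ruleA{}, ruleB{}},
}
pods, daemonSetPods, blockingPod, err := GetPodsToMove(nodeInfo, opts, listers, pdbs, time.Now())
if err != nil {
	// blockingPod names the pod that prevented the drain.
	fmt.Printf("cannot drain node: pod %s/%s: %v\n", blockingPod.Pod.Namespace, blockingPod.Pod.Name, err)
	return
}
// pods and daemonSetPods are candidates for eviction during scale down.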

View File

@ -17,6 +17,7 @@ limitations under the License.
package simulator

import (
"fmt"
"testing"
"time"
@ -24,6 +25,7 @@ import (
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/kubernetes/pkg/kubelet/types"
@ -34,82 +36,45 @@ import (
func TestGetPodsToMove(t *testing.T) {
testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC)
unreplicatedPod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Name: "unreplicatedPod",
Namespace: "ns",
},
}
rsPod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Name: "rsPod",
Namespace: "ns",
OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""),
},
}
manifestPod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod3",
Name: "manifestPod",
Namespace: "kube-system",
Annotations: map[string]string{
types.ConfigMirrorAnnotationKey: "something",
},
},
}
dsPod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod4",
Name: "dsPod",
Namespace: "ns",
OwnerReferences: GenerateOwnerReferences("ds", "DaemonSet", "extensions/v1beta1", ""),
},
}
systemPod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod5",
Name: "systemPod",
Namespace: "kube-system",
OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""),
},
}
localStoragePod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod6",
Name: "localStoragePod",
Namespace: "ns",
OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""),
},
@ -123,14 +88,9 @@ func TestGetPodsToMove(t *testing.T) {
},
},
}
nonLocalStoragePod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod7",
Name: "nonLocalStoragePod",
Namespace: "ns",
OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""),
},
@ -146,15 +106,9 @@ func TestGetPodsToMove(t *testing.T) {
},
},
}
pdbPod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod8",
Name: "pdbPod",
Namespace: "ns",
OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""),
Labels: map[string]string{
@ -164,7 +118,7 @@ func TestGetPodsToMove(t *testing.T) {
Spec: apiv1.PodSpec{},
}
one := intstr.FromInt(1)
restrictivePdb := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: "foobar",
Namespace: "ns",
@ -181,24 +135,7 @@ func TestGetPodsToMove(t *testing.T) {
DisruptionsAllowed: 0,
},
}
permissivePdb := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: "foobar",
Namespace: "ns",
@ -215,22 +152,9 @@ func TestGetPodsToMove(t *testing.T) {
DisruptionsAllowed: 1,
},
}
terminatedPod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod10",
Namespace: "ns",
OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""),
},
}
pod10Terminated := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod10Terminated",
Name: "terminatedPod",
Namespace: "ns",
OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""),
DeletionTimestamp: &metav1.Time{
@ -238,9 +162,9 @@ func TestGetPodsToMove(t *testing.T) {
},
},
}
terminatingPod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod10Terminating",
Name: "terminatingPod",
Namespace: "ns",
OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""),
DeletionTimestamp: &metav1.Time{
@ -249,8 +173,169 @@ func TestGetPodsToMove(t *testing.T) {
},
}
testCases := []struct {
desc string
pods []*apiv1.Pod
pdbs []*policyv1.PodDisruptionBudget
rules []drainability.Rule
wantPods []*apiv1.Pod
wantDs []*apiv1.Pod
wantBlocking *drain.BlockingPod
wantErr bool
}{
{
desc: "Unreplicated pod",
pods: []*apiv1.Pod{unreplicatedPod},
wantErr: true,
wantBlocking: &drain.BlockingPod{
Pod: unreplicatedPod,
Reason: drain.NotReplicated,
},
},
{
desc: "Replicated pod",
pods: []*apiv1.Pod{rsPod},
wantPods: []*apiv1.Pod{rsPod},
},
{
desc: "Manifest pod",
pods: []*apiv1.Pod{manifestPod},
},
{
desc: "DaemonSet pod",
pods: []*apiv1.Pod{rsPod, manifestPod, dsPod},
wantPods: []*apiv1.Pod{rsPod},
wantDs: []*apiv1.Pod{dsPod},
},
{
desc: "Kube-system",
pods: []*apiv1.Pod{systemPod},
wantErr: true,
wantBlocking: &drain.BlockingPod{
Pod: systemPod,
Reason: drain.UnmovableKubeSystemPod,
},
},
{
desc: "Local storage",
pods: []*apiv1.Pod{localStoragePod},
wantErr: true,
wantBlocking: &drain.BlockingPod{
Pod: localStoragePod,
Reason: drain.LocalStorageRequested,
},
},
{
desc: "Non-local storage",
pods: []*apiv1.Pod{nonLocalStoragePod},
wantPods: []*apiv1.Pod{nonLocalStoragePod},
},
{
desc: "Pdb blocking",
pods: []*apiv1.Pod{pdbPod},
pdbs: []*policyv1.PodDisruptionBudget{restrictivePdb},
wantErr: true,
wantBlocking: &drain.BlockingPod{
Pod: pdbPod,
Reason: drain.NotEnoughPdb,
},
},
{
desc: "Pdb allowing",
pods: []*apiv1.Pod{pdbPod},
pdbs: []*policyv1.PodDisruptionBudget{permissivePdb},
wantPods: []*apiv1.Pod{pdbPod},
},
{
desc: "Pod termination",
pods: []*apiv1.Pod{rsPod, terminatedPod, terminatingPod},
wantPods: []*apiv1.Pod{rsPod, terminatingPod},
},
{
desc: "Rule allows",
pods: []*apiv1.Pod{unreplicatedPod},
rules: []drainability.Rule{alwaysDrain{}},
wantPods: []*apiv1.Pod{unreplicatedPod},
},
{
desc: "Second rule allows",
pods: []*apiv1.Pod{unreplicatedPod},
rules: []drainability.Rule{cantDecide{}, alwaysDrain{}},
wantPods: []*apiv1.Pod{unreplicatedPod},
},
{
desc: "Rule blocks",
pods: []*apiv1.Pod{rsPod},
rules: []drainability.Rule{neverDrain{}},
wantErr: true,
wantBlocking: &drain.BlockingPod{
Pod: rsPod,
Reason: drain.UnexpectedError,
},
},
{
desc: "Second rule blocks",
pods: []*apiv1.Pod{rsPod},
rules: []drainability.Rule{cantDecide{}, neverDrain{}},
wantErr: true,
wantBlocking: &drain.BlockingPod{
Pod: rsPod,
Reason: drain.UnexpectedError,
},
},
{
desc: "Undecisive rule fallback to default logic: Unreplicated pod",
pods: []*apiv1.Pod{unreplicatedPod},
rules: []drainability.Rule{cantDecide{}},
wantErr: true,
wantBlocking: &drain.BlockingPod{
Pod: unreplicatedPod,
Reason: drain.NotReplicated,
},
},
{
desc: "Undecisive rule fallback to default logic: Replicated pod",
pods: []*apiv1.Pod{rsPod},
rules: []drainability.Rule{cantDecide{}},
wantPods: []*apiv1.Pod{rsPod},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
deleteOptions := NodeDeleteOptions{
SkipNodesWithSystemPods: true,
SkipNodesWithLocalStorage: true,
MinReplicaCount: 0,
SkipNodesWithCustomControllerPods: true,
DrainabilityRules: tc.rules,
}
p, d, b, err := GetPodsToMove(schedulerframework.NewNodeInfo(tc.pods...), deleteOptions, nil, tc.pdbs, testTime)
if tc.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
assert.ElementsMatch(t, tc.wantPods, p)
assert.ElementsMatch(t, tc.wantDs, d)
assert.Equal(t, tc.wantBlocking, b)
})
}
}

type alwaysDrain struct{}

func (a alwaysDrain) Drainable(*apiv1.Pod) drainability.Status {
return drainability.NewDrainableStatus()
}

type neverDrain struct{}

func (n neverDrain) Drainable(*apiv1.Pod) drainability.Status {
return drainability.NewBlockedStatus(drain.UnexpectedError, fmt.Errorf("nope"))
}

type cantDecide struct{}

func (c cantDecide) Drainable(*apiv1.Pod) drainability.Status {
return drainability.NewUnmatchedStatus()
}

View File

@ -0,0 +1,65 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package drainability

import (
apiv1 "k8s.io/api/core/v1"

"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
)

// Status indicates whether a pod can be drained, with an optional error message when not.
// TODO(x13n): Move values from drain.BlockingPodReason to some typed string.
type Status struct {
// Matched indicates whether the Rule can be applied to a given pod.
// `false` indicates that the Rule doesn't match and that another one
// has to be applied.
Matched bool
// Reason carries the decision: drain.NoReason means the pod can be
// drained, while any other value explains why draining is blocked.
Reason drain.BlockingPodReason
// Error contains an optional error message.
Error error
}

// NewDrainableStatus returns a new Status indicating that a pod can be drained.
func NewDrainableStatus() Status {
return Status{
Matched: true,
Reason: drain.NoReason,
}
}

// NewBlockedStatus returns a new Status indicating that a pod is blocked and cannot be drained.
func NewBlockedStatus(reason drain.BlockingPodReason, err error) Status {
return Status{
Matched: true,
Reason: reason,
Error: err,
}
}

// NewUnmatchedStatus returns a new Status that doesn't contain a decision.
func NewUnmatchedStatus() Status {
return Status{}
}

// Rule determines whether a given pod can be drained or not.
type Rule interface {
// Drainable determines whether a given pod is drainable according to
// the specific Rule.
Drainable(*apiv1.Pod) Status
}
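
The three constructors map one-to-one onto the possible rule outcomes: drainable, blocked, and no opinion. As a complement to the blocking example at the top of this commit, here is a sketch of a rule that force-allows draining, written as it would appear inside the drainability package itself (an external implementation would import the package and qualify the names); the namespace name is made up:

// scratchNamespaceRule is a hypothetical rule that declares every pod in
// the "scratch" namespace movable, regardless of the default per-pod drain
// checks, and stays neutral about all other pods.
type scratchNamespaceRule struct{}

func (scratchNamespaceRule) Drainable(pod *apiv1.Pod) Status {
	if pod.Namespace == "scratch" {
		return NewDrainableStatus() // Matched with drain.NoReason: pod can be drained.
	}
	return NewUnmatchedStatus() // Later rules or the default logic decide.
}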