kubectl/drain: add option skip-wait-for-delete-timeout

Currently, some circumstances may cause waitForDelete to
never succeed after the pod has been marked for deletion.
In particular, Nodes that are unresponsive and have
pods with local-storage will not be able to
successfully drain.

We should allow drain to ignore pods that have a
DeletionTimestamp older than a user-provided age.
This will allow controllers utilizing kubectl/drain
to optionally account for a pod that cannot be
removed due to a misbehaving node.

Kubernetes-commit: da53044abdf8c8a9771a5c3dfd861f0c4ec78c40
This commit is contained in:
Michael Gugino 2019-11-23 20:00:06 -05:00 committed by Kubernetes Publisher
parent b909fcb4a0
commit ebda9f6262
5 changed files with 185 additions and 12 deletions

View File

@ -193,6 +193,7 @@ func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobr
cmd.Flags().StringVarP(&o.drainer.Selector, "selector", "l", o.drainer.Selector, "Selector (label query) to filter on")
cmd.Flags().StringVarP(&o.drainer.PodSelector, "pod-selector", "", o.drainer.PodSelector, "Label selector to filter pods on the node")
cmd.Flags().BoolVar(&o.drainer.DisableEviction, "disable-eviction", o.drainer.DisableEviction, "Force drain to use delete, even if eviction is supported. This will bypass checking PodDisruptionBudgets, use with caution.")
cmd.Flags().IntVar(&o.drainer.SkipWaitForDeleteTimeoutSeconds, "skip-wait-for-delete-timeout", o.drainer.SkipWaitForDeleteTimeoutSeconds, "If pod DeletionTimestamp older than N seconds, skip waiting for the pod. Seconds must be greater than 0 to skip.")
cmdutil.AddDryRunFlag(cmd)
return cmd

View File

@ -39,6 +39,7 @@ const (
EvictionKind = "Eviction"
// EvictionSubresource represents the kind of evictions object as pod's subresource
EvictionSubresource = "pods/eviction"
podSkipMsgTemplate = "pod %q has DeletionTimestamp older than %v seconds, skipping\n"
)
// Helper contains the parameters to control the behaviour of drainer
@ -56,6 +57,12 @@ type Helper struct {
// DisableEviction forces drain to use delete rather than evict
DisableEviction bool
// SkipWaitForDeleteTimeoutSeconds ignores pods that have a
// DeletionTimeStamp > N seconds. It's up to the user to decide when this
// option is appropriate; examples include the Node is unready and the pods
// won't drain otherwise
SkipWaitForDeleteTimeoutSeconds int
Out io.Writer
ErrOut io.Writer
@ -66,6 +73,19 @@ type Helper struct {
OnPodDeletedOrEvicted func(pod *corev1.Pod, usingEviction bool)
}
type waitForDeleteParams struct {
ctx context.Context
pods []corev1.Pod
interval time.Duration
timeout time.Duration
usingEviction bool
getPodFn func(string, string) (*corev1.Pod, error)
onDoneFn func(pod *corev1.Pod, usingEviction bool)
globalTimeout time.Duration
skipWaitForDeleteTimeoutSeconds int
out io.Writer
}
// CheckEvictionSupport uses Discovery API to find out if the server support
// eviction subresource If support, it will return its groupVersion; Otherwise,
// it will return an empty string
@ -238,7 +258,19 @@ func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodF
return
}
}
_, err := waitForDelete(ctx, []corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn, d.OnPodDeletedOrEvicted, globalTimeout)
params := waitForDeleteParams{
ctx: ctx,
pods: []corev1.Pod{pod},
interval: 1 * time.Second,
timeout: time.Duration(math.MaxInt64),
usingEviction: true,
getPodFn: getPodFn,
onDoneFn: d.OnPodDeletedOrEvicted,
globalTimeout: globalTimeout,
skipWaitForDeleteTimeoutSeconds: d.SkipWaitForDeleteTimeoutSeconds,
out: d.Out,
}
_, err := waitForDelete(params)
if err == nil {
returnCh <- nil
} else {
@ -280,31 +312,48 @@ func (d *Helper) deletePods(pods []corev1.Pod, getPodFn func(namespace, name str
}
}
ctx := d.getContext()
_, err := waitForDelete(ctx, pods, 1*time.Second, globalTimeout, false, getPodFn, d.OnPodDeletedOrEvicted, globalTimeout)
params := waitForDeleteParams{
ctx: ctx,
pods: pods,
interval: 1 * time.Second,
timeout: globalTimeout,
usingEviction: false,
getPodFn: getPodFn,
onDoneFn: d.OnPodDeletedOrEvicted,
globalTimeout: globalTimeout,
skipWaitForDeleteTimeoutSeconds: d.SkipWaitForDeleteTimeoutSeconds,
out: d.Out,
}
_, err := waitForDelete(params)
return err
}
func waitForDelete(ctx context.Context, pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error), onDoneFn func(pod *corev1.Pod, usingEviction bool), globalTimeout time.Duration) ([]corev1.Pod, error) {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
func waitForDelete(params waitForDeleteParams) ([]corev1.Pod, error) {
pods := params.pods
err := wait.PollImmediate(params.interval, params.timeout, func() (bool, error) {
pendingPods := []corev1.Pod{}
for i, pod := range pods {
p, err := getPodFn(pod.Namespace, pod.Name)
p, err := params.getPodFn(pod.Namespace, pod.Name)
if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
if onDoneFn != nil {
onDoneFn(&pod, usingEviction)
if params.onDoneFn != nil {
params.onDoneFn(&pod, params.usingEviction)
}
continue
} else if err != nil {
return false, err
} else {
if shouldSkipPod(*p, params.skipWaitForDeleteTimeoutSeconds) {
fmt.Fprintf(params.out, podSkipMsgTemplate, pod.Name, params.skipWaitForDeleteTimeoutSeconds)
continue
}
pendingPods = append(pendingPods, pods[i])
}
}
pods = pendingPods
if len(pendingPods) > 0 {
select {
case <-ctx.Done():
return false, fmt.Errorf("global timeout reached: %v", globalTimeout)
case <-params.ctx.Done():
return false, fmt.Errorf("global timeout reached: %v", params.globalTimeout)
default:
return false, nil
}

View File

@ -108,6 +108,23 @@ func TestDeletePods(t *testing.T) {
return nil, fmt.Errorf("%q: not found", name)
},
},
{
description: "Skip Deleted Pod",
interval: 200 * time.Millisecond,
timeout: 3 * time.Second,
expectPendingPods: false,
expectError: false,
expectedError: nil,
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
oldPodMap, _ := createPods(false)
if oldPod, found := oldPodMap[name]; found {
dTime := &metav1.Time{Time: time.Now().Add(time.Duration(100) * time.Second * -1)}
oldPod.ObjectMeta.SetDeletionTimestamp(dTime)
return &oldPod, nil
}
return nil, fmt.Errorf("%q: not found", name)
},
},
{
description: "Client error could be passed out",
interval: 200 * time.Millisecond,
@ -124,13 +141,29 @@ func TestDeletePods(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
_, pods := createPods(false)
ctx := context.Background()
var ctx context.Context
var cancel context.CancelFunc
ctx = context.Background()
if test.ctxTimeoutEarly {
ctx, _ = context.WithTimeout(ctx, 100*time.Millisecond)
ctx, cancel = context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
}
params := waitForDeleteParams{
ctx: ctx,
pods: pods,
interval: test.interval,
timeout: test.timeout,
usingEviction: false,
getPodFn: test.getPodFn,
onDoneFn: nil,
globalTimeout: time.Duration(math.MaxInt64),
out: os.Stdout,
skipWaitForDeleteTimeoutSeconds: 10,
}
start := time.Now()
pendingPods, err := waitForDelete(ctx, pods, test.interval, test.timeout, false, test.getPodFn, nil, time.Duration(math.MaxInt64))
pendingPods, err := waitForDelete(params)
elapsed := time.Since(start)
if test.expectError {
if err == nil {
t.Fatalf("%s: unexpected non-error", test.description)

70
pkg/drain/filter_test.go Normal file
View File

@ -0,0 +1,70 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package drain
import (
"testing"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestSkipDeletedFilter(t *testing.T) {
tCases := []struct {
timeStampAgeSeconds int
skipWaitForDeleteTimeoutSeconds int
expectedDelete bool
}{
{
timeStampAgeSeconds: 0,
skipWaitForDeleteTimeoutSeconds: 20,
expectedDelete: true,
},
{
timeStampAgeSeconds: 1,
skipWaitForDeleteTimeoutSeconds: 20,
expectedDelete: true,
},
{
timeStampAgeSeconds: 100,
skipWaitForDeleteTimeoutSeconds: 20,
expectedDelete: false,
},
}
for i, tc := range tCases {
h := &Helper{
SkipWaitForDeleteTimeoutSeconds: tc.skipWaitForDeleteTimeoutSeconds,
}
pod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
Namespace: "default",
},
}
if tc.timeStampAgeSeconds > 0 {
dTime := &metav1.Time{Time: time.Now().Add(time.Duration(tc.timeStampAgeSeconds) * time.Second * -1)}
pod.ObjectMeta.SetDeletionTimestamp(dTime)
}
podDeleteStatus := h.skipDeletedFilter(pod)
if podDeleteStatus.delete != tc.expectedDelete {
t.Errorf("test %v: unexpected podDeleteStatus.delete; actual %v; expected %v", i, podDeleteStatus.delete, tc.expectedDelete)
}
}
}

View File

@ -19,6 +19,7 @@ package drain
import (
"fmt"
"strings"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@ -133,8 +134,11 @@ func makePodDeleteStatusWithError(message string) podDeleteStatus {
}
}
// The filters are applied in a specific order, only the last filter's
// message will be retained if there are any warnings.
func (d *Helper) makeFilters() []podFilter {
return []podFilter{
d.skipDeletedFilter,
d.daemonSetFilter,
d.mirrorPodFilter,
d.localStorageFilter,
@ -203,6 +207,9 @@ func (d *Helper) localStorageFilter(pod corev1.Pod) podDeleteStatus {
return makePodDeleteStatusWithError(localStorageFatal)
}
// TODO: this warning gets dropped by subsequent filters;
// consider accounting for multiple warning conditions or at least
// preserving the last warning message.
return makePodDeleteStatusWithWarning(true, localStorageWarning)
}
@ -221,3 +228,16 @@ func (d *Helper) unreplicatedFilter(pod corev1.Pod) podDeleteStatus {
}
return makePodDeleteStatusWithError(unmanagedFatal)
}
func shouldSkipPod(pod corev1.Pod, skipDeletedTimeoutSeconds int) bool {
return skipDeletedTimeoutSeconds > 0 &&
!pod.ObjectMeta.DeletionTimestamp.IsZero() &&
int(time.Now().Sub(pod.ObjectMeta.GetDeletionTimestamp().Time).Seconds()) > skipDeletedTimeoutSeconds
}
func (d *Helper) skipDeletedFilter(pod corev1.Pod) podDeleteStatus {
if shouldSkipPod(pod, d.SkipWaitForDeleteTimeoutSeconds) {
return makePodDeleteStatusSkip()
}
return makePodDeleteStatusOkay()
}