diff --git a/pkg/cmd/drain/drain.go b/pkg/cmd/drain/drain.go
index 9d281031..a0d8f5bc 100644
--- a/pkg/cmd/drain/drain.go
+++ b/pkg/cmd/drain/drain.go
@@ -193,6 +193,7 @@ func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobr
 	cmd.Flags().StringVarP(&o.drainer.Selector, "selector", "l", o.drainer.Selector, "Selector (label query) to filter on")
 	cmd.Flags().StringVarP(&o.drainer.PodSelector, "pod-selector", "", o.drainer.PodSelector, "Label selector to filter pods on the node")
 	cmd.Flags().BoolVar(&o.drainer.DisableEviction, "disable-eviction", o.drainer.DisableEviction, "Force drain to use delete, even if eviction is supported. This will bypass checking PodDisruptionBudgets, use with caution.")
+	cmd.Flags().IntVar(&o.drainer.SkipWaitForDeleteTimeoutSeconds, "skip-wait-for-delete-timeout", o.drainer.SkipWaitForDeleteTimeoutSeconds, "If pod DeletionTimestamp older than N seconds, skip waiting for the pod. Seconds must be greater than 0 to skip.")
 
 	cmdutil.AddDryRunFlag(cmd)
 	return cmd
diff --git a/pkg/drain/drain.go b/pkg/drain/drain.go
index d846dd0f..8992a3ae 100644
--- a/pkg/drain/drain.go
+++ b/pkg/drain/drain.go
@@ -39,6 +39,7 @@ const (
 	EvictionKind = "Eviction"
 	// EvictionSubresource represents the kind of evictions object as pod's subresource
 	EvictionSubresource = "pods/eviction"
+	podSkipMsgTemplate  = "pod %q has DeletionTimestamp older than %v seconds, skipping\n"
 )
 
 // Helper contains the parameters to control the behaviour of drainer
@@ -56,6 +57,12 @@ type Helper struct {
 	// DisableEviction forces drain to use delete rather than evict
 	DisableEviction bool
 
+	// SkipWaitForDeleteTimeoutSeconds ignores pods whose DeletionTimestamp is
+	// older than N seconds. It's up to the user to decide when this option is
+	// appropriate; for example, when the Node is unready and the pods won't
+	// drain otherwise.
+	SkipWaitForDeleteTimeoutSeconds int
+
 	Out    io.Writer
 	ErrOut io.Writer
 
@@ -66,6 +73,21 @@ type Helper struct {
 	OnPodDeletedOrEvicted func(pod *corev1.Pod, usingEviction bool)
 }
 
+// waitForDeleteParams bundles the arguments of waitForDelete so that call
+// sites name each value instead of relying on positional order.
+type waitForDeleteParams struct {
+	ctx                             context.Context
+	pods                            []corev1.Pod
+	interval                        time.Duration
+	timeout                         time.Duration
+	usingEviction                   bool
+	getPodFn                        func(string, string) (*corev1.Pod, error)
+	onDoneFn                        func(pod *corev1.Pod, usingEviction bool)
+	globalTimeout                   time.Duration
+	skipWaitForDeleteTimeoutSeconds int
+	out                             io.Writer
+}
+
 // CheckEvictionSupport uses Discovery API to find out if the server support
 // eviction subresource If support, it will return its groupVersion; Otherwise,
 // it will return an empty string
@@ -238,7 +260,19 @@ func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodF
 					return
 				}
 			}
-			_, err := waitForDelete(ctx, []corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn, d.OnPodDeletedOrEvicted, globalTimeout)
+			params := waitForDeleteParams{
+				ctx:                             ctx,
+				pods:                            []corev1.Pod{pod},
+				interval:                        1 * time.Second,
+				timeout:                         time.Duration(math.MaxInt64),
+				usingEviction:                   true,
+				getPodFn:                        getPodFn,
+				onDoneFn:                        d.OnPodDeletedOrEvicted,
+				globalTimeout:                   globalTimeout,
+				skipWaitForDeleteTimeoutSeconds: d.SkipWaitForDeleteTimeoutSeconds,
+				out:                             d.Out,
+			}
+			_, err := waitForDelete(params)
 			if err == nil {
 				returnCh <- nil
 			} else {
@@ -280,31 +314,49 @@ func (d *Helper) deletePods(pods []corev1.Pod, getPodFn func(namespace, name str
 		}
 	}
 	ctx := d.getContext()
-	_, err := waitForDelete(ctx, pods, 1*time.Second, globalTimeout, false, getPodFn, d.OnPodDeletedOrEvicted, globalTimeout)
+	params := waitForDeleteParams{
+		ctx:                             ctx,
+		pods:                            pods,
+		interval:                        1 * time.Second,
+		timeout:                         globalTimeout,
+		usingEviction:                   false,
+		getPodFn:                        getPodFn,
+		onDoneFn:                        d.OnPodDeletedOrEvicted,
+		globalTimeout:                   globalTimeout,
+		skipWaitForDeleteTimeoutSeconds: d.SkipWaitForDeleteTimeoutSeconds,
+		out:                             d.Out,
+	}
+	_, err := waitForDelete(params)
 	return err
 }
 
-func waitForDelete(ctx context.Context, pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error), onDoneFn func(pod *corev1.Pod, usingEviction bool), globalTimeout time.Duration) ([]corev1.Pod, error) {
-	err := wait.PollImmediate(interval, timeout, func() (bool, error) {
+func waitForDelete(params waitForDeleteParams) ([]corev1.Pod, error) {
+	pods := params.pods
+	err := wait.PollImmediate(params.interval, params.timeout, func() (bool, error) {
 		pendingPods := []corev1.Pod{}
 		for i, pod := range pods {
-			p, err := getPodFn(pod.Namespace, pod.Name)
+			p, err := params.getPodFn(pod.Namespace, pod.Name)
 			if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
-				if onDoneFn != nil {
-					onDoneFn(&pod, usingEviction)
+				if params.onDoneFn != nil {
+					params.onDoneFn(&pod, params.usingEviction)
 				}
 				continue
 			} else if err != nil {
 				return false, err
 			} else {
+				// p was only nil-checked in the first branch; guard it here too.
+				if p != nil && shouldSkipPod(*p, params.skipWaitForDeleteTimeoutSeconds) {
+					fmt.Fprintf(params.out, podSkipMsgTemplate, pod.Name, params.skipWaitForDeleteTimeoutSeconds)
+					continue
+				}
 				pendingPods = append(pendingPods, pods[i])
 			}
 		}
 		pods = pendingPods
 		if len(pendingPods) > 0 {
 			select {
-			case <-ctx.Done():
-				return false, fmt.Errorf("global timeout reached: %v", globalTimeout)
+			case <-params.ctx.Done():
+				return false, fmt.Errorf("global timeout reached: %v", params.globalTimeout)
 			default:
 				return false, nil
 			}
diff --git a/pkg/drain/drain_test.go b/pkg/drain/drain_test.go
index 75e86342..a63c3c8c 100644
--- a/pkg/drain/drain_test.go
+++ b/pkg/drain/drain_test.go
@@ -108,6 +108,23 @@ func TestDeletePods(t *testing.T) {
 				return nil, fmt.Errorf("%q: not found", name)
 			},
 		},
+		{
+			description:       "Skip Deleted Pod",
+			interval:          200 * time.Millisecond,
+			timeout:           3 * time.Second,
+			expectPendingPods: false,
+			expectError:       false,
+			expectedError:     nil,
+			getPodFn: func(namespace, name string) (*corev1.Pod, error) {
+				oldPodMap, _ := createPods(false)
+				if oldPod, found := oldPodMap[name]; found {
+					dTime := &metav1.Time{Time: time.Now().Add(time.Duration(100) * time.Second * -1)}
+					oldPod.ObjectMeta.SetDeletionTimestamp(dTime)
+					return &oldPod, nil
+				}
+				return nil, fmt.Errorf("%q: not found", name)
+			},
+		},
 		{
 			description:       "Client error could be passed out",
 			interval:          200 * time.Millisecond,
@@ -124,13 +141,29 @@ func TestDeletePods(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.description, func(t *testing.T) {
 			_, pods := createPods(false)
-			ctx := context.Background()
+			var ctx context.Context
+			var cancel context.CancelFunc
+			ctx = context.Background()
 			if test.ctxTimeoutEarly {
-				ctx, _ = context.WithTimeout(ctx, 100*time.Millisecond)
+				ctx, cancel = context.WithTimeout(ctx, 100*time.Millisecond)
+				defer cancel()
+			}
+			params := waitForDeleteParams{
+				ctx:                             ctx,
+				pods:                            pods,
+				interval:                        test.interval,
+				timeout:                         test.timeout,
+				usingEviction:                   false,
+				getPodFn:                        test.getPodFn,
+				onDoneFn:                        nil,
+				globalTimeout:                   time.Duration(math.MaxInt64),
+				out:                             os.Stdout,
+				skipWaitForDeleteTimeoutSeconds: 10,
 			}
 			start := time.Now()
-			pendingPods, err := waitForDelete(ctx, pods, test.interval, test.timeout, false, test.getPodFn, nil, time.Duration(math.MaxInt64))
+			pendingPods, err := waitForDelete(params)
 			elapsed := time.Since(start)
+
 			if test.expectError {
 				if err == nil {
 					t.Fatalf("%s: unexpected non-error", test.description)
diff --git a/pkg/drain/filter_test.go b/pkg/drain/filter_test.go
new file mode 100644
index 00000000..32d66c46
--- /dev/null
+++ b/pkg/drain/filter_test.go
@@ -0,0 +1,70 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package drain
+
+import (
+	"testing"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+func TestSkipDeletedFilter(t *testing.T) {
+	tCases := []struct {
+		timeStampAgeSeconds             int
+		skipWaitForDeleteTimeoutSeconds int
+		expectedDelete                  bool
+	}{
+		{
+			timeStampAgeSeconds:             0,
+			skipWaitForDeleteTimeoutSeconds: 20,
+			expectedDelete:                  true,
+		},
+		{
+			timeStampAgeSeconds:             1,
+			skipWaitForDeleteTimeoutSeconds: 20,
+			expectedDelete:                  true,
+		},
+		{
+			timeStampAgeSeconds:             100,
+			skipWaitForDeleteTimeoutSeconds: 20,
+			expectedDelete:                  false,
+		},
+	}
+	for i, tc := range tCases {
+		h := &Helper{
+			SkipWaitForDeleteTimeoutSeconds: tc.skipWaitForDeleteTimeoutSeconds,
+		}
+		pod := corev1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "pod",
+				Namespace: "default",
+			},
+		}
+
+		if tc.timeStampAgeSeconds > 0 {
+			dTime := &metav1.Time{Time: time.Now().Add(time.Duration(tc.timeStampAgeSeconds) * time.Second * -1)}
+			pod.ObjectMeta.SetDeletionTimestamp(dTime)
+		}
+
+		podDeleteStatus := h.skipDeletedFilter(pod)
+		if podDeleteStatus.delete != tc.expectedDelete {
+			t.Errorf("test %v: unexpected podDeleteStatus.delete; actual %v; expected %v", i, podDeleteStatus.delete, tc.expectedDelete)
+		}
+	}
+}
diff --git a/pkg/drain/filters.go b/pkg/drain/filters.go
index 2cbba245..29e3c21e 100644
--- a/pkg/drain/filters.go
+++ b/pkg/drain/filters.go
@@ -19,6 +19,7 @@ package drain
 import (
 	"fmt"
 	"strings"
+	"time"
 
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -133,8 +134,11 @@ func makePodDeleteStatusWithError(message string) podDeleteStatus {
 	}
 }
 
+// The filters are applied in a specific order, only the last filter's
+// message will be retained if there are any warnings.
 func (d *Helper) makeFilters() []podFilter {
 	return []podFilter{
+		d.skipDeletedFilter,
 		d.daemonSetFilter,
 		d.mirrorPodFilter,
 		d.localStorageFilter,
@@ -203,6 +207,9 @@ func (d *Helper) localStorageFilter(pod corev1.Pod) podDeleteStatus {
 		return makePodDeleteStatusWithError(localStorageFatal)
 	}
 
+	// TODO: this warning gets dropped by subsequent filters;
+	// consider accounting for multiple warning conditions or at least
+	// preserving the last warning message.
 	return makePodDeleteStatusWithWarning(true, localStorageWarning)
 }
 
@@ -221,3 +228,21 @@ func (d *Helper) unreplicatedFilter(pod corev1.Pod) podDeleteStatus {
 	}
 	return makePodDeleteStatusWithError(unmanagedFatal)
 }
+
+// shouldSkipPod reports whether pod's DeletionTimestamp is more than
+// skipDeletedTimeoutSeconds in the past. It is always false when the timeout
+// is not positive or when the pod has no DeletionTimestamp.
+func shouldSkipPod(pod corev1.Pod, skipDeletedTimeoutSeconds int) bool {
+	return skipDeletedTimeoutSeconds > 0 &&
+		!pod.ObjectMeta.DeletionTimestamp.IsZero() &&
+		time.Since(pod.ObjectMeta.GetDeletionTimestamp().Time).Seconds() > float64(skipDeletedTimeoutSeconds)
+}
+
+// skipDeletedFilter returns the skip status for pods that shouldSkipPod
+// selects under d.SkipWaitForDeleteTimeoutSeconds, and the okay status otherwise.
+func (d *Helper) skipDeletedFilter(pod corev1.Pod) podDeleteStatus {
+	if shouldSkipPod(pod, d.SkipWaitForDeleteTimeoutSeconds) {
+		return makePodDeleteStatusSkip()
+	}
+	return makePodDeleteStatusOkay()
+}