Merge pull request #80045 from justinsb/drain_more_reusable

Make drain library more reusable

Kubernetes-commit: b97d08fbe09c1194d80030d9e87473b01f26e3ab
This commit is contained in:
Kubernetes Publisher 2019-08-21 02:25:19 -07:00
commit 586c708feb
8 changed files with 535 additions and 279 deletions

2
Godeps/Godeps.json generated
View File

@ -500,7 +500,7 @@
},
{
"ImportPath": "k8s.io/apimachinery",
"Rev": "ac02f8882ef6"
"Rev": "2ef880f74d2a"
},
{
"ImportPath": "k8s.io/cli-runtime",

4
go.mod
View File

@ -36,7 +36,7 @@ require (
gopkg.in/yaml.v2 v2.2.2
gotest.tools v2.2.0+incompatible // indirect
k8s.io/api v0.0.0-20190820101039-d651a1528133
k8s.io/apimachinery v0.0.0-20190820100751-ac02f8882ef6
k8s.io/apimachinery v0.0.0-20190822052848-2ef880f74d2a
k8s.io/cli-runtime v0.0.0-20190822063518-817772b35fe3
k8s.io/client-go v0.0.0-20190822053941-f4e58ce6093c
k8s.io/component-base v0.0.0-20190820102445-1391c9d535ae
@ -54,7 +54,7 @@ replace (
golang.org/x/sys => golang.org/x/sys v0.0.0-20190209173611-3b5209105503
golang.org/x/text => golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db
k8s.io/api => k8s.io/api v0.0.0-20190820101039-d651a1528133
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190820100751-ac02f8882ef6
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190822052848-2ef880f74d2a
k8s.io/cli-runtime => k8s.io/cli-runtime v0.0.0-20190822063518-817772b35fe3
k8s.io/client-go => k8s.io/client-go v0.0.0-20190822053941-f4e58ce6093c
k8s.io/code-generator => k8s.io/code-generator v0.0.0-20190820100531-4a676795660b

2
go.sum
View File

@ -243,7 +243,7 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
k8s.io/api v0.0.0-20190820101039-d651a1528133/go.mod h1:AlhL1I0Xqh5Tyz0HsxjEhy+iKci9l1Qy3UMDFW7iG3A=
k8s.io/apimachinery v0.0.0-20190820100751-ac02f8882ef6/go.mod h1:EZoIMuAgG/4v58YL+bz0kqnivqupk28fKYxFCa5e6X8=
k8s.io/apimachinery v0.0.0-20190822052848-2ef880f74d2a/go.mod h1:EZoIMuAgG/4v58YL+bz0kqnivqupk28fKYxFCa5e6X8=
k8s.io/cli-runtime v0.0.0-20190822063518-817772b35fe3/go.mod h1:pfe5DIHseKGe87mN06NwZGIigAzG1/+6DkNMESEowj8=
k8s.io/client-go v0.0.0-20190822053941-f4e58ce6093c/go.mod h1:JQqPsz82IY3YfNQAAJksCCAS8PL9KecFC2CQ4aB60GU=
k8s.io/code-generator v0.0.0-20190820100531-4a676795660b/go.mod h1:cGLmodO4Xf0bnfw8/mLEXWYjiT5N9RFA9NzrYzP8VhM=

View File

@ -19,19 +19,14 @@ package drain
import (
"errors"
"fmt"
"math"
"time"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/printers"
@ -146,14 +141,34 @@ var (
)
func NewDrainCmdOptions(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *DrainCmdOptions {
return &DrainCmdOptions{
o := &DrainCmdOptions{
PrintFlags: genericclioptions.NewPrintFlags("drained").WithTypeSetter(scheme.Scheme),
IOStreams: ioStreams,
drainer: &drain.Helper{
GracePeriodSeconds: -1,
Out: ioStreams.Out,
ErrOut: ioStreams.ErrOut,
},
}
o.drainer.OnPodDeletedOrEvicted = o.onPodDeletedOrEvicted
return o
}
// onPodDeletedOrEvicted is called by drain.Helper, when the pod has been deleted or evicted
func (o *DrainCmdOptions) onPodDeletedOrEvicted(pod *corev1.Pod, usingEviction bool) {
var verbStr string
if usingEviction {
verbStr = "evicted"
} else {
verbStr = "deleted"
}
printObj, err := o.ToPrinter(verbStr)
if err != nil {
fmt.Fprintf(o.ErrOut, "error building printer: %v\n", err)
fmt.Fprintf(o.Out, "pod %s/%s %s\n", pod.Namespace, pod.Name, verbStr)
} else {
printObj(pod, o.Out)
}
}
func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
@ -313,7 +328,7 @@ func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error
fmt.Fprintf(o.ErrOut, "WARNING: %s\n", warnings)
}
if err := o.deleteOrEvictPods(list.Pods()); err != nil {
if err := o.drainer.DeleteOrEvictPods(list.Pods()); err != nil {
pendingList, newErrs := o.drainer.GetPodsForDeletion(nodeInfo.Name)
fmt.Fprintf(o.ErrOut, "There are pending pods in node %q when an error occurred: %v\n", nodeInfo.Name, err)
@ -328,136 +343,6 @@ func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error
return nil
}
// deleteOrEvictPods deletes or evicts the pods on the api server
func (o *DrainCmdOptions) deleteOrEvictPods(pods []corev1.Pod) error {
if len(pods) == 0 {
return nil
}
policyGroupVersion, err := drain.CheckEvictionSupport(o.drainer.Client)
if err != nil {
return err
}
getPodFn := func(namespace, name string) (*corev1.Pod, error) {
return o.drainer.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
}
if len(policyGroupVersion) > 0 {
return o.evictPods(pods, policyGroupVersion, getPodFn)
} else {
return o.deletePods(pods, getPodFn)
}
}
func (o *DrainCmdOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
returnCh := make(chan error, 1)
for _, pod := range pods {
go func(pod corev1.Pod, returnCh chan error) {
for {
fmt.Fprintf(o.Out, "evicting pod %q\n", pod.Name)
err := o.drainer.EvictPod(pod, policyGroupVersion)
if err == nil {
break
} else if apierrors.IsNotFound(err) {
returnCh <- nil
return
} else if apierrors.IsTooManyRequests(err) {
fmt.Fprintf(o.ErrOut, "error when evicting pod %q (will retry after 5s): %v\n", pod.Name, err)
time.Sleep(5 * time.Second)
} else {
returnCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err)
return
}
}
_, err := o.waitForDelete([]corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn)
if err == nil {
returnCh <- nil
} else {
returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
}
}(pod, returnCh)
}
doneCount := 0
var errors []error
// 0 timeout means infinite, we use MaxInt64 to represent it.
var globalTimeout time.Duration
if o.drainer.Timeout == 0 {
globalTimeout = time.Duration(math.MaxInt64)
} else {
globalTimeout = o.drainer.Timeout
}
globalTimeoutCh := time.After(globalTimeout)
numPods := len(pods)
for doneCount < numPods {
select {
case err := <-returnCh:
doneCount++
if err != nil {
errors = append(errors, err)
}
case <-globalTimeoutCh:
return fmt.Errorf("drain did not complete within %v", globalTimeout)
}
}
return utilerrors.NewAggregate(errors)
}
func (o *DrainCmdOptions) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
// 0 timeout means infinite, we use MaxInt64 to represent it.
var globalTimeout time.Duration
if o.drainer.Timeout == 0 {
globalTimeout = time.Duration(math.MaxInt64)
} else {
globalTimeout = o.drainer.Timeout
}
for _, pod := range pods {
err := o.drainer.DeletePod(pod)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
}
_, err := o.waitForDelete(pods, 1*time.Second, globalTimeout, false, getPodFn)
return err
}
func (o *DrainCmdOptions) waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error)) ([]corev1.Pod, error) {
var verbStr string
if usingEviction {
verbStr = "evicted"
} else {
verbStr = "deleted"
}
printObj, err := o.ToPrinter(verbStr)
if err != nil {
return pods, err
}
err = wait.PollImmediate(interval, timeout, func() (bool, error) {
pendingPods := []corev1.Pod{}
for i, pod := range pods {
p, err := getPodFn(pod.Namespace, pod.Name)
if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
printObj(&pod, o.Out)
continue
} else if err != nil {
return false, err
} else {
pendingPods = append(pendingPods, pods[i])
}
}
pods = pendingPods
if len(pendingPods) > 0 {
return false, nil
}
return true, nil
})
return pods, err
}
// RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for
// "Unschedulable" is passed as the first arg.
func (o *DrainCmdOptions) RunCordonOrUncordon(desired bool) error {

View File

@ -17,15 +17,11 @@ limitations under the License.
package drain
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"reflect"
"strconv"
"strings"
"sync/atomic"
"testing"
@ -38,14 +34,10 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cli-runtime/pkg/printers"
"k8s.io/client-go/rest/fake"
cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
@ -907,135 +899,6 @@ func TestDrain(t *testing.T) {
}
}
func TestDeletePods(t *testing.T) {
ifHasBeenCalled := map[string]bool{}
tests := []struct {
description string
interval time.Duration
timeout time.Duration
expectPendingPods bool
expectError bool
expectedError *error
getPodFn func(namespace, name string) (*corev1.Pod, error)
}{
{
description: "Wait for deleting to complete",
interval: 100 * time.Millisecond,
timeout: 10 * time.Second,
expectPendingPods: false,
expectError: false,
expectedError: nil,
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
oldPodMap, _ := createPods(false)
newPodMap, _ := createPods(true)
if oldPod, found := oldPodMap[name]; found {
if _, ok := ifHasBeenCalled[name]; !ok {
ifHasBeenCalled[name] = true
return &oldPod, nil
}
if oldPod.ObjectMeta.Generation < 4 {
newPod := newPodMap[name]
return &newPod, nil
}
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, name)
}
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, name)
},
},
{
description: "Deleting could timeout",
interval: 200 * time.Millisecond,
timeout: 3 * time.Second,
expectPendingPods: true,
expectError: true,
expectedError: &wait.ErrWaitTimeout,
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
oldPodMap, _ := createPods(false)
if oldPod, found := oldPodMap[name]; found {
return &oldPod, nil
}
return nil, fmt.Errorf("%q: not found", name)
},
},
{
description: "Client error could be passed out",
interval: 200 * time.Millisecond,
timeout: 5 * time.Second,
expectPendingPods: true,
expectError: true,
expectedError: nil,
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
return nil, errors.New("This is a random error for testing")
},
},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
tf := cmdtesting.NewTestFactory()
defer tf.Cleanup()
o := DrainCmdOptions{
PrintFlags: genericclioptions.NewPrintFlags("drained").WithTypeSetter(scheme.Scheme),
}
o.Out = os.Stdout
o.ToPrinter = func(operation string) (printers.ResourcePrinterFunc, error) {
return func(obj runtime.Object, out io.Writer) error {
return nil
}, nil
}
_, pods := createPods(false)
pendingPods, err := o.waitForDelete(pods, test.interval, test.timeout, false, test.getPodFn)
if test.expectError {
if err == nil {
t.Fatalf("%s: unexpected non-error", test.description)
} else if test.expectedError != nil {
if *test.expectedError != err {
t.Fatalf("%s: the error does not match expected error", test.description)
}
}
}
if !test.expectError && err != nil {
t.Fatalf("%s: unexpected error", test.description)
}
if test.expectPendingPods && len(pendingPods) == 0 {
t.Fatalf("%s: unexpected empty pods", test.description)
}
if !test.expectPendingPods && len(pendingPods) > 0 {
t.Fatalf("%s: unexpected pending pods", test.description)
}
})
}
}
func createPods(ifCreateNewPods bool) (map[string]corev1.Pod, []corev1.Pod) {
podMap := make(map[string]corev1.Pod)
podSlice := []corev1.Pod{}
for i := 0; i < 8; i++ {
var uid types.UID
if ifCreateNewPods {
uid = types.UID(i)
} else {
uid = types.UID(strconv.Itoa(i) + strconv.Itoa(i))
}
pod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod" + strconv.Itoa(i),
Namespace: "default",
UID: uid,
Generation: int64(i),
},
}
podMap[pod.Name] = pod
podSlice = append(podSlice, pod)
}
return podMap, podSlice
}
type MyReq struct {
Request *http.Request
}

69
pkg/drain/default.go Normal file
View File

@ -0,0 +1,69 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package drain
import (
"fmt"
corev1 "k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
)
// This file contains default implementations of how to
// drain/cordon/uncordon nodes. These functions may be called
// directly, or their functionality copied into your own code, for
// example if you want different output behaviour.
// RunNodeDrain shows the canonical way to drain a node.
// You should first cordon the node, e.g. using RunCordonOrUncordon
func RunNodeDrain(drainer *Helper, nodeName string) error {
// TODO(justinsb): Ensure we have adequate e2e coverage of this function in library consumers
list, errs := drainer.GetPodsForDeletion(nodeName)
if errs != nil {
return utilerrors.NewAggregate(errs)
}
if warnings := list.Warnings(); warnings != "" {
fmt.Fprintf(drainer.ErrOut, "WARNING: %s\n", warnings)
}
if err := drainer.DeleteOrEvictPods(list.Pods()); err != nil {
// Maybe warn about non-deleted pods here
return err
}
return nil
}
// RunCordonOrUncordon demonstrates the canonical way to cordon or uncordon a Node
func RunCordonOrUncordon(drainer *Helper, node *corev1.Node, desired bool) error {
// TODO(justinsb): Ensure we have adequate e2e coverage of this function in library consumers
c := NewCordonHelper(node)
if updateRequired := c.UpdateIfRequired(desired); !updateRequired {
// Already done
return nil
}
err, patchErr := c.PatchOrReplace(drainer.Client)
if patchErr != nil {
return patchErr
}
if err != nil {
return err
}
return nil
}

View File

@ -17,14 +17,19 @@ limitations under the License.
package drain
import (
"fmt"
"io"
"math"
"time"
corev1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)
@ -39,14 +44,20 @@ const (
type Helper struct {
Client kubernetes.Interface
Force bool
DryRun bool
GracePeriodSeconds int
IgnoreAllDaemonSets bool
Timeout time.Duration
DeleteLocalData bool
Selector string
PodSelector string
Out io.Writer
ErrOut io.Writer
// TODO(justinsb): unnecessary?
DryRun bool
// OnPodDeletedOrEvicted is called when a pod is evicted/deleted; for printing progress output
OnPodDeletedOrEvicted func(pod *corev1.Pod, usingEviction bool)
}
// CheckEvictionSupport uses Discovery API to find out if the server support
@ -157,3 +168,125 @@ func (d *Helper) GetPodsForDeletion(nodeName string) (*podDeleteList, []error) {
return list, nil
}
// DeleteOrEvictPods deletes or evicts the pods on the api server
func (d *Helper) DeleteOrEvictPods(pods []corev1.Pod) error {
if len(pods) == 0 {
return nil
}
policyGroupVersion, err := CheckEvictionSupport(d.Client)
if err != nil {
return err
}
// TODO(justinsb): unnecessary?
getPodFn := func(namespace, name string) (*corev1.Pod, error) {
return d.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
}
if len(policyGroupVersion) > 0 {
return d.evictPods(pods, policyGroupVersion, getPodFn)
}
return d.deletePods(pods, getPodFn)
}
func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
returnCh := make(chan error, 1)
for _, pod := range pods {
go func(pod corev1.Pod, returnCh chan error) {
for {
fmt.Fprintf(d.Out, "evicting pod %q\n", pod.Name)
err := d.EvictPod(pod, policyGroupVersion)
if err == nil {
break
} else if apierrors.IsNotFound(err) {
returnCh <- nil
return
} else if apierrors.IsTooManyRequests(err) {
fmt.Fprintf(d.ErrOut, "error when evicting pod %q (will retry after 5s): %v\n", pod.Name, err)
time.Sleep(5 * time.Second)
} else {
returnCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err)
return
}
}
_, err := waitForDelete([]corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn, d.OnPodDeletedOrEvicted)
if err == nil {
returnCh <- nil
} else {
returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
}
}(pod, returnCh)
}
doneCount := 0
var errors []error
// 0 timeout means infinite, we use MaxInt64 to represent it.
var globalTimeout time.Duration
if d.Timeout == 0 {
globalTimeout = time.Duration(math.MaxInt64)
} else {
globalTimeout = d.Timeout
}
globalTimeoutCh := time.After(globalTimeout)
numPods := len(pods)
for doneCount < numPods {
select {
case err := <-returnCh:
doneCount++
if err != nil {
errors = append(errors, err)
}
case <-globalTimeoutCh:
return fmt.Errorf("drain did not complete within %v", globalTimeout)
}
}
return utilerrors.NewAggregate(errors)
}
func (d *Helper) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
// 0 timeout means infinite, we use MaxInt64 to represent it.
var globalTimeout time.Duration
if d.Timeout == 0 {
globalTimeout = time.Duration(math.MaxInt64)
} else {
globalTimeout = d.Timeout
}
for _, pod := range pods {
err := d.DeletePod(pod)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
}
_, err := waitForDelete(pods, 1*time.Second, globalTimeout, false, getPodFn, d.OnPodDeletedOrEvicted)
return err
}
func waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error), onDoneFn func(pod *corev1.Pod, usingEviction bool)) ([]corev1.Pod, error) {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
pendingPods := []corev1.Pod{}
for i, pod := range pods {
p, err := getPodFn(pod.Namespace, pod.Name)
if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
if onDoneFn != nil {
onDoneFn(&pod, usingEviction)
}
continue
} else if err != nil {
return false, err
} else {
pendingPods = append(pendingPods, pods[i])
}
}
pods = pendingPods
if len(pendingPods) > 0 {
return false, nil
}
return true, nil
})
return pods, err
}

306
pkg/drain/drain_test.go Normal file
View File

@ -0,0 +1,306 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package drain
import (
"errors"
"fmt"
"os"
"reflect"
"sort"
"strconv"
"testing"
"time"
corev1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake"
ktest "k8s.io/client-go/testing"
)
func TestDeletePods(t *testing.T) {
ifHasBeenCalled := map[string]bool{}
tests := []struct {
description string
interval time.Duration
timeout time.Duration
expectPendingPods bool
expectError bool
expectedError *error
getPodFn func(namespace, name string) (*corev1.Pod, error)
}{
{
description: "Wait for deleting to complete",
interval: 100 * time.Millisecond,
timeout: 10 * time.Second,
expectPendingPods: false,
expectError: false,
expectedError: nil,
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
oldPodMap, _ := createPods(false)
newPodMap, _ := createPods(true)
if oldPod, found := oldPodMap[name]; found {
if _, ok := ifHasBeenCalled[name]; !ok {
ifHasBeenCalled[name] = true
return &oldPod, nil
}
if oldPod.ObjectMeta.Generation < 4 {
newPod := newPodMap[name]
return &newPod, nil
}
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, name)
}
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, name)
},
},
{
description: "Deleting could timeout",
interval: 200 * time.Millisecond,
timeout: 3 * time.Second,
expectPendingPods: true,
expectError: true,
expectedError: &wait.ErrWaitTimeout,
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
oldPodMap, _ := createPods(false)
if oldPod, found := oldPodMap[name]; found {
return &oldPod, nil
}
return nil, fmt.Errorf("%q: not found", name)
},
},
{
description: "Client error could be passed out",
interval: 200 * time.Millisecond,
timeout: 5 * time.Second,
expectPendingPods: true,
expectError: true,
expectedError: nil,
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
return nil, errors.New("This is a random error for testing")
},
},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
_, pods := createPods(false)
pendingPods, err := waitForDelete(pods, test.interval, test.timeout, false, test.getPodFn, nil)
if test.expectError {
if err == nil {
t.Fatalf("%s: unexpected non-error", test.description)
} else if test.expectedError != nil {
if *test.expectedError != err {
t.Fatalf("%s: the error does not match expected error", test.description)
}
}
}
if !test.expectError && err != nil {
t.Fatalf("%s: unexpected error", test.description)
}
if test.expectPendingPods && len(pendingPods) == 0 {
t.Fatalf("%s: unexpected empty pods", test.description)
}
if !test.expectPendingPods && len(pendingPods) > 0 {
t.Fatalf("%s: unexpected pending pods", test.description)
}
})
}
}
func createPods(ifCreateNewPods bool) (map[string]corev1.Pod, []corev1.Pod) {
podMap := make(map[string]corev1.Pod)
podSlice := []corev1.Pod{}
for i := 0; i < 8; i++ {
var uid types.UID
if ifCreateNewPods {
uid = types.UID(i)
} else {
uid = types.UID(strconv.Itoa(i) + strconv.Itoa(i))
}
pod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod" + strconv.Itoa(i),
Namespace: "default",
UID: uid,
Generation: int64(i),
},
}
podMap[pod.Name] = pod
podSlice = append(podSlice, pod)
}
return podMap, podSlice
}
// addEvictionSupport implements simple fake eviction support on the fake.Clientset
func addEvictionSupport(t *testing.T, k *fake.Clientset) {
podsEviction := metav1.APIResource{
Name: "pods/eviction",
Kind: "Eviction",
Group: "",
Version: "v1",
}
coreResources := &metav1.APIResourceList{
GroupVersion: "v1",
APIResources: []metav1.APIResource{podsEviction},
}
policyResources := &metav1.APIResourceList{
GroupVersion: "policy/v1",
}
k.Resources = append(k.Resources, coreResources, policyResources)
// Delete pods when evict is called
k.PrependReactor("create", "pods", func(action ktest.Action) (bool, runtime.Object, error) {
if action.GetSubresource() != "eviction" {
return false, nil, nil
}
eviction := *action.(ktest.CreateAction).GetObject().(*policyv1beta1.Eviction)
// Avoid the lock
go func() {
err := k.CoreV1().Pods(eviction.Namespace).Delete(eviction.Name, &metav1.DeleteOptions{})
if err != nil {
// Errorf because we can't call Fatalf from another goroutine
t.Errorf("failed to delete pod: %s/%s", eviction.Namespace, eviction.Name)
}
}()
return true, nil, nil
})
}
func TestCheckEvictionSupport(t *testing.T) {
for _, evictionSupported := range []bool{true, false} {
evictionSupported := evictionSupported
t.Run(fmt.Sprintf("evictionSupported=%v", evictionSupported),
func(t *testing.T) {
k := fake.NewSimpleClientset()
if evictionSupported {
addEvictionSupport(t, k)
}
apiGroup, err := CheckEvictionSupport(k)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
expectedAPIGroup := ""
if evictionSupported {
expectedAPIGroup = "policy/v1"
}
if apiGroup != expectedAPIGroup {
t.Fatalf("expected apigroup %q, actual=%q", expectedAPIGroup, apiGroup)
}
})
}
}
func TestDeleteOrEvict(t *testing.T) {
for _, evictionSupported := range []bool{true, false} {
evictionSupported := evictionSupported
t.Run(fmt.Sprintf("evictionSupported=%v", evictionSupported),
func(t *testing.T) {
h := &Helper{
Out: os.Stdout,
GracePeriodSeconds: 10,
}
// Create 4 pods, and try to remove the first 2
var expectedEvictions []policyv1beta1.Eviction
var create []runtime.Object
deletePods := []corev1.Pod{}
for i := 1; i <= 4; i++ {
pod := &corev1.Pod{}
pod.Name = fmt.Sprintf("mypod-%d", i)
pod.Namespace = "default"
create = append(create, pod)
if i <= 2 {
deletePods = append(deletePods, *pod)
if evictionSupported {
eviction := policyv1beta1.Eviction{}
eviction.Kind = "Eviction"
eviction.APIVersion = "policy/v1"
eviction.Namespace = pod.Namespace
eviction.Name = pod.Name
gracePeriodSeconds := int64(h.GracePeriodSeconds)
eviction.DeleteOptions = &metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriodSeconds,
}
expectedEvictions = append(expectedEvictions, eviction)
}
}
}
// Build the fake client
k := fake.NewSimpleClientset(create...)
if evictionSupported {
addEvictionSupport(t, k)
}
h.Client = k
// Do the eviction
if err := h.DeleteOrEvictPods(deletePods); err != nil {
t.Fatalf("error from DeleteOrEvictPods: %v", err)
}
// Test that other pods are still there
var remainingPods []string
{
podList, err := k.CoreV1().Pods("").List(metav1.ListOptions{})
if err != nil {
t.Fatalf("error listing pods: %v", err)
}
for _, pod := range podList.Items {
remainingPods = append(remainingPods, pod.Namespace+"/"+pod.Name)
}
sort.Strings(remainingPods)
}
expected := []string{"default/mypod-3", "default/mypod-4"}
if !reflect.DeepEqual(remainingPods, expected) {
t.Errorf("unexpected remaining pods after DeleteOrEvictPods; actual %v; expected %v", remainingPods, expected)
}
// Test that pods were evicted as expected
var actualEvictions []policyv1beta1.Eviction
for _, action := range k.Actions() {
if action.GetVerb() != "create" || action.GetResource().Resource != "pods" || action.GetSubresource() != "eviction" {
continue
}
eviction := *action.(ktest.CreateAction).GetObject().(*policyv1beta1.Eviction)
actualEvictions = append(actualEvictions, eviction)
}
sort.Slice(actualEvictions, func(i, j int) bool {
return actualEvictions[i].Name < actualEvictions[j].Name
})
if !reflect.DeepEqual(actualEvictions, expectedEvictions) {
t.Errorf("unexpected evictions; actual %v; expected %v", actualEvictions, expectedEvictions)
}
})
}
}