Cluster-autoscaler: own drain

This commit is contained in:
Marcin Wielgus 2016-10-21 19:12:59 +01:00
parent c88727e33d
commit df078c9101
5 changed files with 559 additions and 110 deletions

View File

@ -26,7 +26,6 @@ import (
kube_api "k8s.io/kubernetes/pkg/api" kube_api "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/resource"
kube_client "k8s.io/kubernetes/pkg/client/unversioned" kube_client "k8s.io/kubernetes/pkg/client/unversioned"
cmd "k8s.io/kubernetes/pkg/kubectl/cmd"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"github.com/golang/glog" "github.com/golang/glog"
@ -38,6 +37,9 @@ var (
"or mirror pods)") "or mirror pods)")
skipNodesWithLocalStorage = flag.Bool("skip-nodes-with-local-storage", true, skipNodesWithLocalStorage = flag.Bool("skip-nodes-with-local-storage", true,
"If true cluster autoscaler will never delete nodes with pods with local storage, e.g. EmptyDir or HostPath") "If true cluster autoscaler will never delete nodes with pods with local storage, e.g. EmptyDir or HostPath")
minReplicaCount = flag.Int("min-replica-count", 0,
"Minimum number or replicas that a replica set or replication controller should have to allow their pods deletion in scale down")
) )
// FindNodesToRemove finds nodes that can be removed. Returns also an information about good // FindNodesToRemove finds nodes that can be removed. Returns also an information about good
@ -68,9 +70,12 @@ candidateloop:
var podsToRemove []*kube_api.Pod var podsToRemove []*kube_api.Pod
var err error var err error
if fastCheck {
if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found { if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {
podsToRemove, err = FastGetPodsToMove(nodeInfo, false, *skipNodesWithSystemPods, *skipNodesWithLocalStorage) if fastCheck {
podsToRemove, err = FastGetPodsToMove(nodeInfo, *skipNodesWithSystemPods, *skipNodesWithLocalStorage)
} else {
podsToRemove, err = DetailedGetPodsForMove(nodeInfo, *skipNodesWithSystemPods, *skipNodesWithLocalStorage, client, int32(*minReplicaCount))
}
if err != nil { if err != nil {
glog.V(2).Infof("%s: node %s cannot be removed: %v", evaluationType, node.Name, err) glog.V(2).Infof("%s: node %s cannot be removed: %v", evaluationType, node.Name, err)
continue candidateloop continue candidateloop
@ -79,19 +84,6 @@ candidateloop:
glog.V(2).Infof("%s: nodeInfo for %s not found", evaluationType, node.Name) glog.V(2).Infof("%s: nodeInfo for %s not found", evaluationType, node.Name)
continue candidateloop continue candidateloop
} }
} else {
drainResult, _, _, err := cmd.GetPodsForDeletionOnNodeDrain(client, node.Name,
kube_api.Codecs.UniversalDecoder(), false, true)
if err != nil {
glog.V(2).Infof("%s: node %s cannot be removed: %v", evaluationType, node.Name, err)
continue candidateloop
}
podsToRemove = make([]*kube_api.Pod, 0, len(drainResult))
for i := range drainResult {
podsToRemove = append(podsToRemove, &drainResult[i])
}
}
findProblems := findPlaceFor(node.Name, podsToRemove, allNodes, nodeNameToNodeInfo, predicateChecker, oldHints, newHints, findProblems := findPlaceFor(node.Name, podsToRemove, allNodes, nodeNameToNodeInfo, predicateChecker, oldHints, newHints,
usageTracker, timestamp) usageTracker, timestamp)

View File

@ -17,12 +17,10 @@ limitations under the License.
package simulator package simulator
import ( import (
"fmt" "k8s.io/contrib/cluster-autoscaler/utils/drain"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/controller" unversionedclient "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
) )
@ -30,87 +28,30 @@ import (
// is drained. Raises error if there is an unreplicated pod and force option was not specified. // is drained. Raises error if there is an unreplicated pod and force option was not specified.
// Based on kubectl drain code. It makes an assumption that RC, DS, Jobs and RS were deleted // Based on kubectl drain code. It makes an assumption that RC, DS, Jobs and RS were deleted
// along with their pods (no abandoned pods with dangling created-by annotation). Usefull for fast // along with their pods (no abandoned pods with dangling created-by annotation). Usefull for fast
// checks. // checks. Doesn't check i
func FastGetPodsToMove(nodeInfo *schedulercache.NodeInfo, force bool, func FastGetPodsToMove(nodeInfo *schedulercache.NodeInfo, skipNodesWithSystemPods bool, skipNodesWithLocalStorage bool) ([]*api.Pod, error) {
skipNodesWithSystemPods bool, skipNodesWithLocalStorage bool) ([]*api.Pod, error) { return drain.GetPodsForDeletionOnNodeDrain(
pods := make([]*api.Pod, 0) nodeInfo.Pods(),
unreplicatedPodNames := []string{} api.Codecs.UniversalDecoder(),
for _, pod := range nodeInfo.Pods() { skipNodesWithSystemPods,
if IsMirrorPod(pod) { skipNodesWithLocalStorage,
continue false,
} nil,
0)
replicated := false
daemonsetPod := false
creatorKind, err := CreatorRefKind(pod)
if err != nil {
return []*api.Pod{}, err
}
if creatorKind == "ReplicationController" {
replicated = true
} else if creatorKind == "DaemonSet" {
daemonsetPod = true
} else if creatorKind == "Job" {
replicated = true
} else if creatorKind == "ReplicaSet" {
replicated = true
}
if !daemonsetPod && pod.Namespace == "kube-system" && skipNodesWithSystemPods {
return []*api.Pod{}, fmt.Errorf("non-deamons set, non-mirrored, kube-system pod present: %s", pod.Name)
}
if !daemonsetPod && hasLocalStorage(pod) && skipNodesWithLocalStorage {
return []*api.Pod{}, fmt.Errorf("pod with local storage present: %s", pod.Name)
}
switch {
case daemonsetPod:
break
case !replicated:
unreplicatedPodNames = append(unreplicatedPodNames, pod.Name)
if force {
pods = append(pods, pod)
}
default:
pods = append(pods, pod)
}
}
if !force && len(unreplicatedPodNames) > 0 {
return []*api.Pod{}, fmt.Errorf("unreplicated pods present")
}
return pods, nil
} }
// CreatorRefKind returns the kind of the creator of the pod. // DetailedGetPodsForMove returns a list of pods that should be moved elsewhere if the node
func CreatorRefKind(pod *api.Pod) (string, error) { // is drained. Raises error if there is an unreplicated pod and force option was not specified.
creatorRef, found := pod.ObjectMeta.Annotations[controller.CreatedByAnnotation] // Based on kubectl drain code. It checks whether RC, DS, Jobs and RS that created these pods
if !found { // still exist.
return "", nil func DetailedGetPodsForMove(nodeInfo *schedulercache.NodeInfo, skipNodesWithSystemPods bool,
} skipNodesWithLocalStorage bool, client *unversionedclient.Client, minReplicaCount int32) ([]*api.Pod, error) {
var sr api.SerializedReference return drain.GetPodsForDeletionOnNodeDrain(
if err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(creatorRef), &sr); err != nil { nodeInfo.Pods(),
return "", err api.Codecs.UniversalDecoder(),
} skipNodesWithSystemPods,
return sr.Reference.Kind, nil skipNodesWithLocalStorage,
} true,
client,
// IsMirrorPod checks whether the pod is a mirror pod. minReplicaCount)
func IsMirrorPod(pod *api.Pod) bool {
_, found := pod.ObjectMeta.Annotations[types.ConfigMirrorAnnotationKey]
return found
}
func hasLocalStorage(pod *api.Pod) bool {
for _, volume := range pod.Spec.Volumes {
if isLocalVolume(&volume) {
return true
}
}
return false
}
func isLocalVolume(volume *api.Volume) bool {
return volume.HostPath != nil || volume.EmptyDir != nil
} }

View File

@ -35,7 +35,7 @@ func TestFastGetPodsToMove(t *testing.T) {
Namespace: "ns", Namespace: "ns",
}, },
} }
_, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod1), false, true, true) _, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod1), true, true)
assert.Error(t, err) assert.Error(t, err)
// Replicated pod // Replicated pod
@ -48,7 +48,7 @@ func TestFastGetPodsToMove(t *testing.T) {
}, },
}, },
} }
r2, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod2), false, true, true) r2, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod2), true, true)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 1, len(r2)) assert.Equal(t, 1, len(r2))
assert.Equal(t, pod2, r2[0]) assert.Equal(t, pod2, r2[0])
@ -63,7 +63,7 @@ func TestFastGetPodsToMove(t *testing.T) {
}, },
}, },
} }
r3, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod3), false, true, true) r3, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod3), true, true)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 0, len(r3)) assert.Equal(t, 0, len(r3))
@ -77,7 +77,7 @@ func TestFastGetPodsToMove(t *testing.T) {
}, },
}, },
} }
r4, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod2, pod3, pod4), false, true, true) r4, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod2, pod3, pod4), true, true)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 1, len(r4)) assert.Equal(t, 1, len(r4))
assert.Equal(t, pod2, r4[0]) assert.Equal(t, pod2, r4[0])
@ -92,7 +92,7 @@ func TestFastGetPodsToMove(t *testing.T) {
}, },
}, },
} }
_, err = FastGetPodsToMove(schedulercache.NewNodeInfo(pod5), false, true, true) _, err = FastGetPodsToMove(schedulercache.NewNodeInfo(pod5), true, true)
assert.Error(t, err) assert.Error(t, err)
// Local storage // Local storage
@ -114,7 +114,7 @@ func TestFastGetPodsToMove(t *testing.T) {
}, },
}, },
} }
_, err = FastGetPodsToMove(schedulercache.NewNodeInfo(pod6), false, true, true) _, err = FastGetPodsToMove(schedulercache.NewNodeInfo(pod6), true, true)
assert.Error(t, err) assert.Error(t, err)
// Non-local storage // Non-local storage
@ -138,7 +138,7 @@ func TestFastGetPodsToMove(t *testing.T) {
}, },
}, },
} }
r7, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod7), false, true, true) r7, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod7), true, true)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 1, len(r7)) assert.Equal(t, 1, len(r7))
} }

View File

@ -0,0 +1,192 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package drain
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/runtime"
)
// GetPodsForDeletionOnNodeDrain returns pods that should be deleted on node drain as well as some extra information
// about possibly problematic pods (unreplicated and deamon sets).
func GetPodsForDeletionOnNodeDrain(
podList []*api.Pod,
decoder runtime.Decoder,
skipNodesWithSystemPods bool,
skipNodesWithLocalStorage bool,
checkReferences bool, // Setting this to true requires client to be not-null.
client *client.Client,
minReplica int32) ([]*api.Pod, error) {
pods := []*api.Pod{}
for _, pod := range podList {
if IsMirrorPod(pod) {
continue
}
daemonsetPod := false
replicated := false
sr, err := CreatorRef(pod)
if err != nil {
return []*api.Pod{}, fmt.Errorf("failed to obtain refkind: %v", err)
}
refKind := ""
if sr != nil {
refKind = sr.Reference.Kind
}
if refKind == "ReplicationController" {
if checkReferences {
rc, err := client.ReplicationControllers(sr.Reference.Namespace).Get(sr.Reference.Name)
// Assume a reason for an error is because the RC is either
// gone/missing or that the rc has too few replicas configured.
// TODO: replace the minReplica check with pod disruption budget.
if err == nil && rc != nil {
if rc.Spec.Replicas < minReplica {
return []*api.Pod{}, fmt.Errorf("replication controller for %s/%s has too few replicas spec: %d min: %d",
pod.Namespace, pod.Name, rc.Spec.Replicas, minReplica)
}
replicated = true
} else {
return []*api.Pod{}, fmt.Errorf("replication controller for %s/%s is not available, err: %v", pod.Namespace, pod.Name, err)
}
} else {
replicated = true
}
} else if refKind == "DaemonSet" {
if checkReferences {
ds, err := client.DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name)
// Assume the only reason for an error is because the DaemonSet is
// gone/missing, not for any other cause. TODO(mml): something more
// sophisticated than this
if err == nil && ds != nil {
// Otherwise, treat daemonset-managed pods as unmanaged since
// DaemonSet Controller currently ignores the unschedulable bit.
// FIXME(mml): Add link to the issue concerning a proper way to drain
// daemonset pods, probably using taints.
daemonsetPod = true
} else {
return []*api.Pod{}, fmt.Errorf("deamonset for %s/%s is not present, err: %v", pod.Namespace, pod.Name, err)
}
} else {
daemonsetPod = true
}
} else if refKind == "Job" {
if checkReferences {
job, err := client.ExtensionsClient.Jobs(sr.Reference.Namespace).Get(sr.Reference.Name)
// Assume the only reason for an error is because the Job is
// gone/missing, not for any other cause. TODO(mml): something more
// sophisticated than this
if err == nil && job != nil {
replicated = true
} else {
return []*api.Pod{}, fmt.Errorf("job for %s/%s is not available: err: %v", pod.Namespace, pod.Name, err)
}
} else {
replicated = true
}
} else if refKind == "ReplicaSet" {
if checkReferences {
rs, err := client.ExtensionsClient.ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name)
// Assume the only reason for an error is because the RS is
// gone/missing, not for any other cause. TODO(mml): something more
// sophisticated than this
if err == nil && rs != nil {
if rs.Spec.Replicas < minReplica {
return []*api.Pod{}, fmt.Errorf("replication controller for %s/%s has too few replicas spec: %d min: %d",
pod.Namespace, pod.Name, rs.Spec.Replicas, minReplica)
}
replicated = true
} else {
return []*api.Pod{}, fmt.Errorf("replication controller for %s/%s is not available, err: %v", pod.Namespace, pod.Name, err)
}
} else {
replicated = true
}
}
if daemonsetPod {
continue
}
if !replicated {
return []*api.Pod{}, fmt.Errorf("%s/%s is not replicated", pod.Namespace, pod.Name)
}
if pod.Namespace == "kube-system" && skipNodesWithSystemPods {
return []*api.Pod{}, fmt.Errorf("non-deamons set, non-mirrored, kube-system pod present: %s", pod.Name)
}
if HasLocalStorage(pod) && skipNodesWithLocalStorage {
return []*api.Pod{}, fmt.Errorf("pod with local storage present: %s", pod.Name)
}
pods = append(pods, pod)
}
return pods, nil
}
// CreatorRefKind returns the kind of the creator of the pod.
func CreatorRefKind(pod *api.Pod) (string, error) {
sr, err := CreatorRef(pod)
if err != nil {
return "", err
}
if sr == nil {
return "", nil
}
return sr.Reference.Kind, nil
}
// CreatorRef returns the kind of the creator reference of the pod.
func CreatorRef(pod *api.Pod) (*api.SerializedReference, error) {
creatorRef, found := pod.ObjectMeta.Annotations[controller.CreatedByAnnotation]
if !found {
return nil, nil
}
var sr api.SerializedReference
if err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(creatorRef), &sr); err != nil {
return nil, err
}
return &sr, nil
}
// IsMirrorPod checks whether the pod is a mirror pod.
func IsMirrorPod(pod *api.Pod) bool {
_, found := pod.ObjectMeta.Annotations[types.ConfigMirrorAnnotationKey]
return found
}
// HasLocalStorage returns true if pod has any local storage.
func HasLocalStorage(pod *api.Pod) bool {
for _, volume := range pod.Spec.Volumes {
if isLocalVolume(&volume) {
return true
}
}
return false
}
func isLocalVolume(volume *api.Volume) bool {
return volume.HostPath != nil || volume.EmptyDir != nil
}

View File

@ -0,0 +1,324 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package drain
import (
"bytes"
"io"
"io/ioutil"
"net/http"
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/fake"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/runtime"
)
func TestDrain(t *testing.T) {
labels := make(map[string]string)
labels["my_key"] = "my_value"
rc := api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "rc",
Namespace: "default",
CreationTimestamp: unversioned.Time{Time: time.Now()},
Labels: labels,
SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"),
},
Spec: api.ReplicationControllerSpec{
Selector: labels,
},
}
rcAnno := make(map[string]string)
rcAnno[controller.CreatedByAnnotation] = refJSON(t, &rc)
rcPod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "default",
CreationTimestamp: unversioned.Time{Time: time.Now()},
Labels: labels,
Annotations: rcAnno,
},
Spec: api.PodSpec{
NodeName: "node",
},
}
ds := extensions.DaemonSet{
ObjectMeta: api.ObjectMeta{
Name: "ds",
Namespace: "default",
CreationTimestamp: unversioned.Time{Time: time.Now()},
SelfLink: "/apis/extensions/v1beta1/namespaces/default/daemonsets/ds",
},
Spec: extensions.DaemonSetSpec{
Selector: &unversioned.LabelSelector{MatchLabels: labels},
},
}
dsAnno := make(map[string]string)
dsAnno[controller.CreatedByAnnotation] = refJSON(t, &ds)
dsPod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "default",
CreationTimestamp: unversioned.Time{Time: time.Now()},
Labels: labels,
Annotations: dsAnno,
},
Spec: api.PodSpec{
NodeName: "node",
},
}
job := batch.Job{
ObjectMeta: api.ObjectMeta{
Name: "job",
Namespace: "default",
CreationTimestamp: unversioned.Time{Time: time.Now()},
SelfLink: "/apis/extensions/v1beta1/namespaces/default/jobs/job",
},
Spec: batch.JobSpec{
Selector: &unversioned.LabelSelector{MatchLabels: labels},
},
}
jobPod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "default",
CreationTimestamp: unversioned.Time{Time: time.Now()},
Labels: labels,
Annotations: map[string]string{controller.CreatedByAnnotation: refJSON(t, &job)},
},
}
rs := extensions.ReplicaSet{
ObjectMeta: api.ObjectMeta{
Name: "rs",
Namespace: "default",
CreationTimestamp: unversioned.Time{Time: time.Now()},
Labels: labels,
SelfLink: testapi.Default.SelfLink("replicasets", "rs"),
},
Spec: extensions.ReplicaSetSpec{
Selector: &unversioned.LabelSelector{MatchLabels: labels},
},
}
rsAnno := make(map[string]string)
rsAnno[controller.CreatedByAnnotation] = refJSON(t, &rs)
rsPod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "default",
CreationTimestamp: unversioned.Time{Time: time.Now()},
Labels: labels,
Annotations: rsAnno,
},
Spec: api.PodSpec{
NodeName: "node",
},
}
nakedPod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "default",
CreationTimestamp: unversioned.Time{Time: time.Now()},
Labels: labels,
},
Spec: api.PodSpec{
NodeName: "node",
},
}
emptydirPod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "default",
CreationTimestamp: unversioned.Time{Time: time.Now()},
Labels: labels,
},
Spec: api.PodSpec{
NodeName: "node",
Volumes: []api.Volume{
{
Name: "scratch",
VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{Medium: ""}},
},
},
},
}
tests := []struct {
description string
pods []*api.Pod
rcs []api.ReplicationController
replicaSets []extensions.ReplicaSet
expectFatal bool
expectPods []*api.Pod
}{
{
description: "RC-managed pod",
pods: []*api.Pod{rcPod},
rcs: []api.ReplicationController{rc},
expectFatal: false,
expectPods: []*api.Pod{rcPod},
},
{
description: "DS-managed pod",
pods: []*api.Pod{dsPod},
expectFatal: false,
expectPods: []*api.Pod{},
},
{
description: "Job-managed pod",
pods: []*api.Pod{jobPod},
rcs: []api.ReplicationController{rc},
expectFatal: false,
expectPods: []*api.Pod{jobPod},
},
{
description: "RS-managed pod",
pods: []*api.Pod{rsPod},
replicaSets: []extensions.ReplicaSet{rs},
expectFatal: false,
expectPods: []*api.Pod{rsPod},
},
{
description: "naked pod",
pods: []*api.Pod{nakedPod},
expectFatal: true,
expectPods: []*api.Pod{},
},
{
description: "pod with EmptyDir",
pods: []*api.Pod{emptydirPod},
expectFatal: true,
expectPods: []*api.Pod{},
},
}
for _, test := range tests {
codec := testapi.Default.Codec()
extcodec := testapi.Extensions.Codec()
fakeClient := &fake.RESTClient{
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
m := &MyReq{req}
switch {
case m.isFor("GET", "/namespaces/default/replicationcontrollers/rc"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &test.rcs[0])}, nil
case m.isFor("GET", "/namespaces/default/daemonsets/ds"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(extcodec, &ds)}, nil
case m.isFor("GET", "/namespaces/default/jobs/job"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(extcodec, &job)}, nil
case m.isFor("GET", "/namespaces/default/replicasets/rs"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(extcodec, &test.replicaSets[0])}, nil
case m.isFor("GET", "/namespaces/default/pods/bar"):
return &http.Response{StatusCode: 404, Header: defaultHeader(), Body: objBody(codec, nil)}, nil
case m.isFor("GET", "/replicationcontrollers"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.ReplicationControllerList{Items: test.rcs})}, nil
default:
t.Fatalf("%s: unexpected request: %v %#v\n%#v", test.description, req.Method, req.URL, req)
return nil, nil
}
}),
}
clientconfig := &restclient.Config{
ContentConfig: restclient.ContentConfig{
ContentType: runtime.ContentTypeJSON,
GroupVersion: testapi.Default.GroupVersion(),
},
}
client := client.NewOrDie(clientconfig)
client.Client = fakeClient.Client
client.ExtensionsClient.Client = fakeClient.Client
pods, err := GetPodsForDeletionOnNodeDrain(test.pods, api.Codecs.UniversalDecoder(),
true, true, true, client, 0)
if test.expectFatal {
if err == nil {
t.Fatalf("%s: unexpected non-error", test.description)
}
}
if !test.expectFatal {
if err != nil {
t.Fatalf("%s: error occurred: %v", test.description, err)
}
}
if len(pods) != len(test.expectPods) {
t.Fatalf("Wrong pod list content: %v", test.description)
}
}
}
func defaultHeader() http.Header {
header := http.Header{}
header.Set("Content-Type", runtime.ContentTypeJSON)
return header
}
type MyReq struct {
Request *http.Request
}
func (m *MyReq) isFor(method string, path string) bool {
req := m.Request
return method == req.Method && (req.URL.Path == path ||
req.URL.Path == strings.Join([]string{"/api/v1", path}, "") ||
req.URL.Path == strings.Join([]string{"/apis/extensions/v1beta1", path}, "") ||
req.URL.Path == strings.Join([]string{"/apis/batch/v1", path}, ""))
}
func refJSON(t *testing.T, o runtime.Object) string {
ref, err := api.GetReference(o)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
codec := testapi.Default.Codec()
json := runtime.EncodeOrDie(codec, &api.SerializedReference{Reference: *ref})
return string(json)
}
func objBody(codec runtime.Codec, obj runtime.Object) io.ReadCloser {
return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj))))
}