mirror of https://github.com/kubernetes/kops.git
Validate and drain nodes during rolling update, enabled via the new DrainAndValidateRollingUpdate feature flag
parent 32081f3ca6
commit 286455a608
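Example usage (illustrative only; the feature flag and the drain/validate flag names come from this change, while the exact kops rolling-update cluster invocation is an assumption, not shown in the diff):

  export KOPS_FEATURE_FLAGS="+DrainAndValidateRollingUpdate"
  kops rolling-update cluster --yes --fail-on-drain-error=true --fail-on-validate-error=true --validate-retries=8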
@@ -95,7 +95,8 @@ This command updates a kubernetes cluster to match the cloud, and kops specific
 To perform rolling update, you need to update the cloud resources first with "kops update cluster"
 
 Use KOPS_FEATURE_FLAGS="+DrainAndValidateRollingUpdate" to use beta code that drains the nodes
-and validates the cluster.`,
+and validates the cluster. New flags for Drain and Validation operations will be shown when
+the environment variable is set.`,
 	}
 
 	cmd.Flags().BoolVar(&options.Yes, "yes", options.Yes, "perform rolling update without confirmation")
@@ -107,9 +108,11 @@ and validates the cluster.`,
 	cmd.Flags().DurationVar(&options.BastionInterval, "bastion-interval", options.BastionInterval, "Time to wait between restarting bastions")
 	cmd.Flags().StringSliceVar(&options.InstanceGroups, "instance-group", options.InstanceGroups, "List of instance groups to update (defaults to all if not specified)")
 
-	cmd.Flags().BoolVar(&options.FailOnDrainError, "fail-on-drain-error", true, "The rolling-update will fail if draining a node fails. Enable with KOPS_FEATURE_FLAGS='+DrainAndValidateRollingUpdate'")
-	cmd.Flags().BoolVar(&options.FailOnValidate, "fail-on-validate-error", true, "The rolling-update will fail if the cluster fails to validate. Enable with KOPS_FEATURE_FLAGS='+DrainAndValidateRollingUpdate'")
-	cmd.Flags().IntVar(&options.ValidateRetries, "validate-retries", 8, "The number of times that a node will be validated. Between validations kops sleeps the master-interval/2 or node-interval/2 duration. Enable with KOPS_FEATURE_FLAGS='+DrainAndValidateRollingUpdate'")
+	if featureflag.DrainAndValidateRollingUpdate.Enabled() {
+		cmd.Flags().BoolVar(&options.FailOnDrainError, "fail-on-drain-error", true, "The rolling-update will fail if draining a node fails.")
+		cmd.Flags().BoolVar(&options.FailOnValidate, "fail-on-validate-error", true, "The rolling-update will fail if the cluster fails to validate.")
+		cmd.Flags().IntVar(&options.ValidateRetries, "validate-retries", 8, "The number of times that a node will be validated. Between validations kops sleeps the master-interval/2 or node-interval/2 duration.")
+	}
 
 	cmd.Run = func(cmd *cobra.Command, args []string) {
 		err := rootCommand.ProcessArgs(args)
@@ -298,7 +301,7 @@ func RunRollingUpdateCluster(f *util.Factory, out io.Writer, options *RollingUpd
 		Force:            options.Force,
 		Cloud:            cloud,
 		K8sClient:        k8sClient,
 		ClientConfig:     kutil.NewClientConfig(config, "kube-system"),
 		FailOnDrainError: options.FailOnDrainError,
 		FailOnValidate:   options.FailOnValidate,
 		CloudOnly:        options.CloudOnly,
@@ -0,0 +1,674 @@
|
||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package kutil
|
||||||
|
|
||||||
|
////
|
||||||
|
// Based off of drain in kubectl
|
||||||
|
// https://github.com/kubernetes/kubernetes/blob/master/pkg/kubectl/cmd/drain.go
|
||||||
|
////
|
||||||
|
|
||||||
|
// TODO: remove kubectl dependencies
|
||||||
|
// TODO: can we use our own client instead of building it again
|
||||||
|
// TODO: refactor our client to be like this client
|
||||||
|
|
||||||
|
// FIXME: look at 1.5 refactor
|
||||||
|
// FIXME: we are deleting local storage for daemon sets, and why even delete local storage??
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jonboulle/clockwork"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
apierrors "k8s.io/kubernetes/pkg/api/errors"
|
||||||
|
"k8s.io/kubernetes/pkg/api/meta"
|
||||||
|
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/kubernetes/pkg/apis/policy"
|
||||||
|
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||||
|
"k8s.io/kubernetes/pkg/client/restclient"
|
||||||
|
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
||||||
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
|
"k8s.io/kubernetes/pkg/kubectl"
|
||||||
|
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||||
|
"k8s.io/kubernetes/pkg/kubectl/resource"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DrainOptions holds the options used to drain a node
|
||||||
|
type DrainOptions struct {
|
||||||
|
client *internalclientset.Clientset
|
||||||
|
restClient *restclient.RESTClient
|
||||||
|
factory cmdutil.Factory
|
||||||
|
Force bool
|
||||||
|
GracePeriodSeconds int
|
||||||
|
IgnoreDaemonsets bool
|
||||||
|
Timeout time.Duration
|
||||||
|
backOff clockwork.Clock
|
||||||
|
DeleteLocalData bool
|
||||||
|
mapper meta.RESTMapper
|
||||||
|
nodeInfo *resource.Info
|
||||||
|
typer runtime.ObjectTyper
|
||||||
|
}
|
||||||
|
|
||||||
|
// DrainCommand allows tweaking the default options for draining nodes
|
||||||
|
type DrainCommand struct {
|
||||||
|
Force bool
|
||||||
|
IgnoreDaemonsets bool
|
||||||
|
DeleteLocalData bool
|
||||||
|
GracePeriodSeconds int
|
||||||
|
Timeout int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Takes a pod and returns a bool indicating whether or not to operate on the
|
||||||
|
// pod, an optional warning message, and an optional fatal error.
|
||||||
|
type podFilter func(api.Pod) (include bool, w *warning, f *fatal)
|
||||||
|
type warning struct {
|
||||||
|
string
|
||||||
|
}
|
||||||
|
type fatal struct {
|
||||||
|
string
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
EvictionKind = "Eviction"
|
||||||
|
EvictionSubresource = "pods/eviction"
|
||||||
|
|
||||||
|
kDaemonsetFatal = "DaemonSet-managed pods (use --ignore-daemonsets to ignore)"
|
||||||
|
kDaemonsetWarning = "Ignoring DaemonSet-managed pods"
|
||||||
|
kLocalStorageFatal = "pods with local storage (use --delete-local-data to override)"
|
||||||
|
kLocalStorageWarning = "Deleting pods with local storage"
|
||||||
|
kUnmanagedFatal = "pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet (use --force to override)"
|
||||||
|
kUnmanagedWarning = "Deleting pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet"
|
||||||
|
kMaxNodeUpdateRetry = 10
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewDrainOptions creates a DrainOptions, falling back to defaults when command is nil
|
||||||
|
func NewDrainOptions(command *DrainCommand, clusterName string) (*DrainOptions, error) {
|
||||||
|
|
||||||
|
config := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
|
||||||
|
clientcmd.NewDefaultClientConfigLoadingRules(),
|
||||||
|
&clientcmd.ConfigOverrides{CurrentContext: clusterName})
|
||||||
|
f := cmdutil.NewFactory(config)
|
||||||
|
|
||||||
|
if command != nil {
|
||||||
|
duration, err := time.ParseDuration(fmt.Sprintf("%ds", command.GracePeriodSeconds))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &DrainOptions{
|
||||||
|
factory: f,
|
||||||
|
backOff: clockwork.NewRealClock(),
|
||||||
|
Force: command.Force,
|
||||||
|
IgnoreDaemonsets: command.IgnoreDaemonsets,
|
||||||
|
DeleteLocalData: command.DeleteLocalData,
|
||||||
|
GracePeriodSeconds: command.GracePeriodSeconds,
|
||||||
|
Timeout: duration,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// return with defaults
|
||||||
|
duration, err := time.ParseDuration("0s")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &DrainOptions{
|
||||||
|
factory: f,
|
||||||
|
backOff: clockwork.NewRealClock(),
|
||||||
|
Force: true,
|
||||||
|
IgnoreDaemonsets: true,
|
||||||
|
DeleteLocalData: false, // TODO: should this be true?
|
||||||
|
GracePeriodSeconds: -1,
|
||||||
|
Timeout: duration,
|
||||||
|
}, nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DrainOptions) DrainTheNode(nodeName string) (err error) {
|
||||||
|
|
||||||
|
err = o.SetupDrain(nodeName)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Error setting up the drain: %v, node: %s", err, nodeName)
|
||||||
|
}
|
||||||
|
err = o.RunDrain()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Drain failed %v, %s", err, nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetupDrain populates some fields from the factory, grabs command line
|
||||||
|
// arguments and looks up the node using Builder
|
||||||
|
func (o *DrainOptions) SetupDrain(nodeName string) error {
|
||||||
|
|
||||||
|
if nodeName == "" {
|
||||||
|
return fmt.Errorf("nodeName cannot be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if o.client, err = o.factory.ClientSet(); err != nil {
|
||||||
|
return fmt.Errorf("client or clientset nil %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
o.restClient, err = o.factory.RESTClient()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("rest client problem %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
o.mapper, o.typer = o.factory.Object()
|
||||||
|
|
||||||
|
cmdNamespace, _, err := o.factory.DefaultNamespace()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("DefaultNamespace problem %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r := o.factory.NewBuilder().
|
||||||
|
NamespaceParam(cmdNamespace).DefaultNamespace().
|
||||||
|
ResourceNames("node", nodeName).
|
||||||
|
Do()
|
||||||
|
|
||||||
|
if err = r.Err(); err != nil {
|
||||||
|
return fmt.Errorf("NewBuilder problem %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = r.Visit(func(info *resource.Info, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("internal vistor problem %v", err)
|
||||||
|
}
|
||||||
|
glog.V(2).Infof("info %v", info)
|
||||||
|
o.nodeInfo = info
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatalf("Error getting nodeInfo %v", err)
|
||||||
|
return fmt.Errorf("vistor problem %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = r.Err(); err != nil {
|
||||||
|
return fmt.Errorf("vistor problem %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunDrain runs the 'drain' command
|
||||||
|
func (o *DrainOptions) RunDrain() error {
|
||||||
|
if o.nodeInfo == nil {
|
||||||
|
return fmt.Errorf("nodeInfo is not setup")
|
||||||
|
}
|
||||||
|
if err := o.RunCordonOrUncordon(true); err != nil {
|
||||||
|
glog.V(2).Infof("Error cordon node %v - %v", o.nodeInfo.Name, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err := o.deleteOrEvictPodsSimple()
|
||||||
|
if err == nil {
|
||||||
|
glog.V(2).Infof("Drained node %s", o.nodeInfo.Name)
|
||||||
|
} else {
|
||||||
|
glog.V(2).Infof("Error draining node %s - %v", o.nodeInfo.Name, err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DrainOptions) deleteOrEvictPodsSimple() error {
|
||||||
|
pods, err := o.getPodsForDeletion()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = o.deleteOrEvictPods(pods)
|
||||||
|
if err != nil {
|
||||||
|
pendingPods, newErr := o.getPodsForDeletion()
|
||||||
|
if newErr != nil {
|
||||||
|
return newErr
|
||||||
|
}
|
||||||
|
glog.Fatalf("There are pending pods when an error occurred: %v\n", err)
|
||||||
|
for _, pendingPod := range pendingPods {
|
||||||
|
glog.Fatalf("%s/%s\n", "pod", pendingPod.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{}, error) {
|
||||||
|
switch sr.Reference.Kind {
|
||||||
|
case "ReplicationController":
|
||||||
|
return o.client.Core().ReplicationControllers(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
|
||||||
|
case "DaemonSet":
|
||||||
|
return o.client.Extensions().DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
|
||||||
|
case "Job":
|
||||||
|
return o.client.Batch().Jobs(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
|
||||||
|
case "ReplicaSet":
|
||||||
|
return o.client.Extensions().ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
|
||||||
|
case "PetSet":
|
||||||
|
// FIXME: how the heck do you write this
|
||||||
|
// FIXME: Can we use the go client to make 1.4 and 1.5 calls :)
|
||||||
|
return "PetSet", nil
|
||||||
|
case "StatefulSet":
|
||||||
|
return o.client.Apps().StatefulSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("unknown controller kind %q", sr.Reference.Kind)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DrainOptions) getPodCreator(pod api.Pod) (*api.SerializedReference, error) {
|
||||||
|
creatorRef, found := pod.ObjectMeta.Annotations[api.CreatedByAnnotation]
|
||||||
|
if !found {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
// Now verify that the specified creator actually exists.
|
||||||
|
sr := &api.SerializedReference{}
|
||||||
|
if err := runtime.DecodeInto(o.factory.Decoder(true), []byte(creatorRef), sr); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// We assume the only reason for an error is because the controller is
|
||||||
|
// gone/missing, not for any other cause. TODO(mml): something more
|
||||||
|
// sophisticated than this
|
||||||
|
_, err := o.getController(sr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return sr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DrainOptions) unreplicatedFilter(pod api.Pod) (bool, *warning, *fatal) {
|
||||||
|
// any finished pod can be removed
|
||||||
|
if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed {
|
||||||
|
return true, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sr, err := o.getPodCreator(pod)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, &fatal{err.Error()}
|
||||||
|
}
|
||||||
|
if sr != nil {
|
||||||
|
return true, nil, nil
|
||||||
|
}
|
||||||
|
if !o.Force {
|
||||||
|
return false, nil, &fatal{kUnmanagedFatal}
|
||||||
|
}
|
||||||
|
return true, &warning{kUnmanagedWarning}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DrainOptions) daemonsetFilter(pod api.Pod) (bool, *warning, *fatal) {
|
||||||
|
// Note that we return false in all cases where the pod is DaemonSet managed,
|
||||||
|
// regardless of flags. We never delete them, the only question is whether
|
||||||
|
// their presence constitutes an error.
|
||||||
|
sr, err := o.getPodCreator(pod)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, &fatal{err.Error()}
|
||||||
|
}
|
||||||
|
if sr == nil || sr.Reference.Kind != "DaemonSet" {
|
||||||
|
return true, nil, nil
|
||||||
|
}
|
||||||
|
if _, err := o.client.Extensions().DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{}); err != nil {
|
||||||
|
return false, nil, &fatal{err.Error()}
|
||||||
|
}
|
||||||
|
if !o.IgnoreDaemonsets {
|
||||||
|
return false, nil, &fatal{kDaemonsetFatal}
|
||||||
|
}
|
||||||
|
return false, &warning{kDaemonsetWarning}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func mirrorPodFilter(pod api.Pod) (bool, *warning, *fatal) {
|
||||||
|
if _, found := pod.ObjectMeta.Annotations[types.ConfigMirrorAnnotationKey]; found {
|
||||||
|
return false, nil, nil
|
||||||
|
}
|
||||||
|
return true, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func hasLocalStorage(pod api.Pod) bool {
|
||||||
|
for _, volume := range pod.Spec.Volumes {
|
||||||
|
if volume.EmptyDir != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DrainOptions) localStorageFilter(pod api.Pod) (bool, *warning, *fatal) {
|
||||||
|
if !hasLocalStorage(pod) {
|
||||||
|
return true, nil, nil
|
||||||
|
}
|
||||||
|
if !o.DeleteLocalData {
|
||||||
|
return false, nil, &fatal{kLocalStorageFatal}
|
||||||
|
}
|
||||||
|
return true, &warning{kLocalStorageWarning}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Map of status message to a list of pod names having that status.
|
||||||
|
type podStatuses map[string][]string
|
||||||
|
|
||||||
|
func (ps podStatuses) Message() string {
|
||||||
|
msgs := []string{}
|
||||||
|
|
||||||
|
for key, pods := range ps {
|
||||||
|
msgs = append(msgs, fmt.Sprintf("%s: %s", key, strings.Join(pods, ", ")))
|
||||||
|
}
|
||||||
|
return strings.Join(msgs, "; ")
|
||||||
|
}
|
||||||
|
|
||||||
|
// getPodsForDeletion returns all the pods we're going to delete. If there are
|
||||||
|
// any pods preventing us from deleting, we return that list in an error.
|
||||||
|
func (o *DrainOptions) getPodsForDeletion() (pods []api.Pod, err error) {
|
||||||
|
podList, err := o.client.Core().Pods(api.NamespaceAll).List(api.ListOptions{
|
||||||
|
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeInfo.Name})})
|
||||||
|
if err != nil {
|
||||||
|
return pods, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ws := podStatuses{}
|
||||||
|
fs := podStatuses{}
|
||||||
|
|
||||||
|
for _, pod := range podList.Items {
|
||||||
|
podOk := true
|
||||||
|
// FIXME: The localStorageFilter is coming back with daemonsets
|
||||||
|
// FIXME: The filters are not excluding each other
|
||||||
|
for _, filt := range []podFilter{mirrorPodFilter, o.localStorageFilter, o.unreplicatedFilter, o.daemonsetFilter} {
|
||||||
|
filterOk, w, f := filt(pod)
|
||||||
|
|
||||||
|
podOk = podOk && filterOk
|
||||||
|
if w != nil {
|
||||||
|
ws[w.string] = append(ws[w.string], pod.Name)
|
||||||
|
}
|
||||||
|
if f != nil {
|
||||||
|
fs[f.string] = append(fs[f.string], pod.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if podOk {
|
||||||
|
pods = append(pods, pod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(fs) > 0 {
|
||||||
|
return []api.Pod{}, errors.New(fs.Message())
|
||||||
|
}
|
||||||
|
if len(ws) > 0 {
|
||||||
|
glog.Warningf("WARNING: %s\n", ws.Message())
|
||||||
|
}
|
||||||
|
glog.V(2).Infof("Pods to delete: %v", pods)
|
||||||
|
return pods, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DrainOptions) deletePod(pod api.Pod) error {
|
||||||
|
deleteOptions := &api.DeleteOptions{}
|
||||||
|
if o.GracePeriodSeconds >= 0 {
|
||||||
|
gracePeriodSeconds := int64(o.GracePeriodSeconds)
|
||||||
|
deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
|
||||||
|
}
|
||||||
|
return o.client.Core().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DrainOptions) evictPod(pod api.Pod, policyGroupVersion string) error {
|
||||||
|
deleteOptions := &api.DeleteOptions{}
|
||||||
|
if o.GracePeriodSeconds >= 0 {
|
||||||
|
gracePeriodSeconds := int64(o.GracePeriodSeconds)
|
||||||
|
deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
|
||||||
|
}
|
||||||
|
|
||||||
|
eviction := &policy.Eviction{
|
||||||
|
TypeMeta: metav1.TypeMeta{
|
||||||
|
APIVersion: policyGroupVersion,
|
||||||
|
Kind: EvictionKind,
|
||||||
|
},
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: pod.Name,
|
||||||
|
Namespace: pod.Namespace,
|
||||||
|
},
|
||||||
|
DeleteOptions: deleteOptions,
|
||||||
|
}
|
||||||
|
// Remember to change the URL manipulation func when Eviction's version changes
|
||||||
|
return o.client.Policy().Evictions(eviction.Namespace).Evict(eviction)
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteOrEvictPods deletes or evicts the pods on the api server
|
||||||
|
func (o *DrainOptions) deleteOrEvictPods(pods []api.Pod) error {
|
||||||
|
if len(pods) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
policyGroupVersion, err := SupportEviction(o.client)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error deleteOrEvictPods ~ SupportEviction: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
getPodFn := func(namespace, name string) (*api.Pod, error) {
|
||||||
|
return o.client.Core().Pods(namespace).Get(name, metav1.GetOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(policyGroupVersion) > 0 {
|
||||||
|
err = o.evictPods(pods, policyGroupVersion, getPodFn)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("Error attempting to evict pod, will delete pod - err: %v", err)
|
||||||
|
return o.deletePods(pods, getPodFn)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
return o.deletePods(pods, getPodFn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DrainOptions) evictPods(pods []api.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*api.Pod, error)) error {
|
||||||
|
doneCh := make(chan bool, len(pods))
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
|
||||||
|
for _, pod := range pods {
|
||||||
|
go func(pod api.Pod, doneCh chan bool, errCh chan error) {
|
||||||
|
var err error
|
||||||
|
for {
|
||||||
|
err = o.evictPod(pod, policyGroupVersion)
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
} else if apierrors.IsTooManyRequests(err) {
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
} else {
|
||||||
|
errCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
podArray := []api.Pod{pod}
|
||||||
|
_, err = o.waitForDelete(podArray, kubectl.Interval, time.Duration(math.MaxInt64), true, getPodFn)
|
||||||
|
if err == nil {
|
||||||
|
doneCh <- true
|
||||||
|
} else {
|
||||||
|
errCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
|
||||||
|
}
|
||||||
|
}(pod, doneCh, errCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
doneCount := 0
|
||||||
|
// 0 timeout means infinite, we use MaxInt64 to represent it.
|
||||||
|
var globalTimeout time.Duration
|
||||||
|
if o.Timeout == 0 {
|
||||||
|
globalTimeout = time.Duration(math.MaxInt64)
|
||||||
|
} else {
|
||||||
|
globalTimeout = o.Timeout
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
return err
|
||||||
|
case <-doneCh:
|
||||||
|
doneCount++
|
||||||
|
if doneCount == len(pods) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
case <-time.After(globalTimeout):
|
||||||
|
return fmt.Errorf("drain did not complete within %v", globalTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DrainOptions) deletePods(pods []api.Pod, getPodFn func(namespace, name string) (*api.Pod, error)) error {
|
||||||
|
// 0 timeout means infinite, we use MaxInt64 to represent it.
|
||||||
|
var globalTimeout time.Duration
|
||||||
|
if o.Timeout == 0 {
|
||||||
|
globalTimeout = time.Duration(math.MaxInt64)
|
||||||
|
} else {
|
||||||
|
globalTimeout = o.Timeout
|
||||||
|
}
|
||||||
|
for _, pod := range pods {
|
||||||
|
err := o.deletePod(pod)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_, err := o.waitForDelete(pods, kubectl.Interval, globalTimeout, false, getPodFn)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*api.Pod, error)) ([]api.Pod, error) {
|
||||||
|
var verbStr string
|
||||||
|
if usingEviction {
|
||||||
|
verbStr = "evicted"
|
||||||
|
} else {
|
||||||
|
verbStr = "deleted"
|
||||||
|
}
|
||||||
|
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
|
||||||
|
pendingPods := []api.Pod{}
|
||||||
|
for i, pod := range pods {
|
||||||
|
p, err := getPodFn(pod.Namespace, pod.Name)
|
||||||
|
if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
|
||||||
|
glog.V(2).Infof("Deleted pod %s, %s", pod.Name, verbStr)
|
||||||
|
continue
|
||||||
|
} else if err != nil {
|
||||||
|
return false, err
|
||||||
|
} else {
|
||||||
|
pendingPods = append(pendingPods, pods[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pods = pendingPods
|
||||||
|
if len(pendingPods) > 0 {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
return pods, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// SupportEviction uses the Discovery API to find out if the server supports the eviction subresource.
|
||||||
|
// If supported, it returns its groupVersion; otherwise it returns "".
|
||||||
|
func SupportEviction(clientset *internalclientset.Clientset) (string, error) {
|
||||||
|
discoveryClient := clientset.Discovery()
|
||||||
|
groupList, err := discoveryClient.ServerGroups()
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
foundPolicyGroup := false
|
||||||
|
var policyGroupVersion string
|
||||||
|
for _, group := range groupList.Groups {
|
||||||
|
if group.Name == "policy" {
|
||||||
|
foundPolicyGroup = true
|
||||||
|
policyGroupVersion = group.PreferredVersion.GroupVersion
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !foundPolicyGroup {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1")
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
for _, resource := range resourceList.APIResources {
|
||||||
|
if resource.Name == EvictionSubresource && resource.Kind == EvictionKind {
|
||||||
|
return policyGroupVersion, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for
|
||||||
|
// "Unschedulable" is passed as the first arg.
|
||||||
|
func (o *DrainOptions) RunCordonOrUncordon(desired bool) error {
|
||||||
|
cmdNamespace, _, err := o.factory.DefaultNamespace()
|
||||||
|
if err != nil {
|
||||||
|
glog.V(2).Infof("Error node %s - %v", o.nodeInfo.Name, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if o.nodeInfo == nil {
|
||||||
|
return fmt.Errorf("nodeInfo nil")
|
||||||
|
}
|
||||||
|
if o.nodeInfo.Mapping == nil {
|
||||||
|
return fmt.Errorf("Mapping nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if o.nodeInfo.Mapping.GroupVersionKind.Kind == "Node" {
|
||||||
|
unsched := reflect.ValueOf(o.nodeInfo.Object).Elem().FieldByName("Spec").FieldByName("Unschedulable")
|
||||||
|
if unsched.Bool() == desired {
|
||||||
|
glog.V(2).Infof("Node is already: %s", already(desired))
|
||||||
|
} else {
|
||||||
|
helper := resource.NewHelper(o.restClient, o.nodeInfo.Mapping)
|
||||||
|
unsched.SetBool(desired)
|
||||||
|
var err error
|
||||||
|
for i := 0; i < kMaxNodeUpdateRetry; i++ {
|
||||||
|
// We don't care about what previous versions may exist, we always want
|
||||||
|
// to overwrite, and Replace always sets current ResourceVersion if version is "".
|
||||||
|
helper.Versioner.SetResourceVersion(o.nodeInfo.Object, "")
|
||||||
|
_, err = helper.Replace(cmdNamespace, o.nodeInfo.Name, true, o.nodeInfo.Object)
|
||||||
|
if err != nil {
|
||||||
|
if !apierrors.IsConflict(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// It's a race, no need to sleep
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
glog.V(2).Infof("Node %s is : %s", o.nodeInfo.Name, changed(desired))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
glog.V(2).Infof("Node %s is : skipped", o.nodeInfo.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// already() and changed() return suitable strings for {un,}cordoning
|
||||||
|
|
||||||
|
func already(desired bool) string {
|
||||||
|
if desired {
|
||||||
|
return "already cordoned"
|
||||||
|
}
|
||||||
|
return "already uncordoned"
|
||||||
|
}
|
||||||
|
|
||||||
|
func changed(desired bool) string {
|
||||||
|
if desired {
|
||||||
|
return "cordoned"
|
||||||
|
}
|
||||||
|
return "uncordoned"
|
||||||
|
}
|
|
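For reference, a minimal Go sketch of how the drain helpers added above are intended to be driven from other kops code (an assumption for illustration; clusterName and nodeName are placeholders, and errors are simply returned to the caller):

  // A nil DrainCommand falls back to the defaults set in NewDrainOptions.
  do, err := kutil.NewDrainOptions(nil, clusterName)
  if err != nil {
      return err
  }
  // DrainTheNode cordons the node, then evicts (or deletes) its pods.
  if err := do.DrainTheNode(nodeName); err != nil {
      return err
  }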
@@ -0,0 +1,563 @@
|
||||||
|
/*
|
||||||
|
Copyright 2015 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
////
|
||||||
|
// Based off of drain in kubectl
|
||||||
|
// https://github.com/kubernetes/kubernetes/blob/master/pkg/kubectl/cmd/drain_test.go
|
||||||
|
///
|
||||||
|
|
||||||
|
////
|
||||||
|
// TODO: implement negative test cases that are commented out
|
||||||
|
////
|
||||||
|
|
||||||
|
package kutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jonboulle/clockwork"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/api/testapi"
|
||||||
|
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||||
|
"k8s.io/kubernetes/pkg/apis/batch"
|
||||||
|
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||||
|
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/kubernetes/pkg/apis/policy"
|
||||||
|
"k8s.io/kubernetes/pkg/client/restclient"
|
||||||
|
"k8s.io/kubernetes/pkg/client/restclient/fake"
|
||||||
|
cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
|
||||||
|
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||||
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
EvictionMethod = "Eviction"
|
||||||
|
DeleteMethod = "Delete"
|
||||||
|
)
|
||||||
|
|
||||||
|
var node *api.Node
|
||||||
|
var cordoned_node *api.Node
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
// Create a node.
|
||||||
|
node = &api.Node{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "node",
|
||||||
|
CreationTimestamp: metav1.Time{Time: time.Now()},
|
||||||
|
},
|
||||||
|
Spec: api.NodeSpec{
|
||||||
|
ExternalID: "node",
|
||||||
|
},
|
||||||
|
Status: api.NodeStatus{},
|
||||||
|
}
|
||||||
|
clone, _ := api.Scheme.DeepCopy(node)
|
||||||
|
|
||||||
|
// A copy of the same node, but cordoned.
|
||||||
|
cordoned_node = clone.(*api.Node)
|
||||||
|
cordoned_node.Spec.Unschedulable = true
|
||||||
|
os.Exit(m.Run())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDrain(t *testing.T) {
|
||||||
|
|
||||||
|
labels := make(map[string]string)
|
||||||
|
labels["my_key"] = "my_value"
|
||||||
|
|
||||||
|
rc := api.ReplicationController{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "rc",
|
||||||
|
Namespace: "default",
|
||||||
|
CreationTimestamp: metav1.Time{Time: time.Now()},
|
||||||
|
Labels: labels,
|
||||||
|
SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"),
|
||||||
|
},
|
||||||
|
Spec: api.ReplicationControllerSpec{
|
||||||
|
Selector: labels,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
rc_anno := make(map[string]string)
|
||||||
|
rc_anno[api.CreatedByAnnotation] = refJson(t, &rc)
|
||||||
|
|
||||||
|
rc_pod := api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "bar",
|
||||||
|
Namespace: "default",
|
||||||
|
CreationTimestamp: metav1.Time{Time: time.Now()},
|
||||||
|
Labels: labels,
|
||||||
|
Annotations: rc_anno,
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
NodeName: "node",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ds := extensions.DaemonSet{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "ds",
|
||||||
|
Namespace: "default",
|
||||||
|
CreationTimestamp: metav1.Time{Time: time.Now()},
|
||||||
|
SelfLink: "/apis/extensions/v1beta1/namespaces/default/daemonsets/ds",
|
||||||
|
},
|
||||||
|
Spec: extensions.DaemonSetSpec{
|
||||||
|
Selector: &metav1.LabelSelector{MatchLabels: labels},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ds_anno := make(map[string]string)
|
||||||
|
ds_anno[api.CreatedByAnnotation] = refJson(t, &ds)
|
||||||
|
|
||||||
|
ds_pod := api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "bar",
|
||||||
|
Namespace: "default",
|
||||||
|
CreationTimestamp: metav1.Time{Time: time.Now()},
|
||||||
|
Labels: labels,
|
||||||
|
Annotations: ds_anno,
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
NodeName: "node",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
job := batch.Job{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "job",
|
||||||
|
Namespace: "default",
|
||||||
|
CreationTimestamp: metav1.Time{Time: time.Now()},
|
||||||
|
SelfLink: "/apis/extensions/v1beta1/namespaces/default/jobs/job",
|
||||||
|
},
|
||||||
|
Spec: batch.JobSpec{
|
||||||
|
Selector: &metav1.LabelSelector{MatchLabels: labels},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
// keeping dead code, because I need to fix this for 1.5 & 1.4
|
||||||
|
job_pod := api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "bar",
|
||||||
|
Namespace: "default",
|
||||||
|
CreationTimestamp: metav1.Time{Time: time.Now()},
|
||||||
|
Labels: labels,
|
||||||
|
Annotations: map[string]string{api.CreatedByAnnotation: refJson(t, &job)},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
rs := extensions.ReplicaSet{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "rs",
|
||||||
|
Namespace: "default",
|
||||||
|
CreationTimestamp: metav1.Time{Time: time.Now()},
|
||||||
|
Labels: labels,
|
||||||
|
SelfLink: testapi.Default.SelfLink("replicasets", "rs"),
|
||||||
|
},
|
||||||
|
Spec: extensions.ReplicaSetSpec{
|
||||||
|
Selector: &metav1.LabelSelector{MatchLabels: labels},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
rs_anno := make(map[string]string)
|
||||||
|
rs_anno[api.CreatedByAnnotation] = refJson(t, &rs)
|
||||||
|
|
||||||
|
rs_pod := api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "bar",
|
||||||
|
Namespace: "default",
|
||||||
|
CreationTimestamp: metav1.Time{Time: time.Now()},
|
||||||
|
Labels: labels,
|
||||||
|
Annotations: rs_anno,
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
NodeName: "node",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
naked_pod := api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "bar",
|
||||||
|
Namespace: "default",
|
||||||
|
CreationTimestamp: metav1.Time{Time: time.Now()},
|
||||||
|
Labels: labels,
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
NodeName: "node",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
emptydir_pod := api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "bar",
|
||||||
|
Namespace: "default",
|
||||||
|
CreationTimestamp: metav1.Time{Time: time.Now()},
|
||||||
|
Labels: labels,
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
NodeName: "node",
|
||||||
|
Volumes: []api.Volume{
|
||||||
|
{
|
||||||
|
Name: "scratch",
|
||||||
|
VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{Medium: ""}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
description string
|
||||||
|
node *api.Node
|
||||||
|
expected *api.Node
|
||||||
|
pods []api.Pod
|
||||||
|
rcs []api.ReplicationController
|
||||||
|
replicaSets []extensions.ReplicaSet
|
||||||
|
args []string
|
||||||
|
expectFatal bool
|
||||||
|
expectDelete bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
description: "RC-managed pod",
|
||||||
|
node: node,
|
||||||
|
expected: cordoned_node,
|
||||||
|
pods: []api.Pod{rc_pod},
|
||||||
|
rcs: []api.ReplicationController{rc},
|
||||||
|
args: []string{"node"},
|
||||||
|
expectFatal: false,
|
||||||
|
expectDelete: true,
|
||||||
|
},
|
||||||
|
// TODO implement a way to init with correct params
|
||||||
|
/*
|
||||||
|
{
|
||||||
|
description: "DS-managed pod",
|
||||||
|
node: node,
|
||||||
|
expected: cordoned_node,
|
||||||
|
pods: []api.Pod{ds_pod},
|
||||||
|
rcs: []api.ReplicationController{rc},
|
||||||
|
args: []string{"node"},
|
||||||
|
expectFatal: true,
|
||||||
|
expectDelete: false,
|
||||||
|
},
|
||||||
|
*/
|
||||||
|
{
|
||||||
|
description: "DS-managed pod with --ignore-daemonsets",
|
||||||
|
node: node,
|
||||||
|
expected: cordoned_node,
|
||||||
|
pods: []api.Pod{ds_pod},
|
||||||
|
rcs: []api.ReplicationController{rc},
|
||||||
|
args: []string{"node", "--ignore-daemonsets"},
|
||||||
|
expectFatal: false,
|
||||||
|
expectDelete: false,
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
// FIXME I am getting -test.v -test.run ^TestDrain$ drain_test.go:483: Job-managed pod: pod never evicted
|
||||||
|
{
|
||||||
|
description: "Job-managed pod",
|
||||||
|
node: node,
|
||||||
|
expected: cordoned_node,
|
||||||
|
pods: []api.Pod{job_pod},
|
||||||
|
rcs: []api.ReplicationController{rc},
|
||||||
|
args: []string{"node"},
|
||||||
|
expectFatal: false,
|
||||||
|
expectDelete: true,
|
||||||
|
},*/
|
||||||
|
{
|
||||||
|
description: "RS-managed pod",
|
||||||
|
node: node,
|
||||||
|
expected: cordoned_node,
|
||||||
|
pods: []api.Pod{rs_pod},
|
||||||
|
replicaSets: []extensions.ReplicaSet{rs},
|
||||||
|
args: []string{"node"},
|
||||||
|
expectFatal: false,
|
||||||
|
expectDelete: true,
|
||||||
|
},
|
||||||
|
// TODO implement a way to init with correct params
|
||||||
|
/*
|
||||||
|
{
|
||||||
|
description: "naked pod",
|
||||||
|
node: node,
|
||||||
|
expected: cordoned_node,
|
||||||
|
pods: []api.Pod{naked_pod},
|
||||||
|
rcs: []api.ReplicationController{},
|
||||||
|
args: []string{"node"},
|
||||||
|
expectFatal: true,
|
||||||
|
expectDelete: false,
|
||||||
|
},*/
|
||||||
|
{
|
||||||
|
description: "naked pod with --force",
|
||||||
|
node: node,
|
||||||
|
expected: cordoned_node,
|
||||||
|
pods: []api.Pod{naked_pod},
|
||||||
|
rcs: []api.ReplicationController{},
|
||||||
|
args: []string{"node", "--force"},
|
||||||
|
expectFatal: false,
|
||||||
|
expectDelete: true,
|
||||||
|
},
|
||||||
|
// TODO implement a way to init with correct params
|
||||||
|
/*
|
||||||
|
{
|
||||||
|
description: "pod with EmptyDir",
|
||||||
|
node: node,
|
||||||
|
expected: cordoned_node,
|
||||||
|
pods: []api.Pod{emptydir_pod},
|
||||||
|
args: []string{"node", "--force"},
|
||||||
|
expectFatal: true,
|
||||||
|
expectDelete: false,
|
||||||
|
},*/
|
||||||
|
{
|
||||||
|
description: "pod with EmptyDir and --delete-local-data",
|
||||||
|
node: node,
|
||||||
|
expected: cordoned_node,
|
||||||
|
pods: []api.Pod{emptydir_pod},
|
||||||
|
args: []string{"node", "--force", "--delete-local-data=true"},
|
||||||
|
expectFatal: false,
|
||||||
|
expectDelete: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "empty node",
|
||||||
|
node: node,
|
||||||
|
expected: cordoned_node,
|
||||||
|
pods: []api.Pod{},
|
||||||
|
rcs: []api.ReplicationController{rc},
|
||||||
|
args: []string{"node"},
|
||||||
|
expectFatal: false,
|
||||||
|
expectDelete: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
testEviction := false
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
testEviction = !testEviction
|
||||||
|
var currMethod string
|
||||||
|
if testEviction {
|
||||||
|
currMethod = EvictionMethod
|
||||||
|
} else {
|
||||||
|
currMethod = DeleteMethod
|
||||||
|
}
|
||||||
|
for _, test := range tests {
|
||||||
|
new_node := &api.Node{}
|
||||||
|
deleted := false
|
||||||
|
evicted := false
|
||||||
|
f, tf, codec, ns := cmdtesting.NewAPIFactory()
|
||||||
|
tf.Client = &fake.RESTClient{
|
||||||
|
NegotiatedSerializer: ns,
|
||||||
|
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
|
||||||
|
m := &MyReq{req}
|
||||||
|
switch {
|
||||||
|
case req.Method == "GET" && req.URL.Path == "/api":
|
||||||
|
apiVersions := metav1.APIVersions{
|
||||||
|
Versions: []string{"v1"},
|
||||||
|
}
|
||||||
|
return genResponseWithJsonEncodedBody(apiVersions)
|
||||||
|
case req.Method == "GET" && req.URL.Path == "/apis":
|
||||||
|
groupList := metav1.APIGroupList{
|
||||||
|
Groups: []metav1.APIGroup{
|
||||||
|
{
|
||||||
|
Name: "policy",
|
||||||
|
PreferredVersion: metav1.GroupVersionForDiscovery{
|
||||||
|
GroupVersion: "policy/v1beta1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return genResponseWithJsonEncodedBody(groupList)
|
||||||
|
case req.Method == "GET" && req.URL.Path == "/api/v1":
|
||||||
|
resourceList := metav1.APIResourceList{
|
||||||
|
GroupVersion: "v1",
|
||||||
|
}
|
||||||
|
if testEviction {
|
||||||
|
resourceList.APIResources = []metav1.APIResource{
|
||||||
|
{
|
||||||
|
Name: EvictionSubresource,
|
||||||
|
Kind: EvictionKind,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return genResponseWithJsonEncodedBody(resourceList)
|
||||||
|
case m.isFor("GET", "/nodes/node"):
|
||||||
|
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, test.node)}, nil
|
||||||
|
case m.isFor("GET", "/namespaces/default/replicationcontrollers/rc"):
|
||||||
|
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &test.rcs[0])}, nil
|
||||||
|
case m.isFor("GET", "/namespaces/default/daemonsets/ds"):
|
||||||
|
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &ds)}, nil
|
||||||
|
case m.isFor("GET", "/namespaces/default/jobs/job"):
|
||||||
|
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &job)}, nil
|
||||||
|
case m.isFor("GET", "/namespaces/default/replicasets/rs"):
|
||||||
|
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &test.replicaSets[0])}, nil
|
||||||
|
case m.isFor("GET", "/namespaces/default/pods/bar"):
|
||||||
|
return &http.Response{StatusCode: 404, Header: defaultHeader(), Body: objBody(codec, &api.Pod{})}, nil
|
||||||
|
case m.isFor("GET", "/pods"):
|
||||||
|
values, err := url.ParseQuery(req.URL.RawQuery)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%s: unexpected error: %v", test.description, err)
|
||||||
|
}
|
||||||
|
get_params := make(url.Values)
|
||||||
|
get_params["fieldSelector"] = []string{"spec.nodeName=node"}
|
||||||
|
if !reflect.DeepEqual(get_params, values) {
|
||||||
|
t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, get_params, values)
|
||||||
|
}
|
||||||
|
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.PodList{Items: test.pods})}, nil
|
||||||
|
case m.isFor("GET", "/replicationcontrollers"):
|
||||||
|
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.ReplicationControllerList{Items: test.rcs})}, nil
|
||||||
|
case m.isFor("PUT", "/nodes/node"):
|
||||||
|
data, err := ioutil.ReadAll(req.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%s: unexpected error: %v", test.description, err)
|
||||||
|
}
|
||||||
|
defer req.Body.Close()
|
||||||
|
if err := runtime.DecodeInto(codec, data, new_node); err != nil {
|
||||||
|
t.Fatalf("%s: unexpected error: %v", test.description, err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(test.expected.Spec, new_node.Spec) {
|
||||||
|
t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, test.expected.Spec, new_node.Spec)
|
||||||
|
}
|
||||||
|
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, new_node)}, nil
|
||||||
|
case m.isFor("DELETE", "/namespaces/default/pods/bar"):
|
||||||
|
deleted = true
|
||||||
|
return &http.Response{StatusCode: 204, Header: defaultHeader(), Body: objBody(codec, &test.pods[0])}, nil
|
||||||
|
case m.isFor("POST", "/namespaces/default/pods/bar/eviction"):
|
||||||
|
evicted = true
|
||||||
|
return &http.Response{StatusCode: 201, Header: defaultHeader(), Body: policyObjBody(&policy.Eviction{})}, nil
|
||||||
|
default:
|
||||||
|
t.Fatalf("%s: unexpected request: %v %#v\n%#v", test.description, req.Method, req.URL, req)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
tf.ClientConfig = defaultClientConfig()
|
||||||
|
duration, _ := time.ParseDuration("0s")
|
||||||
|
cmd := &DrainOptions{
|
||||||
|
factory: f,
|
||||||
|
backOff: clockwork.NewRealClock(),
|
||||||
|
Force: true,
|
||||||
|
IgnoreDaemonsets: true,
|
||||||
|
DeleteLocalData: true,
|
||||||
|
GracePeriodSeconds: -1,
|
||||||
|
Timeout: duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
saw_fatal := false
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
// Recover from the panic below.
|
||||||
|
_ = recover()
|
||||||
|
// Restore cmdutil behavior
|
||||||
|
cmdutil.DefaultBehaviorOnFatal()
|
||||||
|
}()
|
||||||
|
cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) })
|
||||||
|
cmd.SetupDrain(node.Name)
|
||||||
|
cmd.RunDrain()
|
||||||
|
}()
|
||||||
|
|
||||||
|
if test.expectFatal {
|
||||||
|
if !saw_fatal {
|
||||||
|
t.Fatalf("%s: unexpected non-error when using %s", test.description, currMethod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.expectDelete {
|
||||||
|
// Test Delete
|
||||||
|
if !testEviction && !deleted {
|
||||||
|
t.Fatalf("%s: pod never deleted", test.description)
|
||||||
|
}
|
||||||
|
// Test Eviction
|
||||||
|
if testEviction && !evicted {
|
||||||
|
t.Fatalf("%s: pod never evicted", test.description)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !test.expectDelete {
|
||||||
|
if deleted {
|
||||||
|
t.Fatalf("%s: unexpected delete when using %s", test.description, currMethod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type MyReq struct {
|
||||||
|
Request *http.Request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MyReq) isFor(method string, path string) bool {
|
||||||
|
req := m.Request
|
||||||
|
|
||||||
|
return method == req.Method && (req.URL.Path == path ||
|
||||||
|
req.URL.Path == strings.Join([]string{"/api/v1", path}, "") ||
|
||||||
|
req.URL.Path == strings.Join([]string{"/apis/extensions/v1beta1", path}, "") ||
|
||||||
|
req.URL.Path == strings.Join([]string{"/apis/batch/v1", path}, ""))
|
||||||
|
}
|
||||||
|
|
||||||
|
func refJson(t *testing.T, o runtime.Object) string {
|
||||||
|
ref, err := api.GetReference(o)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, codec, _ := cmdtesting.NewAPIFactory()
|
||||||
|
json, err := runtime.Encode(codec, &api.SerializedReference{Reference: *ref})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(json)
|
||||||
|
}
|
||||||
|
|
||||||
|
func genResponseWithJsonEncodedBody(bodyStruct interface{}) (*http.Response, error) {
|
||||||
|
jsonBytes, err := json.Marshal(bodyStruct)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bytesBody(jsonBytes)}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultHeader() http.Header {
|
||||||
|
header := http.Header{}
|
||||||
|
header.Set("Content-Type", runtime.ContentTypeJSON)
|
||||||
|
return header
|
||||||
|
}
|
||||||
|
|
||||||
|
func objBody(codec runtime.Codec, obj runtime.Object) io.ReadCloser {
|
||||||
|
return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj))))
|
||||||
|
}
|
||||||
|
|
||||||
|
func policyObjBody(obj runtime.Object) io.ReadCloser {
|
||||||
|
return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(testapi.Policy.Codec(), obj))))
|
||||||
|
}
|
||||||
|
|
||||||
|
func bytesBody(bodyBytes []byte) io.ReadCloser {
|
||||||
|
return ioutil.NopCloser(bytes.NewReader(bodyBytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultClientConfig() *restclient.Config {
|
||||||
|
return &restclient.Config{
|
||||||
|
APIPath: "/api",
|
||||||
|
ContentConfig: restclient.ContentConfig{
|
||||||
|
NegotiatedSerializer: api.Codecs,
|
||||||
|
ContentType: runtime.ContentTypeJSON,
|
||||||
|
GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
|
@@ -25,6 +25,7 @@ import (
 	"github.com/aws/aws-sdk-go/service/autoscaling"
 	"github.com/aws/aws-sdk-go/service/ec2"
 	"github.com/golang/glog"
+	"github.com/spf13/cobra"
 	api "k8s.io/kops/pkg/apis/kops"
 	"k8s.io/kops/pkg/featureflag"
 	"k8s.io/kops/pkg/validation"
@@ -32,11 +33,10 @@ import (
 	"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
 	"k8s.io/kubernetes/pkg/api/v1"
 	k8s_clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
+	"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
 	"k8s.io/kubernetes/pkg/kubectl/cmd"
 	cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
-	"github.com/spf13/cobra"
 	"os"
-	"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
 )
 
 // RollingUpdateCluster is a struct containing cluster information for a rolling update.
@@ -50,7 +50,7 @@ type RollingUpdateCluster struct {
 	Force bool
 
 	K8sClient        *k8s_clientset.Clientset
 	ClientConfig     clientcmd.ClientConfig
 	FailOnDrainError bool
 	FailOnValidate   bool
 	CloudOnly        bool
@@ -464,8 +464,6 @@ func (n *CloudInstanceGroup) DeleteAWSInstance(u *CloudInstanceGroupInstance, in
 }
 
-
-
 
 // DrainNode drains a K8s node.
 func (n *CloudInstanceGroup) DrainNode(u *CloudInstanceGroupInstance, rollingUpdateData *RollingUpdateCluster) error {
 	if rollingUpdateData.ClientConfig == nil {
@@ -476,12 +474,12 @@ func (n *CloudInstanceGroup) DrainNode(u *CloudInstanceGroupInstance, rollingUpd
 	// TODO: Send out somewhere else, also DrainOptions has errout
 	out := os.Stdout
 
-	options := &cmd.DrainOptions{factory: f, out: out}
+	options := &cmd.DrainOptions{Factory: f, Out: out}
 
 	cmd := &cobra.Command{
 		Use: "cordon NODE",
 	}
-	args := []string{ u.Node.Name }
+	args := []string{u.Node.Name}
 	err := options.SetupDrain(cmd, args)
 	if err != nil {
 		return fmt.Errorf("error setting up drain: %v", err)
@@ -0,0 +1,319 @@
|
||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package kutil
|
||||||
|
|
||||||
|
// TODO move this business logic into a service that can be called via the api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
"github.com/aws/aws-sdk-go/service/ec2"
|
||||||
|
"github.com/golang/glog"
|
||||||
|
api "k8s.io/kops/pkg/apis/kops"
|
||||||
|
validate "k8s.io/kops/pkg/validation"
|
||||||
|
"k8s.io/kops/upup/pkg/fi"
|
||||||
|
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
|
||||||
|
k8s_clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RollingUpdateClusterDV restarts cluster nodes
|
||||||
|
type RollingUpdateClusterDV struct {
|
||||||
|
Cloud fi.Cloud
|
||||||
|
|
||||||
|
MasterInterval time.Duration
|
||||||
|
NodeInterval time.Duration
|
||||||
|
BastionInterval time.Duration
|
||||||
|
K8sClient *k8s_clientset.Clientset
|
||||||
|
|
||||||
|
ForceDrain bool
|
||||||
|
FailOnValidate bool
|
||||||
|
|
||||||
|
Force bool
|
||||||
|
|
||||||
|
CloudOnly bool
|
||||||
|
ClusterName string
|
||||||
|
}
|
||||||
|
|
||||||
|
// RollingUpdateDataDV is used to pass the information needed to perform a rolling update
|
||||||
|
type RollingUpdateDataDV struct {
|
||||||
|
Cloud fi.Cloud
|
||||||
|
Force bool
|
||||||
|
Interval time.Duration
|
||||||
|
InstanceGroupList *api.InstanceGroupList
|
||||||
|
IsBastion bool
|
||||||
|
|
||||||
|
K8sClient *k8s_clientset.Clientset
|
||||||
|
|
||||||
|
ForceDrain bool
|
||||||
|
FailOnValidate bool
|
||||||
|
|
||||||
|
CloudOnly bool
|
||||||
|
ClusterName string
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO move retries to RollingUpdateCluster
|
||||||
|
const retries = 8
|
||||||
|
|
||||||
|
// TODO: should we check to see if api updates exist in the cluster
|
||||||
|
// TODO: for instance should we check if Petsets exist when upgrading 1.4.x -> 1.5.x
|
||||||
|
|
||||||
|
// RollingUpdateDrainValidate performs a rolling update on a K8s cluster
|
||||||
|
func (c *RollingUpdateClusterDV) RollingUpdateDrainValidate(groups map[string]*CloudInstanceGroup, instanceGroups *api.InstanceGroupList) error {
|
||||||
|
if len(groups) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var resultsMutex sync.Mutex
|
||||||
|
results := make(map[string]error)
|
||||||
|
|
||||||
|
masterGroups := make(map[string]*CloudInstanceGroup)
|
||||||
|
nodeGroups := make(map[string]*CloudInstanceGroup)
|
||||||
|
bastionGroups := make(map[string]*CloudInstanceGroup)
|
||||||
|
for k, group := range groups {
|
||||||
|
switch group.InstanceGroup.Spec.Role {
|
||||||
|
case api.InstanceGroupRoleNode:
|
||||||
|
nodeGroups[k] = group
|
||||||
|
case api.InstanceGroupRoleMaster:
|
||||||
|
masterGroups[k] = group
|
||||||
|
case api.InstanceGroupRoleBastion:
|
||||||
|
bastionGroups[k] = group
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown group type for group %q", group.InstanceGroup.ObjectMeta.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upgrade bastions first; if these go down we can't see anything
|
||||||
|
{
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for k, bastionGroup := range bastionGroups {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(k string, group *CloudInstanceGroup) {
|
||||||
|
resultsMutex.Lock()
|
||||||
|
results[k] = fmt.Errorf("function panic")
|
||||||
|
resultsMutex.Unlock()
|
||||||
|
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
rollingUpdateData := c.CreateRollingUpdateData(instanceGroups, true)
|
||||||
|
|
||||||
|
err := group.RollingUpdateDV(rollingUpdateData)
|
||||||
|
|
||||||
|
resultsMutex.Lock()
|
||||||
|
results[k] = err
|
||||||
|
resultsMutex.Unlock()
|
||||||
|
}(k, bastionGroup)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upgrade master next
|
||||||
|
{
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// We run master nodes in series, even if they are in separate instance groups
|
||||||
|
// typically they will be in separate instance groups, so we can force the zones,
|
||||||
|
// and we don't want to roll all the masters at the same time. See issue #284
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for k := range masterGroups {
|
||||||
|
resultsMutex.Lock()
|
||||||
|
results[k] = fmt.Errorf("function panic")
|
||||||
|
resultsMutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
for k, group := range masterGroups {
|
||||||
|
rollingUpdateData := c.CreateRollingUpdateData(instanceGroups, false)
|
||||||
|
|
||||||
|
err := group.RollingUpdateDV(rollingUpdateData)
|
||||||
|
resultsMutex.Lock()
|
||||||
|
results[k] = err
|
||||||
|
resultsMutex.Unlock()
|
||||||
|
|
||||||
|
// TODO: Bail on error?
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upgrade nodes, with greater parallelism
|
||||||
|
// TODO increase each instancegroups nodes by one
|
||||||
|
{
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for k, nodeGroup := range nodeGroups {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(k string, group *CloudInstanceGroup) {
|
||||||
|
resultsMutex.Lock()
|
||||||
|
results[k] = fmt.Errorf("function panic")
|
||||||
|
resultsMutex.Unlock()
|
||||||
|
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
rollingUpdateData := c.CreateRollingUpdateData(instanceGroups, false)
|
||||||
|
|
||||||
|
err := group.RollingUpdateDV(rollingUpdateData)
|
||||||
|
|
||||||
|
resultsMutex.Lock()
|
||||||
|
results[k] = err
|
||||||
|
resultsMutex.Unlock()
|
||||||
|
}(k, nodeGroup)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, err := range results {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.Info("\nRolling update completed!\n")
|
||||||
|
return nil
|
||||||
|
}
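
The error-collection pattern above is the same in all three phases: fan out one goroutine per instance group, record each result in a shared map guarded by a mutex, and pre-seed every entry with a sentinel error so that a goroutine that panics before writing its result still counts as a failure. A minimal, standalone sketch of that pattern, with a hypothetical updateGroup function standing in for RollingUpdateDV:

package main

import (
	"fmt"
	"sync"
)

// updateGroup is a hypothetical stand-in for a per-group rolling update call.
func updateGroup(name string) error {
	fmt.Printf("updating %s\n", name)
	return nil
}

func main() {
	groups := []string{"bastions", "masters", "nodes"}

	var mu sync.Mutex
	results := make(map[string]error)

	var wg sync.WaitGroup
	for _, g := range groups {
		wg.Add(1)
		go func(g string) {
			// Sentinel result: if this goroutine panics before finishing,
			// the group still shows up as failed.
			mu.Lock()
			results[g] = fmt.Errorf("function panic")
			mu.Unlock()

			defer wg.Done()

			err := updateGroup(g)

			mu.Lock()
			results[g] = err
			mu.Unlock()
		}(g)
	}
	wg.Wait()

	// Report any group that failed (or panicked before reporting).
	for g, err := range results {
		if err != nil {
			fmt.Printf("group %s failed: %v\n", g, err)
		}
	}
}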

// CreateRollingUpdateData builds the per-group rolling update data from the cluster-level settings.
func (c *RollingUpdateClusterDV) CreateRollingUpdateData(instanceGroups *api.InstanceGroupList, isBastion bool) *RollingUpdateDataDV {
	return &RollingUpdateDataDV{
		Cloud:             c.Cloud,
		Force:             c.Force,
		Interval:          c.NodeInterval,
		InstanceGroupList: instanceGroups,
		IsBastion:         isBastion,
		K8sClient:         c.K8sClient,
		FailOnValidate:    c.FailOnValidate,
		ForceDrain:        c.ForceDrain,
		CloudOnly:         c.CloudOnly,
		ClusterName:       c.ClusterName,
	}
}

// RollingUpdateDV performs a rolling update on a list of ec2 instances.
func (n *CloudInstanceGroup) RollingUpdateDV(rollingUpdateData *RollingUpdateDataDV) error {

	// We should not get here with a nil value, but check to be safe
	if rollingUpdateData == nil {
		return fmt.Errorf("rollingUpdateData cannot be nil")
	}

	// A k8s client is not needed when doing a cloud-only update
	if rollingUpdateData.K8sClient == nil && !rollingUpdateData.CloudOnly {
		return fmt.Errorf("RollingUpdate is missing a k8s client")
	}

	if rollingUpdateData.InstanceGroupList == nil {
		return fmt.Errorf("RollingUpdate is missing the InstanceGroupList")
	}

	c := rollingUpdateData.Cloud.(awsup.AWSCloud)

	update := n.NeedUpdate
	if rollingUpdateData.Force {
		update = append(update, n.Ready...)
	}

	// TODO: is this logic correct
	if !rollingUpdateData.IsBastion && rollingUpdateData.FailOnValidate && !rollingUpdateData.CloudOnly {
		_, err := validate.ValidateCluster(rollingUpdateData.ClusterName, rollingUpdateData.InstanceGroupList, rollingUpdateData.K8sClient)
		if err != nil {
			return fmt.Errorf("cluster %s does not pass validation", rollingUpdateData.ClusterName)
		}
	}

	for _, u := range update {

		if !rollingUpdateData.IsBastion {
			if rollingUpdateData.CloudOnly {
				glog.Warningf("not draining nodes - cloud only is set")
			} else {
				drain, err := NewDrainOptions(nil, u.Node.ClusterName)

				if err != nil {
					glog.Warningf("error creating drain: %v", err)
					if !rollingUpdateData.ForceDrain {
						return err
					}
				} else {
					err = drain.DrainTheNode(u.Node.Name)
					if err != nil {
						glog.Warningf("error draining node: %v", err)
						// Stop here unless drain failures are being forced past
						if !rollingUpdateData.ForceDrain {
							return err
						}
					}
				}
			}
		}

		// TODO: Temporarily increase size of ASG?
		// TODO: Remove from ASG first so status is immediately updated?
		// TODO: Batch termination, like a rolling-update
		// TODO: check if an asg is running the correct number of instances

		instanceID := aws.StringValue(u.ASGInstance.InstanceId)
		glog.Infof("Stopping instance %q in AWS ASG %q", instanceID, n.ASGName)

		request := &ec2.TerminateInstancesInput{
			InstanceIds: []*string{u.ASGInstance.InstanceId},
		}
		_, err := c.EC2().TerminateInstances(request)
		if err != nil {
			return fmt.Errorf("error deleting instance %q: %v", instanceID, err)
		}

		if !rollingUpdateData.IsBastion {
			// Wait for new EC2 instances to be created
			time.Sleep(rollingUpdateData.Interval)

			// Wait until the cluster is happy
			// TODO: do we need to respect cloud only??
			for i := 0; i <= retries; i++ {

				if rollingUpdateData.CloudOnly {
					glog.Warningf("sleeping only - not validating nodes as cloudonly flag is set")
					time.Sleep(rollingUpdateData.Interval)
				} else {
					_, err = validate.ValidateCluster(rollingUpdateData.ClusterName, rollingUpdateData.InstanceGroupList, rollingUpdateData.K8sClient)
					if err != nil {
						glog.Infof("Unable to validate k8s cluster: %s.", err)
						time.Sleep(rollingUpdateData.Interval / 2)
					} else {
						glog.Info("Cluster validated; proceeding with next step in rolling update")
						break
					}
				}
			}

			if rollingUpdateData.CloudOnly {
				glog.Warningf("not validating nodes as cloudonly flag is set")
			} else if err != nil && rollingUpdateData.FailOnValidate {
				return fmt.Errorf("validation timed out while performing rolling update: %v", err)
			}
		}

	}

	return nil
}
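
For context, a minimal caller-side sketch of how this type might be wired up. This is an illustration only, not part of the commit: the wrapper function and argument names are hypothetical, the kutil import path is assumed to be k8s.io/kops/upup/pkg/kutil, and the groups / instanceGroups values are expected to be built by the surrounding kops machinery.

package example

import (
	"fmt"
	"time"

	api "k8s.io/kops/pkg/apis/kops"
	"k8s.io/kops/upup/pkg/fi"
	"k8s.io/kops/upup/pkg/kutil"
	k8s_clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
)

// runRollingUpdate is a hypothetical wrapper; callers would supply values built elsewhere.
func runRollingUpdate(cloud fi.Cloud, k8sClient *k8s_clientset.Clientset, clusterName string,
	groups map[string]*kutil.CloudInstanceGroup, instanceGroups *api.InstanceGroupList) error {

	updater := &kutil.RollingUpdateClusterDV{
		Cloud:           cloud,
		MasterInterval:  5 * time.Minute, // example intervals, not defaults from this commit
		NodeInterval:    2 * time.Minute,
		BastionInterval: 5 * time.Minute,
		K8sClient:       k8sClient,
		ForceDrain:      false,
		FailOnValidate:  true,
		CloudOnly:       false,
		ClusterName:     clusterName,
	}

	if err := updater.RollingUpdateDrainValidate(groups, instanceGroups); err != nil {
		return fmt.Errorf("rolling update failed: %v", err)
	}
	return nil
}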