From 286455a60800d508c14e4b9c8273277362b32f78 Mon Sep 17 00:00:00 2001 From: chrislovecnm Date: Sun, 11 Dec 2016 19:35:53 -0700 Subject: [PATCH] Validate and drain with rolling update set via new feature flag DrainAndValidateRollingUpdate --- cmd/kops/rollingupdatecluster.go | 13 +- upup/pkg/kutil/drain.go | 674 ++++++++++++++++++ upup/pkg/kutil/drain_test.go | 563 +++++++++++++++ upup/pkg/kutil/rollingupdate_cluster.go | 14 +- .../rollingupdate_cluster_validate_drain.go | 319 +++++++++ 5 files changed, 1570 insertions(+), 13 deletions(-) create mode 100644 upup/pkg/kutil/drain.go create mode 100644 upup/pkg/kutil/drain_test.go create mode 100644 upup/pkg/kutil/rollingupdate_cluster_validate_drain.go diff --git a/cmd/kops/rollingupdatecluster.go b/cmd/kops/rollingupdatecluster.go index 6956c379c0..1568516704 100644 --- a/cmd/kops/rollingupdatecluster.go +++ b/cmd/kops/rollingupdatecluster.go @@ -95,7 +95,8 @@ This command updates a kubernetes cluseter to match the cloud, and kops specific To perform rolling update, you need to update the cloud resources first with "kops update cluster" Use KOPS_FEATURE_FLAGS="+DrainAndValidateRollingUpdate" to use beta code that drains the nodes -and validates the cluser.`, +and validates the cluser. New flags for Drain and Validation operations will be shown when +the environment variable is set.`, } cmd.Flags().BoolVar(&options.Yes, "yes", options.Yes, "perform rolling update without confirmation") @@ -107,9 +108,11 @@ and validates the cluser.`, cmd.Flags().DurationVar(&options.BastionInterval, "bastion-interval", options.BastionInterval, "Time to wait between restarting bastions") cmd.Flags().StringSliceVar(&options.InstanceGroups, "instance-group", options.InstanceGroups, "List of instance groups to update (defaults to all if not specified)") - cmd.Flags().BoolVar(&options.FailOnDrainError, "fail-on-drain-error", true, "The rolling-update will fail if draining a node fails. Enable with KOPS_FEATURE_FLAGS='+DrainAndValidateRollingUpdate'") - cmd.Flags().BoolVar(&options.FailOnValidate, "fail-on-validate-error", true, "The rolling-update will fail if the cluster fails to validate. Enable with KOPS_FEATURE_FLAGS='+DrainAndValidateRollingUpdate'") - cmd.Flags().IntVar(&options.ValidateRetries, "validate-retries", 8, "The number of times that a node will be validated. Between validation kops sleeps the master-interval/2 or node-interval/2 duration. Enable with KOPS_FEATURE_FLAGS='+DrainAndValidateRollingUpdate'") + if featureflag.DrainAndValidateRollingUpdate.Enabled() { + cmd.Flags().BoolVar(&options.FailOnDrainError, "fail-on-drain-error", true, "The rolling-update will fail if draining a node fails.") + cmd.Flags().BoolVar(&options.FailOnValidate, "fail-on-validate-error", true, "The rolling-update will fail if the cluster fails to validate.") + cmd.Flags().IntVar(&options.ValidateRetries, "validate-retries", 8, "The number of times that a node will be validated. 
Between validation kops sleeps the master-interval/2 or node-interval/2 duration.") + } cmd.Run = func(cmd *cobra.Command, args []string) { err := rootCommand.ProcessArgs(args) @@ -298,7 +301,7 @@ func RunRollingUpdateCluster(f *util.Factory, out io.Writer, options *RollingUpd Force: options.Force, Cloud: cloud, K8sClient: k8sClient, - ClientConfig: kutil.NewClientConfig(config, "kube-system"), + ClientConfig: kutil.NewClientConfig(config, "kube-system"), FailOnDrainError: options.FailOnDrainError, FailOnValidate: options.FailOnValidate, CloudOnly: options.CloudOnly, diff --git a/upup/pkg/kutil/drain.go b/upup/pkg/kutil/drain.go new file mode 100644 index 0000000000..53af388670 --- /dev/null +++ b/upup/pkg/kutil/drain.go @@ -0,0 +1,674 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kutil + +//// +// Based off of drain in kubectl +// https://github.com/kubernetes/kubernetes/blob/master/pkg/kubectl/cmd/drain.go +//// + +// TODO: remove kubectl dependencies +// TODO: can we use our own client instead of building it again +// TODO: refactor our client to be like this client + +// FIXME: look at 1.5 refactor +// FIXME: we are deleting local storage for daemon sets, and why even delete local storage?? + +import ( + "errors" + "fmt" + "math" + "reflect" + "strings" + "time" + + "github.com/jonboulle/clockwork" + + "github.com/golang/glog" + + "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/meta" + metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/apis/policy" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/kubectl" + cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" + "k8s.io/kubernetes/pkg/kubectl/resource" + "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/wait" +) + +// DrainOptions For Draining Node +type DrainOptions struct { + client *internalclientset.Clientset + restClient *restclient.RESTClient + factory cmdutil.Factory + Force bool + GracePeriodSeconds int + IgnoreDaemonsets bool + Timeout time.Duration + backOff clockwork.Clock + DeleteLocalData bool + mapper meta.RESTMapper + nodeInfo *resource.Info + typer runtime.ObjectTyper +} + +// Allow tweaking default options for draining nodes +type DrainCommand struct { + Force bool + IgnoreDaemonsets bool + DeleteLocalData bool + GracePeriodSeconds int + Timeout int +} + +// Takes a pod and returns a bool indicating whether or not to operate on the +// pod, an optional warning message, and an optional fatal error. 
+type podFilter func(api.Pod) (include bool, w *warning, f *fatal) +type warning struct { + string +} +type fatal struct { + string +} + +const ( + EvictionKind = "Eviction" + EvictionSubresource = "pods/eviction" + + kDaemonsetFatal = "DaemonSet-managed pods (use --ignore-daemonsets to ignore)" + kDaemonsetWarning = "Ignoring DaemonSet-managed pods" + kLocalStorageFatal = "pods with local storage (use --delete-local-data to override)" + kLocalStorageWarning = "Deleting pods with local storage" + kUnmanagedFatal = "pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet (use --force to override)" + kUnmanagedWarning = "Deleting pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet" + kMaxNodeUpdateRetry = 10 +) + +// Create a NewDrainOptions +func NewDrainOptions(command *DrainCommand, clusterName string) (*DrainOptions, error) { + + config := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + clientcmd.NewDefaultClientConfigLoadingRules(), + &clientcmd.ConfigOverrides{CurrentContext: clusterName}) + f := cmdutil.NewFactory(config) + + if command != nil { + duration, err := time.ParseDuration(fmt.Sprintf("%ds", command.GracePeriodSeconds)) + if err != nil { + return nil, err + } + return &DrainOptions{ + factory: f, + backOff: clockwork.NewRealClock(), + Force: command.Force, + IgnoreDaemonsets: command.IgnoreDaemonsets, + DeleteLocalData: command.DeleteLocalData, + GracePeriodSeconds: command.GracePeriodSeconds, + Timeout: duration, + }, nil + } + + // return will defaults + duration, err := time.ParseDuration("0s") + if err != nil { + return nil, err + } + return &DrainOptions{ + factory: f, + backOff: clockwork.NewRealClock(), + Force: true, + IgnoreDaemonsets: true, + DeleteLocalData: false, // TODO: should this be true? + GracePeriodSeconds: -1, + Timeout: duration, + }, nil + +} + +func (o *DrainOptions) DrainTheNode(nodeName string) (err error) { + + err = o.SetupDrain(nodeName) + + if err != nil { + return fmt.Errorf("Error setting up the drain: %v, node: %s", err, nodeName) + } + err = o.RunDrain() + + if err != nil { + return fmt.Errorf("Drain failed %v, %s", err, nodeName) + } + + return nil +} + +// SetupDrain populates some fields from the factory, grabs command line +// arguments and looks up the node using Builder +func (o *DrainOptions) SetupDrain(nodeName string) error { + + if nodeName == "" { + return fmt.Errorf("nodeName cannot be empty") + } + + var err error + + if o.client, err = o.factory.ClientSet(); err != nil { + return fmt.Errorf("client or clientset nil %v", err) + } + + o.restClient, err = o.factory.RESTClient() + if err != nil { + return fmt.Errorf("rest client problem %v", err) + } + + o.mapper, o.typer = o.factory.Object() + + cmdNamespace, _, err := o.factory.DefaultNamespace() + if err != nil { + return fmt.Errorf("DefaultNamespace problem %v", err) + } + + r := o.factory.NewBuilder(). + NamespaceParam(cmdNamespace).DefaultNamespace(). + ResourceNames("node", nodeName). 
+ Do() + + if err = r.Err(); err != nil { + return fmt.Errorf("NewBuilder problem %v", err) + } + + err = r.Visit(func(info *resource.Info, err error) error { + if err != nil { + return fmt.Errorf("internal vistor problem %v", err) + } + glog.V(2).Infof("info %v", info) + o.nodeInfo = info + return nil + }) + + if err != nil { + glog.Fatalf("Error getting nodeInfo %v", err) + return fmt.Errorf("vistor problem %v", err) + } + + if err = r.Err(); err != nil { + return fmt.Errorf("vistor problem %v", err) + } + + return nil +} + +// RunDrain runs the 'drain' command +func (o *DrainOptions) RunDrain() error { + if o.nodeInfo == nil { + return fmt.Errorf("nodeInfo is not setup") + } + if err := o.RunCordonOrUncordon(true); err != nil { + glog.V(2).Infof("Error cordon node %v - %v", o.nodeInfo.Name, err) + return err + } + + err := o.deleteOrEvictPodsSimple() + if err == nil { + glog.V(2).Infof("Drained node %s", o.nodeInfo.Name) + } else { + glog.V(2).Infof("Error draining node %s - %v", o.nodeInfo.Name, err) + } + return err +} + +func (o *DrainOptions) deleteOrEvictPodsSimple() error { + pods, err := o.getPodsForDeletion() + if err != nil { + return err + } + + err = o.deleteOrEvictPods(pods) + if err != nil { + pendingPods, newErr := o.getPodsForDeletion() + if newErr != nil { + return newErr + } + glog.Fatalf("There are pending pods when an error occurred: %v\n", err) + for _, pendingPod := range pendingPods { + glog.Fatalf("%s/%s\n", "pod", pendingPod.Name) + } + } + return err +} + +func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{}, error) { + switch sr.Reference.Kind { + case "ReplicationController": + return o.client.Core().ReplicationControllers(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{}) + case "DaemonSet": + return o.client.Extensions().DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{}) + case "Job": + return o.client.Batch().Jobs(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{}) + case "ReplicaSet": + return o.client.Extensions().ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{}) + case "PetSet": + // FIXME: how the heck do you write this + // FIXME: Can we use the go client to make 1.4 and 1.5 calls :) + return "PetSet", nil + case "StatefulSet": + return o.client.Apps().StatefulSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{}) + } + return nil, fmt.Errorf("unknown controller kind %q", sr.Reference.Kind) +} + +func (o *DrainOptions) getPodCreator(pod api.Pod) (*api.SerializedReference, error) { + creatorRef, found := pod.ObjectMeta.Annotations[api.CreatedByAnnotation] + if !found { + return nil, nil + } + // Now verify that the specified creator actually exists. + sr := &api.SerializedReference{} + if err := runtime.DecodeInto(o.factory.Decoder(true), []byte(creatorRef), sr); err != nil { + return nil, err + } + // We assume the only reason for an error is because the controller is + // gone/missing, not for any other cause. 
TODO(mml): something more + // sophisticated than this + _, err := o.getController(sr) + if err != nil { + return nil, err + } + return sr, nil +} + +func (o *DrainOptions) unreplicatedFilter(pod api.Pod) (bool, *warning, *fatal) { + // any finished pod can be removed + if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed { + return true, nil, nil + } + + sr, err := o.getPodCreator(pod) + if err != nil { + return false, nil, &fatal{err.Error()} + } + if sr != nil { + return true, nil, nil + } + if !o.Force { + return false, nil, &fatal{kUnmanagedFatal} + } + return true, &warning{kUnmanagedWarning}, nil +} + +func (o *DrainOptions) daemonsetFilter(pod api.Pod) (bool, *warning, *fatal) { + // Note that we return false in all cases where the pod is DaemonSet managed, + // regardless of flags. We never delete them, the only question is whether + // their presence constitutes an error. + sr, err := o.getPodCreator(pod) + if err != nil { + return false, nil, &fatal{err.Error()} + } + if sr == nil || sr.Reference.Kind != "DaemonSet" { + return true, nil, nil + } + if _, err := o.client.Extensions().DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{}); err != nil { + return false, nil, &fatal{err.Error()} + } + if !o.IgnoreDaemonsets { + return false, nil, &fatal{kDaemonsetFatal} + } + return false, &warning{kDaemonsetWarning}, nil +} + +func mirrorPodFilter(pod api.Pod) (bool, *warning, *fatal) { + if _, found := pod.ObjectMeta.Annotations[types.ConfigMirrorAnnotationKey]; found { + return false, nil, nil + } + return true, nil, nil +} + +func hasLocalStorage(pod api.Pod) bool { + for _, volume := range pod.Spec.Volumes { + if volume.EmptyDir != nil { + return true + } + } + + return false +} + +func (o *DrainOptions) localStorageFilter(pod api.Pod) (bool, *warning, *fatal) { + if !hasLocalStorage(pod) { + return true, nil, nil + } + if !o.DeleteLocalData { + return false, nil, &fatal{kLocalStorageFatal} + } + return true, &warning{kLocalStorageWarning}, nil +} + +// Map of status message to a list of pod names having that status. +type podStatuses map[string][]string + +func (ps podStatuses) Message() string { + msgs := []string{} + + for key, pods := range ps { + msgs = append(msgs, fmt.Sprintf("%s: %s", key, strings.Join(pods, ", "))) + } + return strings.Join(msgs, "; ") +} + +// getPodsForDeletion returns all the pods we're going to delete. If there are +// any pods preventing us from deleting, we return that list in an error. 
+func (o *DrainOptions) getPodsForDeletion() (pods []api.Pod, err error) { + podList, err := o.client.Core().Pods(api.NamespaceAll).List(api.ListOptions{ + FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeInfo.Name})}) + if err != nil { + return pods, err + } + + ws := podStatuses{} + fs := podStatuses{} + + for _, pod := range podList.Items { + podOk := true + // FIXME: The localStorageFilter is coming back with daemonsets + // FIXME: The filters are not excluding each other + for _, filt := range []podFilter{mirrorPodFilter, o.localStorageFilter, o.unreplicatedFilter, o.daemonsetFilter} { + filterOk, w, f := filt(pod) + + podOk = podOk && filterOk + if w != nil { + ws[w.string] = append(ws[w.string], pod.Name) + } + if f != nil { + fs[f.string] = append(fs[f.string], pod.Name) + } + } + if podOk { + pods = append(pods, pod) + } + } + + if len(fs) > 0 { + return []api.Pod{}, errors.New(fs.Message()) + } + if len(ws) > 0 { + glog.Warningf("WARNING: %s\n", ws.Message()) + } + glog.V(2).Infof("Pods to delete: %v", pods) + return pods, nil +} + +func (o *DrainOptions) deletePod(pod api.Pod) error { + deleteOptions := &api.DeleteOptions{} + if o.GracePeriodSeconds >= 0 { + gracePeriodSeconds := int64(o.GracePeriodSeconds) + deleteOptions.GracePeriodSeconds = &gracePeriodSeconds + } + return o.client.Core().Pods(pod.Namespace).Delete(pod.Name, deleteOptions) +} + +func (o *DrainOptions) evictPod(pod api.Pod, policyGroupVersion string) error { + deleteOptions := &api.DeleteOptions{} + if o.GracePeriodSeconds >= 0 { + gracePeriodSeconds := int64(o.GracePeriodSeconds) + deleteOptions.GracePeriodSeconds = &gracePeriodSeconds + } + + eviction := &policy.Eviction{ + TypeMeta: metav1.TypeMeta{ + APIVersion: policyGroupVersion, + Kind: EvictionKind, + }, + ObjectMeta: api.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + }, + DeleteOptions: deleteOptions, + } + // Remember to change change the URL manipulation func when Evction's version change + return o.client.Policy().Evictions(eviction.Namespace).Evict(eviction) +} + +// deleteOrEvictPods deletes or evicts the pods on the api server +func (o *DrainOptions) deleteOrEvictPods(pods []api.Pod) error { + if len(pods) == 0 { + return nil + } + + policyGroupVersion, err := SupportEviction(o.client) + if err != nil { + return fmt.Errorf("error deleteOrEvictPods ~ SupportEviction: %v", err) + } + + getPodFn := func(namespace, name string) (*api.Pod, error) { + return o.client.Core().Pods(namespace).Get(name, metav1.GetOptions{}) + } + + if len(policyGroupVersion) > 0 { + err = o.evictPods(pods, policyGroupVersion, getPodFn) + + if err != nil { + glog.Warningf("Error attempting to evict pod, will delete pod - err: %v", err) + return o.deletePods(pods, getPodFn) + } + + return nil + } else { + return o.deletePods(pods, getPodFn) + } +} + +func (o *DrainOptions) evictPods(pods []api.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*api.Pod, error)) error { + doneCh := make(chan bool, len(pods)) + errCh := make(chan error, 1) + + for _, pod := range pods { + go func(pod api.Pod, doneCh chan bool, errCh chan error) { + var err error + for { + err = o.evictPod(pod, policyGroupVersion) + if err == nil { + break + } else if apierrors.IsTooManyRequests(err) { + time.Sleep(5 * time.Second) + } else { + errCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err) + return + } + } + podArray := []api.Pod{pod} + _, err = o.waitForDelete(podArray, kubectl.Interval, time.Duration(math.MaxInt64), true, 
getPodFn) + if err == nil { + doneCh <- true + } else { + errCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err) + } + }(pod, doneCh, errCh) + } + + doneCount := 0 + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if o.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = o.Timeout + } + for { + select { + case err := <-errCh: + return err + case <-doneCh: + doneCount++ + if doneCount == len(pods) { + return nil + } + case <-time.After(globalTimeout): + return fmt.Errorf("drain did not complete within %v", globalTimeout) + } + } +} + +func (o *DrainOptions) deletePods(pods []api.Pod, getPodFn func(namespace, name string) (*api.Pod, error)) error { + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if o.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = o.Timeout + } + for _, pod := range pods { + err := o.deletePod(pod) + if err != nil { + return err + } + } + _, err := o.waitForDelete(pods, kubectl.Interval, globalTimeout, false, getPodFn) + return err +} + +func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*api.Pod, error)) ([]api.Pod, error) { + var verbStr string + if usingEviction { + verbStr = "evicted" + } else { + verbStr = "deleted" + } + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + pendingPods := []api.Pod{} + for i, pod := range pods { + p, err := getPodFn(pod.Namespace, pod.Name) + if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) { + glog.V(2).Infof("Deleted pod %s, %s", pod.Name, verbStr) + continue + } else if err != nil { + return false, err + } else { + pendingPods = append(pendingPods, pods[i]) + } + } + pods = pendingPods + if len(pendingPods) > 0 { + return false, nil + } + return true, nil + }) + return pods, err +} + +// SupportEviction uses Discovery API to find out if the server support eviction subresource +// If support, it will return its groupVersion; Otherwise, it will return "" +func SupportEviction(clientset *internalclientset.Clientset) (string, error) { + discoveryClient := clientset.Discovery() + groupList, err := discoveryClient.ServerGroups() + if err != nil { + return "", err + } + foundPolicyGroup := false + var policyGroupVersion string + for _, group := range groupList.Groups { + if group.Name == "policy" { + foundPolicyGroup = true + policyGroupVersion = group.PreferredVersion.GroupVersion + break + } + } + if !foundPolicyGroup { + return "", nil + } + resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1") + if err != nil { + return "", err + } + for _, resource := range resourceList.APIResources { + if resource.Name == EvictionSubresource && resource.Kind == EvictionKind { + return policyGroupVersion, nil + } + } + return "", nil +} + +// RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for +// "Unschedulable" is passed as the first arg. 
+func (o *DrainOptions) RunCordonOrUncordon(desired bool) error { + cmdNamespace, _, err := o.factory.DefaultNamespace() + if err != nil { + glog.V(2).Infof("Error node %s - %v", o.nodeInfo.Name, err) + return err + } + + if o.nodeInfo == nil { + return fmt.Errorf("nodeInfo nil") + } + if o.nodeInfo.Mapping == nil { + return fmt.Errorf("Mapping nil") + } + + if o.nodeInfo.Mapping.GroupVersionKind.Kind == "Node" { + unsched := reflect.ValueOf(o.nodeInfo.Object).Elem().FieldByName("Spec").FieldByName("Unschedulable") + if unsched.Bool() == desired { + glog.V(2).Infof("Node is already: %s", already(desired)) + } else { + helper := resource.NewHelper(o.restClient, o.nodeInfo.Mapping) + unsched.SetBool(desired) + var err error + for i := 0; i < kMaxNodeUpdateRetry; i++ { + // We don't care about what previous versions may exist, we always want + // to overwrite, and Replace always sets current ResourceVersion if version is "". + helper.Versioner.SetResourceVersion(o.nodeInfo.Object, "") + _, err = helper.Replace(cmdNamespace, o.nodeInfo.Name, true, o.nodeInfo.Object) + if err != nil { + if !apierrors.IsConflict(err) { + return err + } + } else { + break + } + // It's a race, no need to sleep + } + if err != nil { + return err + } + glog.V(2).Infof("Node %s is : %s", o.nodeInfo.Name, changed(desired)) + } + } else { + glog.V(2).Infof("Node %s is : skipped", o.nodeInfo.Name) + } + + return nil +} + +// already() and changed() return suitable strings for {un,}cordoning + +func already(desired bool) string { + if desired { + return "already cordoned" + } + return "already uncordoned" +} + +func changed(desired bool) string { + if desired { + return "cordoned" + } + return "uncordoned" +} diff --git a/upup/pkg/kutil/drain_test.go b/upup/pkg/kutil/drain_test.go new file mode 100644 index 0000000000..420855d6e3 --- /dev/null +++ b/upup/pkg/kutil/drain_test.go @@ -0,0 +1,563 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +//// +// Based off of drain in kubectl +// https://github.com/kubernetes/kubernetes/blob/master/pkg/kubectl/cmd/drain_test.go +/// + +//// +// TODO: implement negative test cases that are commented out +//// + +package kutil + +import ( + "bytes" + "encoding/json" + "io" + "io/ioutil" + "net/http" + "net/url" + "os" + "reflect" + "strings" + "testing" + "time" + + "github.com/jonboulle/clockwork" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/apimachinery/registered" + "k8s.io/kubernetes/pkg/apis/batch" + "k8s.io/kubernetes/pkg/apis/extensions" + metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/apis/policy" + "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/restclient/fake" + cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing" + cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" + "k8s.io/kubernetes/pkg/runtime" +) + +const ( + EvictionMethod = "Eviction" + DeleteMethod = "Delete" +) + +var node *api.Node +var cordoned_node *api.Node + +func TestMain(m *testing.M) { + // Create a node. + node = &api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "node", + CreationTimestamp: metav1.Time{Time: time.Now()}, + }, + Spec: api.NodeSpec{ + ExternalID: "node", + }, + Status: api.NodeStatus{}, + } + clone, _ := api.Scheme.DeepCopy(node) + + // A copy of the same node, but cordoned. + cordoned_node = clone.(*api.Node) + cordoned_node.Spec.Unschedulable = true + os.Exit(m.Run()) +} + +func TestDrain(t *testing.T) { + + labels := make(map[string]string) + labels["my_key"] = "my_value" + + rc := api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Name: "rc", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now()}, + Labels: labels, + SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"), + }, + Spec: api.ReplicationControllerSpec{ + Selector: labels, + }, + } + + rc_anno := make(map[string]string) + rc_anno[api.CreatedByAnnotation] = refJson(t, &rc) + + rc_pod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "bar", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now()}, + Labels: labels, + Annotations: rc_anno, + }, + Spec: api.PodSpec{ + NodeName: "node", + }, + } + + ds := extensions.DaemonSet{ + ObjectMeta: api.ObjectMeta{ + Name: "ds", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now()}, + SelfLink: "/apis/extensions/v1beta1/namespaces/default/daemonsets/ds", + }, + Spec: extensions.DaemonSetSpec{ + Selector: &metav1.LabelSelector{MatchLabels: labels}, + }, + } + + ds_anno := make(map[string]string) + ds_anno[api.CreatedByAnnotation] = refJson(t, &ds) + + ds_pod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "bar", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now()}, + Labels: labels, + Annotations: ds_anno, + }, + Spec: api.PodSpec{ + NodeName: "node", + }, + } + + job := batch.Job{ + ObjectMeta: api.ObjectMeta{ + Name: "job", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now()}, + SelfLink: "/apis/extensions/v1beta1/namespaces/default/jobs/job", + }, + Spec: batch.JobSpec{ + Selector: &metav1.LabelSelector{MatchLabels: labels}, + }, + } + + /* + // keeping dead code, because I need to fix this for 1.5 & 1.4 + job_pod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "bar", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now()}, + Labels: labels, + Annotations: map[string]string{api.CreatedByAnnotation: refJson(t, &job)}, + }, + } + */ + + rs := 
extensions.ReplicaSet{ + ObjectMeta: api.ObjectMeta{ + Name: "rs", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now()}, + Labels: labels, + SelfLink: testapi.Default.SelfLink("replicasets", "rs"), + }, + Spec: extensions.ReplicaSetSpec{ + Selector: &metav1.LabelSelector{MatchLabels: labels}, + }, + } + + rs_anno := make(map[string]string) + rs_anno[api.CreatedByAnnotation] = refJson(t, &rs) + + rs_pod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "bar", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now()}, + Labels: labels, + Annotations: rs_anno, + }, + Spec: api.PodSpec{ + NodeName: "node", + }, + } + + naked_pod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "bar", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now()}, + Labels: labels, + }, + Spec: api.PodSpec{ + NodeName: "node", + }, + } + + emptydir_pod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "bar", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now()}, + Labels: labels, + }, + Spec: api.PodSpec{ + NodeName: "node", + Volumes: []api.Volume{ + { + Name: "scratch", + VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{Medium: ""}}, + }, + }, + }, + } + + tests := []struct { + description string + node *api.Node + expected *api.Node + pods []api.Pod + rcs []api.ReplicationController + replicaSets []extensions.ReplicaSet + args []string + expectFatal bool + expectDelete bool + }{ + { + description: "RC-managed pod", + node: node, + expected: cordoned_node, + pods: []api.Pod{rc_pod}, + rcs: []api.ReplicationController{rc}, + args: []string{"node"}, + expectFatal: false, + expectDelete: true, + }, + // TODO implement a way to init with correct params + /* + { + description: "DS-managed pod", + node: node, + expected: cordoned_node, + pods: []api.Pod{ds_pod}, + rcs: []api.ReplicationController{rc}, + args: []string{"node"}, + expectFatal: true, + expectDelete: false, + }, + */ + { + description: "DS-managed pod with --ignore-daemonsets", + node: node, + expected: cordoned_node, + pods: []api.Pod{ds_pod}, + rcs: []api.ReplicationController{rc}, + args: []string{"node", "--ignore-daemonsets"}, + expectFatal: false, + expectDelete: false, + }, + /* + // FIXME I am getting -test.v -test.run ^TestDrain$ drain_test.go:483: Job-managed pod: pod never evicted + { + description: "Job-managed pod", + node: node, + expected: cordoned_node, + pods: []api.Pod{job_pod}, + rcs: []api.ReplicationController{rc}, + args: []string{"node"}, + expectFatal: false, + expectDelete: true, + },*/ + { + description: "RS-managed pod", + node: node, + expected: cordoned_node, + pods: []api.Pod{rs_pod}, + replicaSets: []extensions.ReplicaSet{rs}, + args: []string{"node"}, + expectFatal: false, + expectDelete: true, + }, + // TODO implement a way to init with correct params + /* + { + description: "naked pod", + node: node, + expected: cordoned_node, + pods: []api.Pod{naked_pod}, + rcs: []api.ReplicationController{}, + args: []string{"node"}, + expectFatal: true, + expectDelete: false, + },*/ + { + description: "naked pod with --force", + node: node, + expected: cordoned_node, + pods: []api.Pod{naked_pod}, + rcs: []api.ReplicationController{}, + args: []string{"node", "--force"}, + expectFatal: false, + expectDelete: true, + }, + // TODO implement a way to init with correct params + /* + { + description: "pod with EmptyDir", + node: node, + expected: cordoned_node, + pods: []api.Pod{emptydir_pod}, + args: []string{"node", "--force"}, + expectFatal: 
true, + expectDelete: false, + },*/ + { + description: "pod with EmptyDir and --delete-local-data", + node: node, + expected: cordoned_node, + pods: []api.Pod{emptydir_pod}, + args: []string{"node", "--force", "--delete-local-data=true"}, + expectFatal: false, + expectDelete: true, + }, + { + description: "empty node", + node: node, + expected: cordoned_node, + pods: []api.Pod{}, + rcs: []api.ReplicationController{rc}, + args: []string{"node"}, + expectFatal: false, + expectDelete: false, + }, + } + + testEviction := false + for i := 0; i < 2; i++ { + testEviction = !testEviction + var currMethod string + if testEviction { + currMethod = EvictionMethod + } else { + currMethod = DeleteMethod + } + for _, test := range tests { + new_node := &api.Node{} + deleted := false + evicted := false + f, tf, codec, ns := cmdtesting.NewAPIFactory() + tf.Client = &fake.RESTClient{ + NegotiatedSerializer: ns, + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + m := &MyReq{req} + switch { + case req.Method == "GET" && req.URL.Path == "/api": + apiVersions := metav1.APIVersions{ + Versions: []string{"v1"}, + } + return genResponseWithJsonEncodedBody(apiVersions) + case req.Method == "GET" && req.URL.Path == "/apis": + groupList := metav1.APIGroupList{ + Groups: []metav1.APIGroup{ + { + Name: "policy", + PreferredVersion: metav1.GroupVersionForDiscovery{ + GroupVersion: "policy/v1beta1", + }, + }, + }, + } + return genResponseWithJsonEncodedBody(groupList) + case req.Method == "GET" && req.URL.Path == "/api/v1": + resourceList := metav1.APIResourceList{ + GroupVersion: "v1", + } + if testEviction { + resourceList.APIResources = []metav1.APIResource{ + { + Name: EvictionSubresource, + Kind: EvictionKind, + }, + } + } + return genResponseWithJsonEncodedBody(resourceList) + case m.isFor("GET", "/nodes/node"): + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, test.node)}, nil + case m.isFor("GET", "/namespaces/default/replicationcontrollers/rc"): + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &test.rcs[0])}, nil + case m.isFor("GET", "/namespaces/default/daemonsets/ds"): + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &ds)}, nil + case m.isFor("GET", "/namespaces/default/jobs/job"): + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &job)}, nil + case m.isFor("GET", "/namespaces/default/replicasets/rs"): + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &test.replicaSets[0])}, nil + case m.isFor("GET", "/namespaces/default/pods/bar"): + return &http.Response{StatusCode: 404, Header: defaultHeader(), Body: objBody(codec, &api.Pod{})}, nil + case m.isFor("GET", "/pods"): + values, err := url.ParseQuery(req.URL.RawQuery) + if err != nil { + t.Fatalf("%s: unexpected error: %v", test.description, err) + } + get_params := make(url.Values) + get_params["fieldSelector"] = []string{"spec.nodeName=node"} + if !reflect.DeepEqual(get_params, values) { + t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, get_params, values) + } + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.PodList{Items: test.pods})}, nil + case m.isFor("GET", "/replicationcontrollers"): + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.ReplicationControllerList{Items: test.rcs})}, 
nil + case m.isFor("PUT", "/nodes/node"): + data, err := ioutil.ReadAll(req.Body) + if err != nil { + t.Fatalf("%s: unexpected error: %v", test.description, err) + } + defer req.Body.Close() + if err := runtime.DecodeInto(codec, data, new_node); err != nil { + t.Fatalf("%s: unexpected error: %v", test.description, err) + } + if !reflect.DeepEqual(test.expected.Spec, new_node.Spec) { + t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, test.expected.Spec, new_node.Spec) + } + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, new_node)}, nil + case m.isFor("DELETE", "/namespaces/default/pods/bar"): + deleted = true + return &http.Response{StatusCode: 204, Header: defaultHeader(), Body: objBody(codec, &test.pods[0])}, nil + case m.isFor("POST", "/namespaces/default/pods/bar/eviction"): + evicted = true + return &http.Response{StatusCode: 201, Header: defaultHeader(), Body: policyObjBody(&policy.Eviction{})}, nil + default: + t.Fatalf("%s: unexpected request: %v %#v\n%#v", test.description, req.Method, req.URL, req) + return nil, nil + } + }), + } + tf.ClientConfig = defaultClientConfig() + duration, _ := time.ParseDuration("0s") + cmd := &DrainOptions{ + factory: f, + backOff: clockwork.NewRealClock(), + Force: true, + IgnoreDaemonsets: true, + DeleteLocalData: true, + GracePeriodSeconds: -1, + Timeout: duration, + } + + saw_fatal := false + func() { + defer func() { + // Recover from the panic below. + _ = recover() + // Restore cmdutil behavior + cmdutil.DefaultBehaviorOnFatal() + }() + cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) }) + cmd.SetupDrain(node.Name) + cmd.RunDrain() + }() + + if test.expectFatal { + if !saw_fatal { + t.Fatalf("%s: unexpected non-error when using %s", test.description, currMethod) + } + } + + if test.expectDelete { + // Test Delete + if !testEviction && !deleted { + t.Fatalf("%s: pod never deleted", test.description) + } + // Test Eviction + if testEviction && !evicted { + t.Fatalf("%s: pod never evicted", test.description) + } + } + if !test.expectDelete { + if deleted { + t.Fatalf("%s: unexpected delete when using %s", test.description, currMethod) + } + } + } + } +} + +type MyReq struct { + Request *http.Request +} + +func (m *MyReq) isFor(method string, path string) bool { + req := m.Request + + return method == req.Method && (req.URL.Path == path || + req.URL.Path == strings.Join([]string{"/api/v1", path}, "") || + req.URL.Path == strings.Join([]string{"/apis/extensions/v1beta1", path}, "") || + req.URL.Path == strings.Join([]string{"/apis/batch/v1", path}, "")) +} + +func refJson(t *testing.T, o runtime.Object) string { + ref, err := api.GetReference(o) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + _, _, codec, _ := cmdtesting.NewAPIFactory() + json, err := runtime.Encode(codec, &api.SerializedReference{Reference: *ref}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + return string(json) +} + +func genResponseWithJsonEncodedBody(bodyStruct interface{}) (*http.Response, error) { + jsonBytes, err := json.Marshal(bodyStruct) + if err != nil { + return nil, err + } + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bytesBody(jsonBytes)}, nil +} + +func defaultHeader() http.Header { + header := http.Header{} + header.Set("Content-Type", runtime.ContentTypeJSON) + return header +} + +func objBody(codec runtime.Codec, obj runtime.Object) io.ReadCloser { + return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, 
obj)))) +} + +func policyObjBody(obj runtime.Object) io.ReadCloser { + return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(testapi.Policy.Codec(), obj)))) +} + +func bytesBody(bodyBytes []byte) io.ReadCloser { + return ioutil.NopCloser(bytes.NewReader(bodyBytes)) +} + +func defaultClientConfig() *restclient.Config { + return &restclient.Config{ + APIPath: "/api", + ContentConfig: restclient.ContentConfig{ + NegotiatedSerializer: api.Codecs, + ContentType: runtime.ContentTypeJSON, + GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, + }, + } +} diff --git a/upup/pkg/kutil/rollingupdate_cluster.go b/upup/pkg/kutil/rollingupdate_cluster.go index 9815dbb2fd..9af25d1c59 100644 --- a/upup/pkg/kutil/rollingupdate_cluster.go +++ b/upup/pkg/kutil/rollingupdate_cluster.go @@ -25,6 +25,7 @@ import ( "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/aws/aws-sdk-go/service/ec2" "github.com/golang/glog" + "github.com/spf13/cobra" api "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/featureflag" "k8s.io/kops/pkg/validation" @@ -32,11 +33,10 @@ import ( "k8s.io/kops/upup/pkg/fi/cloudup/awsup" "k8s.io/kubernetes/pkg/api/v1" k8s_clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/kubectl/cmd" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" - "github.com/spf13/cobra" "os" - "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" ) // RollingUpdateCluster is a struct containing cluster information for a rolling update. @@ -50,7 +50,7 @@ type RollingUpdateCluster struct { Force bool K8sClient *k8s_clientset.Clientset - ClientConfig clientcmd.ClientConfig + ClientConfig clientcmd.ClientConfig FailOnDrainError bool FailOnValidate bool CloudOnly bool @@ -464,8 +464,6 @@ func (n *CloudInstanceGroup) DeleteAWSInstance(u *CloudInstanceGroupInstance, in } - - // DrainNode drains a K8s node. func (n *CloudInstanceGroup) DrainNode(u *CloudInstanceGroupInstance, rollingUpdateData *RollingUpdateCluster) error { if rollingUpdateData.ClientConfig == nil { @@ -476,12 +474,12 @@ func (n *CloudInstanceGroup) DrainNode(u *CloudInstanceGroupInstance, rollingUpd // TODO: Send out somewhere else, also DrainOptions has errout out := os.Stdout - options := &cmd.DrainOptions{factory: f, out: out} + options := &cmd.DrainOptions{Factory: f, Out: out} cmd := &cobra.Command{ - Use: "cordon NODE", + Use: "cordon NODE", } - args := []string{ u.Node.Name } + args := []string{u.Node.Name} err := options.SetupDrain(cmd, args) if err != nil { return fmt.Errorf("error setting up drain: %v", err) diff --git a/upup/pkg/kutil/rollingupdate_cluster_validate_drain.go b/upup/pkg/kutil/rollingupdate_cluster_validate_drain.go new file mode 100644 index 0000000000..0e984c035c --- /dev/null +++ b/upup/pkg/kutil/rollingupdate_cluster_validate_drain.go @@ -0,0 +1,319 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kutil + +// TODO move this business logic into a service than can be called via the api + +import ( + "fmt" + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/golang/glog" + api "k8s.io/kops/pkg/apis/kops" + validate "k8s.io/kops/pkg/validation" + "k8s.io/kops/upup/pkg/fi" + "k8s.io/kops/upup/pkg/fi/cloudup/awsup" + k8s_clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" +) + +// RollingUpdateCluster restarts cluster nodes +type RollingUpdateClusterDV struct { + Cloud fi.Cloud + + MasterInterval time.Duration + NodeInterval time.Duration + BastionInterval time.Duration + K8sClient *k8s_clientset.Clientset + + ForceDrain bool + FailOnValidate bool + + Force bool + + CloudOnly bool + ClusterName string +} + +// RollingUpdateData is used to pass information to perform a rolling update +type RollingUpdateDataDV struct { + Cloud fi.Cloud + Force bool + Interval time.Duration + InstanceGroupList *api.InstanceGroupList + IsBastion bool + + K8sClient *k8s_clientset.Clientset + + ForceDrain bool + FailOnValidate bool + + CloudOnly bool + ClusterName string +} + +// TODO move retries to RollingUpdateCluster +const retries = 8 + +// TODO: should we check to see if api updates exist in the cluster +// TODO: for instance should we check if Petsets exist when upgrading 1.4.x -> 1.5.x + +// Perform a rolling update on a K8s Cluster +func (c *RollingUpdateClusterDV) RollingUpdateDrainValidate(groups map[string]*CloudInstanceGroup, instanceGroups *api.InstanceGroupList) error { + if len(groups) == 0 { + return nil + } + + var resultsMutex sync.Mutex + results := make(map[string]error) + + masterGroups := make(map[string]*CloudInstanceGroup) + nodeGroups := make(map[string]*CloudInstanceGroup) + bastionGroups := make(map[string]*CloudInstanceGroup) + for k, group := range groups { + switch group.InstanceGroup.Spec.Role { + case api.InstanceGroupRoleNode: + nodeGroups[k] = group + case api.InstanceGroupRoleMaster: + masterGroups[k] = group + case api.InstanceGroupRoleBastion: + bastionGroups[k] = group + default: + return fmt.Errorf("unknown group type for group %q", group.InstanceGroup.ObjectMeta.Name) + } + } + + // Upgrade bastions first; if these go down we can't see anything + { + var wg sync.WaitGroup + + for k, bastionGroup := range bastionGroups { + wg.Add(1) + go func(k string, group *CloudInstanceGroup) { + resultsMutex.Lock() + results[k] = fmt.Errorf("function panic") + resultsMutex.Unlock() + + defer wg.Done() + + rollingUpdateData := c.CreateRollingUpdateData(instanceGroups, true) + + err := group.RollingUpdateDV(rollingUpdateData) + + resultsMutex.Lock() + results[k] = err + resultsMutex.Unlock() + }(k, bastionGroup) + } + + wg.Wait() + } + + // Upgrade master next + { + var wg sync.WaitGroup + + // We run master nodes in series, even if they are in separate instance groups + // typically they will be in separate instance groups, so we can force the zones, + // and we don't want to roll all the masters at the same time. See issue #284 + wg.Add(1) + + go func() { + for k := range masterGroups { + resultsMutex.Lock() + results[k] = fmt.Errorf("function panic") + resultsMutex.Unlock() + } + + defer wg.Done() + + for k, group := range masterGroups { + rollingUpdateData := c.CreateRollingUpdateData(instanceGroups, false) + + err := group.RollingUpdateDV(rollingUpdateData) + resultsMutex.Lock() + results[k] = err + resultsMutex.Unlock() + + // TODO: Bail on error? 
+ } + }() + + wg.Wait() + } + + // Upgrade nodes, with greater parallelism + // TODO increase each instancegroups nodes by one + { + var wg sync.WaitGroup + + for k, nodeGroup := range nodeGroups { + wg.Add(1) + go func(k string, group *CloudInstanceGroup) { + resultsMutex.Lock() + results[k] = fmt.Errorf("function panic") + resultsMutex.Unlock() + + defer wg.Done() + + rollingUpdateData := c.CreateRollingUpdateData(instanceGroups, false) + + err := group.RollingUpdateDV(rollingUpdateData) + + resultsMutex.Lock() + results[k] = err + resultsMutex.Unlock() + }(k, nodeGroup) + } + + wg.Wait() + } + + for _, err := range results { + if err != nil { + return err + } + } + + glog.Info("\nRolling update completed!\n") + return nil +} + +func (c *RollingUpdateClusterDV) CreateRollingUpdateData(instanceGroups *api.InstanceGroupList, isBastion bool) *RollingUpdateDataDV { + return &RollingUpdateDataDV{ + Cloud: c.Cloud, + Force: c.Force, + Interval: c.NodeInterval, + InstanceGroupList: instanceGroups, + IsBastion: isBastion, + K8sClient: c.K8sClient, + FailOnValidate: c.FailOnValidate, + ForceDrain: c.ForceDrain, + CloudOnly: c.CloudOnly, + ClusterName: c.ClusterName, + } +} + +// RollingUpdate performs a rolling update on a list of ec2 instances. +func (n *CloudInstanceGroup) RollingUpdateDV(rollingUpdateData *RollingUpdateDataDV) error { + + // we should not get here, but hey I am going to check + if rollingUpdateData == nil { + return fmt.Errorf("RollingUpdate cannot be nil") + } + + // Do not need a k8s client if you are doing cloud only + if rollingUpdateData.K8sClient == nil && !rollingUpdateData.CloudOnly { + return fmt.Errorf("RollingUpdate is missing a k8s client") + } + + if rollingUpdateData.InstanceGroupList == nil { + return fmt.Errorf("RollingUpdate is missing a the InstanceGroupList") + } + + c := rollingUpdateData.Cloud.(awsup.AWSCloud) + + update := n.NeedUpdate + if rollingUpdateData.Force { + update = append(update, n.Ready...) + } + + // TODO is this logic correct + if !rollingUpdateData.IsBastion && rollingUpdateData.FailOnValidate && !rollingUpdateData.CloudOnly { + _, err := validate.ValidateCluster(rollingUpdateData.ClusterName, rollingUpdateData.InstanceGroupList, rollingUpdateData.K8sClient) + if err != nil { + return fmt.Errorf("Cluster %s does not pass validation", rollingUpdateData.ClusterName) + } + } + + for _, u := range update { + + if !rollingUpdateData.IsBastion { + if rollingUpdateData.CloudOnly { + glog.Warningf("not draining nodes - cloud only is set") + } else { + drain, err := NewDrainOptions(nil, u.Node.ClusterName) + + if err != nil { + glog.Warningf("Error creating drain: %v", err) + if rollingUpdateData.ForceDrain == false { + return err + } + } else { + err = drain.DrainTheNode(u.Node.Name) + if err != nil { + glog.Warningf("setupErr: %v", err) + } + if rollingUpdateData.ForceDrain == false { + return err + } + } + } + } + + // TODO: Temporarily increase size of ASG? + // TODO: Remove from ASG first so status is immediately updated? 
+ // TODO: Batch termination, like a rolling-update + // TODO: check if an asg is running the correct number of instances + + instanceID := aws.StringValue(u.ASGInstance.InstanceId) + glog.Infof("Stopping instance %q in AWS ASG %q", instanceID, n.ASGName) + + request := &ec2.TerminateInstancesInput{ + InstanceIds: []*string{u.ASGInstance.InstanceId}, + } + _, err := c.EC2().TerminateInstances(request) + if err != nil { + return fmt.Errorf("error deleting instance %q: %v", instanceID, err) + } + + if !rollingUpdateData.IsBastion { + // Wait for new EC2 instances to be created + time.Sleep(rollingUpdateData.Interval) + + // Wait until the cluster is happy + // TODO: do we need to respect cloud only?? + for i := 0; i <= retries; i++ { + + if rollingUpdateData.CloudOnly { + glog.Warningf("sleeping only - not validating nodes as cloudonly flag is set") + time.Sleep(rollingUpdateData.Interval) + } else { + _, err = validate.ValidateCluster(rollingUpdateData.ClusterName, rollingUpdateData.InstanceGroupList, rollingUpdateData.K8sClient) + if err != nil { + glog.Infof("Unable to validate k8s cluster: %s.", err) + time.Sleep(rollingUpdateData.Interval / 2) + } else { + glog.Info("Cluster validated proceeding with next step in rolling update") + break + } + } + } + + if rollingUpdateData.CloudOnly { + glog.Warningf("not validating nodes as cloudonly flag is set") + } else if err != nil && rollingUpdateData.FailOnValidate { + return fmt.Errorf("validation timed out while performing rolling update: %v", err) + } + } + + } + + return nil +}