Drain and validate nodes during rolling updates, enabled via the new feature flag DrainAndValidateRollingUpdate

This commit is contained in:
chrislovecnm 2016-12-11 19:35:53 -07:00
parent c2f2de93e3
commit b7b0bca1fc
6 changed files with 1596 additions and 11 deletions


@ -23,9 +23,11 @@ import (
"strconv"
"time"
"github.com/golang/glog"
"github.com/spf13/cobra"
"k8s.io/kops/cmd/kops/util"
api "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/featureflag"
"k8s.io/kops/upup/pkg/fi/cloudup"
"k8s.io/kops/upup/pkg/kutil"
"k8s.io/kops/util/pkg/tables"
@ -40,6 +42,8 @@ type RollingUpdateOptions struct {
Force bool
CloudOnly bool
ForceDrain bool
FailOnValidate bool
MasterInterval time.Duration
NodeInterval time.Duration
BastionInterval time.Duration
@ -72,6 +76,9 @@ func NewCmdRollingUpdateCluster(f *util.Factory, out io.Writer) *cobra.Command {
This command updates the running instances to match the cloud specifications.
Use KOPS_FEATURE_FLAGS="+DrainAndValidateRollingUpdate" to use beta code that drains the nodes
and validates the cluster.
To perform a rolling update, you need to update the cloud resources first with "kops update cluster"`,
}
@ -79,12 +86,14 @@ To perform rolling update, you need to update the cloud resources first with "ko
cmd.Flags().BoolVar(&options.Force, "force", options.Force, "Force rolling update, even if no changes")
cmd.Flags().BoolVar(&options.CloudOnly, "cloudonly", options.CloudOnly, "Perform rolling update without confirming progress with k8s")
cmd.Flags().DurationVar(&options.MasterInterval, "master-interval", options.MasterInterval, "Time to wait between restarting masters")
cmd.Flags().DurationVar(&options.NodeInterval, "node-interval", options.NodeInterval, "Time to wait between restarting nodes")
cmd.Flags().DurationVar(&options.BastionInterval, "bastion-interval", options.BastionInterval, "Time to wait between restarting bastions")
cmd.Flags().DurationVar(&options.MasterInterval, "master-interval", 5*time.Minute, "Time to wait between restarting masters")
cmd.Flags().DurationVar(&options.NodeInterval, "node-interval", 2*time.Minute, "Time to wait between restarting nodes")
cmd.Flags().DurationVar(&options.BastionInterval, "bastion-interval", 5*time.Minute, "Time to wait between restarting bastions")
cmd.Flags().StringSliceVar(&options.InstanceGroups, "instance-group", options.InstanceGroups, "List of instance groups to update (defaults to all if not specified)")
cmd.Flags().BoolVar(&options.ForceDrain, "force-drain", true, "If true (the default), the node is still upgraded when its drain fails; if false, a failed drain aborts the rolling update.")
cmd.Flags().BoolVar(&options.FailOnValidate, "validate", true, "Validate the cluster during the rolling update, and stop the rolling update if validation fails.")
cmd.Run = func(cmd *cobra.Command, args []string) {
err := rootCommand.ProcessArgs(args)
if err != nil {
@ -264,5 +273,27 @@ func RunRollingUpdateCluster(f *util.Factory, out io.Writer, options *RollingUpd
return nil
}
return d.RollingUpdate(groups, k8sClient)
if featureflag.DrainAndValidateRollingUpdate.Enabled() {
d := &kutil.RollingUpdateClusterDV{
MasterInterval: options.MasterInterval,
NodeInterval: options.NodeInterval,
Force: options.Force,
K8sClient: k8sClient,
ForceDrain: options.ForceDrain,
FailOnValidate: options.FailOnValidate,
CloudOnly: options.CloudOnly,
ClusterName: options.ClusterName,
Cloud: cloud,
}
glog.V(2).Infof("New rolling update with drain and validate enabled")
return d.RollingUpdateDrainValidate(groups, list)
} else {
d := &kutil.RollingUpdateCluster{
MasterInterval: options.MasterInterval,
NodeInterval: options.NodeInterval,
Force: options.Force,
Cloud: cloud,
}
return d.RollingUpdate(groups, k8sClient)
}
}


@ -38,6 +38,9 @@ func Bool(b bool) *bool {
// DNSPreCreate controls whether we pre-create DNS records.
var DNSPreCreate = New("DNSPreCreate", Bool(true))
// DrainAndValidateRollingUpdate, if set, enables the new rolling update code that drains and validates nodes
var DrainAndValidateRollingUpdate = New("DrainAndValidateRollingUpdate", Bool(true))
// VPCSkipEnableDNSSupport if set will make that a VPC does not need DNSSupport enabled
var VPCSkipEnableDNSSupport = New("VPCSkipEnableDNSSupport", Bool(false))
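For readers unfamiliar with the kops feature flag mechanism, here is a minimal sketch of how a caller gates on the new flag. The featureflag package path and the Enabled() accessor come from the diff above; the main() scaffolding is illustrative only and is not part of this commit.

package main

import (
	"fmt"

	"k8s.io/kops/pkg/featureflag"
)

func main() {
	// The flag is registered above with a default of true; per the command help
	// text it can also be toggled through the KOPS_FEATURE_FLAGS environment variable.
	if featureflag.DrainAndValidateRollingUpdate.Enabled() {
		fmt.Println("using the drain-and-validate rolling update code path")
	} else {
		fmt.Println("using the legacy rolling update code path")
	}
}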

upup/pkg/kutil/drain.go (new file, 674 lines)

@ -0,0 +1,674 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kutil
////
// Based off of drain in kubectl
// https://github.com/kubernetes/kubernetes/blob/master/pkg/kubectl/cmd/drain.go
////
// TODO: remove kubectl dependencies
// TODO: can we use our own client instead of building it again
// TODO: refactor our client to be like this client
// FIXME: look at 1.5 refactor
// FIXME: we are deleting local storage for daemon sets, and why even delete local storage??
import (
"errors"
"fmt"
"math"
"reflect"
"strings"
"time"
"github.com/jonboulle/clockwork"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/policy"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubectl"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
// DrainOptions holds the options used when draining a node
type DrainOptions struct {
client *internalclientset.Clientset
restClient *restclient.RESTClient
factory cmdutil.Factory
Force bool
GracePeriodSeconds int
IgnoreDaemonsets bool
Timeout time.Duration
backOff clockwork.Clock
DeleteLocalData bool
mapper meta.RESTMapper
nodeInfo *resource.Info
typer runtime.ObjectTyper
}
// Allow tweaking default options for draining nodes
type DrainCommand struct {
Force bool
IgnoreDaemonsets bool
DeleteLocalData bool
GracePeriodSeconds int
Timeout int
}
// Takes a pod and returns a bool indicating whether or not to operate on the
// pod, an optional warning message, and an optional fatal error.
type podFilter func(api.Pod) (include bool, w *warning, f *fatal)
type warning struct {
string
}
type fatal struct {
string
}
const (
EvictionKind = "Eviction"
EvictionSubresource = "pods/eviction"
kDaemonsetFatal = "DaemonSet-managed pods (use --ignore-daemonsets to ignore)"
kDaemonsetWarning = "Ignoring DaemonSet-managed pods"
kLocalStorageFatal = "pods with local storage (use --delete-local-data to override)"
kLocalStorageWarning = "Deleting pods with local storage"
kUnmanagedFatal = "pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet (use --force to override)"
kUnmanagedWarning = "Deleting pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet"
kMaxNodeUpdateRetry = 10
)
// NewDrainOptions creates a DrainOptions from a DrainCommand, falling back to defaults when the command is nil
func NewDrainOptions(command *DrainCommand, clusterName string) (*DrainOptions, error) {
config := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),
&clientcmd.ConfigOverrides{CurrentContext: clusterName})
f := cmdutil.NewFactory(config)
if command != nil {
duration, err := time.ParseDuration(fmt.Sprintf("%ds", command.GracePeriodSeconds))
if err != nil {
return nil, err
}
return &DrainOptions{
factory: f,
backOff: clockwork.NewRealClock(),
Force: command.Force,
IgnoreDaemonsets: command.IgnoreDaemonsets,
DeleteLocalData: command.DeleteLocalData,
GracePeriodSeconds: command.GracePeriodSeconds,
Timeout: duration,
}, nil
}
// return with defaults
duration, err := time.ParseDuration("0s")
if err != nil {
return nil, err
}
return &DrainOptions{
factory: f,
backOff: clockwork.NewRealClock(),
Force: true,
IgnoreDaemonsets: true,
DeleteLocalData: false, // TODO: should this be true?
GracePeriodSeconds: -1,
Timeout: duration,
}, nil
}
func (o *DrainOptions) DrainTheNode(nodeName string) (err error) {
err = o.SetupDrain(nodeName)
if err != nil {
return fmt.Errorf("Error setting up the drain: %v, node: %s", err, nodeName)
}
err = o.RunDrain()
if err != nil {
return fmt.Errorf("Drain failed %v, %s", err, nodeName)
}
return nil
}
// SetupDrain populates some fields from the factory, grabs command line
// arguments and looks up the node using Builder
func (o *DrainOptions) SetupDrain(nodeName string) error {
if nodeName == "" {
return fmt.Errorf("nodeName cannot be empty")
}
var err error
if o.client, err = o.factory.ClientSet(); err != nil {
return fmt.Errorf("client or clientset nil %v", err)
}
o.restClient, err = o.factory.RESTClient()
if err != nil {
return fmt.Errorf("rest client problem %v", err)
}
o.mapper, o.typer = o.factory.Object()
cmdNamespace, _, err := o.factory.DefaultNamespace()
if err != nil {
return fmt.Errorf("DefaultNamespace problem %v", err)
}
r := o.factory.NewBuilder().
NamespaceParam(cmdNamespace).DefaultNamespace().
ResourceNames("node", nodeName).
Do()
if err = r.Err(); err != nil {
return fmt.Errorf("NewBuilder problem %v", err)
}
err = r.Visit(func(info *resource.Info, err error) error {
if err != nil {
return fmt.Errorf("internal vistor problem %v", err)
}
glog.V(2).Infof("info %v", info)
o.nodeInfo = info
return nil
})
if err != nil {
glog.Errorf("Error getting nodeInfo %v", err)
return fmt.Errorf("visitor problem %v", err)
}
if err = r.Err(); err != nil {
return fmt.Errorf("visitor problem %v", err)
}
return nil
}
// RunDrain runs the 'drain' command
func (o *DrainOptions) RunDrain() error {
if o.nodeInfo == nil {
return fmt.Errorf("nodeInfo is not setup")
}
if err := o.RunCordonOrUncordon(true); err != nil {
glog.V(2).Infof("Error cordon node %v - %v", o.nodeInfo.Name, err)
return err
}
err := o.deleteOrEvictPodsSimple()
if err == nil {
glog.V(2).Infof("Drained node %s", o.nodeInfo.Name)
} else {
glog.V(2).Infof("Error draining node %s - %v", o.nodeInfo.Name, err)
}
return err
}
func (o *DrainOptions) deleteOrEvictPodsSimple() error {
pods, err := o.getPodsForDeletion()
if err != nil {
return err
}
err = o.deleteOrEvictPods(pods)
if err != nil {
pendingPods, newErr := o.getPodsForDeletion()
if newErr != nil {
return newErr
}
glog.Fatalf("There are pending pods when an error occurred: %v\n", err)
for _, pendingPod := range pendingPods {
glog.Fatalf("%s/%s\n", "pod", pendingPod.Name)
}
}
return err
}
func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{}, error) {
switch sr.Reference.Kind {
case "ReplicationController":
return o.client.Core().ReplicationControllers(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
case "DaemonSet":
return o.client.Extensions().DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
case "Job":
return o.client.Batch().Jobs(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
case "ReplicaSet":
return o.client.Extensions().ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
case "PetSet":
// FIXME: how the heck do you write this
// FIXME: Can we use the go client to make 1.4 and 1.5 calls :)
return "PetSet", nil
case "StatefulSet":
return o.client.Apps().StatefulSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
}
return nil, fmt.Errorf("unknown controller kind %q", sr.Reference.Kind)
}
func (o *DrainOptions) getPodCreator(pod api.Pod) (*api.SerializedReference, error) {
creatorRef, found := pod.ObjectMeta.Annotations[api.CreatedByAnnotation]
if !found {
return nil, nil
}
// Now verify that the specified creator actually exists.
sr := &api.SerializedReference{}
if err := runtime.DecodeInto(o.factory.Decoder(true), []byte(creatorRef), sr); err != nil {
return nil, err
}
// We assume the only reason for an error is because the controller is
// gone/missing, not for any other cause. TODO(mml): something more
// sophisticated than this
_, err := o.getController(sr)
if err != nil {
return nil, err
}
return sr, nil
}
func (o *DrainOptions) unreplicatedFilter(pod api.Pod) (bool, *warning, *fatal) {
// any finished pod can be removed
if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed {
return true, nil, nil
}
sr, err := o.getPodCreator(pod)
if err != nil {
return false, nil, &fatal{err.Error()}
}
if sr != nil {
return true, nil, nil
}
if !o.Force {
return false, nil, &fatal{kUnmanagedFatal}
}
return true, &warning{kUnmanagedWarning}, nil
}
func (o *DrainOptions) daemonsetFilter(pod api.Pod) (bool, *warning, *fatal) {
// Note that we return false in all cases where the pod is DaemonSet managed,
// regardless of flags. We never delete them, the only question is whether
// their presence constitutes an error.
sr, err := o.getPodCreator(pod)
if err != nil {
return false, nil, &fatal{err.Error()}
}
if sr == nil || sr.Reference.Kind != "DaemonSet" {
return true, nil, nil
}
if _, err := o.client.Extensions().DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{}); err != nil {
return false, nil, &fatal{err.Error()}
}
if !o.IgnoreDaemonsets {
return false, nil, &fatal{kDaemonsetFatal}
}
return false, &warning{kDaemonsetWarning}, nil
}
func mirrorPodFilter(pod api.Pod) (bool, *warning, *fatal) {
if _, found := pod.ObjectMeta.Annotations[types.ConfigMirrorAnnotationKey]; found {
return false, nil, nil
}
return true, nil, nil
}
func hasLocalStorage(pod api.Pod) bool {
for _, volume := range pod.Spec.Volumes {
if volume.EmptyDir != nil {
return true
}
}
return false
}
func (o *DrainOptions) localStorageFilter(pod api.Pod) (bool, *warning, *fatal) {
if !hasLocalStorage(pod) {
return true, nil, nil
}
if !o.DeleteLocalData {
return false, nil, &fatal{kLocalStorageFatal}
}
return true, &warning{kLocalStorageWarning}, nil
}
// Map of status message to a list of pod names having that status.
type podStatuses map[string][]string
func (ps podStatuses) Message() string {
msgs := []string{}
for key, pods := range ps {
msgs = append(msgs, fmt.Sprintf("%s: %s", key, strings.Join(pods, ", ")))
}
return strings.Join(msgs, "; ")
}
// getPodsForDeletion returns all the pods we're going to delete. If there are
// any pods preventing us from deleting, we return that list in an error.
func (o *DrainOptions) getPodsForDeletion() (pods []api.Pod, err error) {
podList, err := o.client.Core().Pods(api.NamespaceAll).List(api.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeInfo.Name})})
if err != nil {
return pods, err
}
ws := podStatuses{}
fs := podStatuses{}
for _, pod := range podList.Items {
podOk := true
// FIXME: The localStorageFilter is coming back with daemonsets
// FIXME: The filters are not excluding each other
for _, filt := range []podFilter{mirrorPodFilter, o.localStorageFilter, o.unreplicatedFilter, o.daemonsetFilter} {
filterOk, w, f := filt(pod)
podOk = podOk && filterOk
if w != nil {
ws[w.string] = append(ws[w.string], pod.Name)
}
if f != nil {
fs[f.string] = append(fs[f.string], pod.Name)
}
}
if podOk {
pods = append(pods, pod)
}
}
if len(fs) > 0 {
return []api.Pod{}, errors.New(fs.Message())
}
if len(ws) > 0 {
glog.Warningf("WARNING: %s\n", ws.Message())
}
glog.V(2).Infof("Pods to delete: %v", pods)
return pods, nil
}
func (o *DrainOptions) deletePod(pod api.Pod) error {
deleteOptions := &api.DeleteOptions{}
if o.GracePeriodSeconds >= 0 {
gracePeriodSeconds := int64(o.GracePeriodSeconds)
deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
}
return o.client.Core().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
}
func (o *DrainOptions) evictPod(pod api.Pod, policyGroupVersion string) error {
deleteOptions := &api.DeleteOptions{}
if o.GracePeriodSeconds >= 0 {
gracePeriodSeconds := int64(o.GracePeriodSeconds)
deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
}
eviction := &policy.Eviction{
TypeMeta: metav1.TypeMeta{
APIVersion: policyGroupVersion,
Kind: EvictionKind,
},
ObjectMeta: api.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
DeleteOptions: deleteOptions,
}
// Remember to change the URL manipulation func when Eviction's version changes
return o.client.Policy().Evictions(eviction.Namespace).Evict(eviction)
}
// deleteOrEvictPods deletes or evicts the pods on the api server
func (o *DrainOptions) deleteOrEvictPods(pods []api.Pod) error {
if len(pods) == 0 {
return nil
}
policyGroupVersion, err := SupportEviction(o.client)
if err != nil {
return fmt.Errorf("error deleteOrEvictPods ~ SupportEviction: %v", err)
}
getPodFn := func(namespace, name string) (*api.Pod, error) {
return o.client.Core().Pods(namespace).Get(name, metav1.GetOptions{})
}
if len(policyGroupVersion) > 0 {
err = o.evictPods(pods, policyGroupVersion, getPodFn)
if err != nil {
glog.Warningf("Error attempting to evict pod, will delete pod - err: %v", err)
return o.deletePods(pods, getPodFn)
}
return nil
} else {
return o.deletePods(pods, getPodFn)
}
}
func (o *DrainOptions) evictPods(pods []api.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*api.Pod, error)) error {
doneCh := make(chan bool, len(pods))
errCh := make(chan error, 1)
for _, pod := range pods {
go func(pod api.Pod, doneCh chan bool, errCh chan error) {
var err error
for {
err = o.evictPod(pod, policyGroupVersion)
if err == nil {
break
} else if apierrors.IsTooManyRequests(err) {
time.Sleep(5 * time.Second)
} else {
errCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err)
return
}
}
podArray := []api.Pod{pod}
_, err = o.waitForDelete(podArray, kubectl.Interval, time.Duration(math.MaxInt64), true, getPodFn)
if err == nil {
doneCh <- true
} else {
errCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
}
}(pod, doneCh, errCh)
}
doneCount := 0
// 0 timeout means infinite, we use MaxInt64 to represent it.
var globalTimeout time.Duration
if o.Timeout == 0 {
globalTimeout = time.Duration(math.MaxInt64)
} else {
globalTimeout = o.Timeout
}
for {
select {
case err := <-errCh:
return err
case <-doneCh:
doneCount++
if doneCount == len(pods) {
return nil
}
case <-time.After(globalTimeout):
return fmt.Errorf("drain did not complete within %v", globalTimeout)
}
}
}
func (o *DrainOptions) deletePods(pods []api.Pod, getPodFn func(namespace, name string) (*api.Pod, error)) error {
// 0 timeout means infinite, we use MaxInt64 to represent it.
var globalTimeout time.Duration
if o.Timeout == 0 {
globalTimeout = time.Duration(math.MaxInt64)
} else {
globalTimeout = o.Timeout
}
for _, pod := range pods {
err := o.deletePod(pod)
if err != nil {
return err
}
}
_, err := o.waitForDelete(pods, kubectl.Interval, globalTimeout, false, getPodFn)
return err
}
func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*api.Pod, error)) ([]api.Pod, error) {
var verbStr string
if usingEviction {
verbStr = "evicted"
} else {
verbStr = "deleted"
}
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
pendingPods := []api.Pod{}
for i, pod := range pods {
p, err := getPodFn(pod.Namespace, pod.Name)
if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
glog.V(2).Infof("Deleted pod %s, %s", pod.Name, verbStr)
continue
} else if err != nil {
return false, err
} else {
pendingPods = append(pendingPods, pods[i])
}
}
pods = pendingPods
if len(pendingPods) > 0 {
return false, nil
}
return true, nil
})
return pods, err
}
// SupportEviction uses the Discovery API to find out if the server supports the eviction subresource.
// If it is supported, it returns its groupVersion; otherwise it returns ""
func SupportEviction(clientset *internalclientset.Clientset) (string, error) {
discoveryClient := clientset.Discovery()
groupList, err := discoveryClient.ServerGroups()
if err != nil {
return "", err
}
foundPolicyGroup := false
var policyGroupVersion string
for _, group := range groupList.Groups {
if group.Name == "policy" {
foundPolicyGroup = true
policyGroupVersion = group.PreferredVersion.GroupVersion
break
}
}
if !foundPolicyGroup {
return "", nil
}
resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1")
if err != nil {
return "", err
}
for _, resource := range resourceList.APIResources {
if resource.Name == EvictionSubresource && resource.Kind == EvictionKind {
return policyGroupVersion, nil
}
}
return "", nil
}
// RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for
// "Unschedulable" is passed as the first arg.
func (o *DrainOptions) RunCordonOrUncordon(desired bool) error {
cmdNamespace, _, err := o.factory.DefaultNamespace()
if err != nil {
glog.V(2).Infof("Error node %s - %v", o.nodeInfo.Name, err)
return err
}
if o.nodeInfo == nil {
return fmt.Errorf("nodeInfo nil")
}
if o.nodeInfo.Mapping == nil {
return fmt.Errorf("Mapping nil")
}
if o.nodeInfo.Mapping.GroupVersionKind.Kind == "Node" {
unsched := reflect.ValueOf(o.nodeInfo.Object).Elem().FieldByName("Spec").FieldByName("Unschedulable")
if unsched.Bool() == desired {
glog.V(2).Infof("Node is already: %s", already(desired))
} else {
helper := resource.NewHelper(o.restClient, o.nodeInfo.Mapping)
unsched.SetBool(desired)
var err error
for i := 0; i < kMaxNodeUpdateRetry; i++ {
// We don't care about what previous versions may exist, we always want
// to overwrite, and Replace always sets current ResourceVersion if version is "".
helper.Versioner.SetResourceVersion(o.nodeInfo.Object, "")
_, err = helper.Replace(cmdNamespace, o.nodeInfo.Name, true, o.nodeInfo.Object)
if err != nil {
if !apierrors.IsConflict(err) {
return err
}
} else {
break
}
// It's a race, no need to sleep
}
if err != nil {
return err
}
glog.V(2).Infof("Node %s is : %s", o.nodeInfo.Name, changed(desired))
}
} else {
glog.V(2).Infof("Node %s is : skipped", o.nodeInfo.Name)
}
return nil
}
// already() and changed() return suitable strings for {un,}cordoning
func already(desired bool) string {
if desired {
return "already cordoned"
}
return "already uncordoned"
}
func changed(desired bool) string {
if desired {
return "cordoned"
}
return "uncordoned"
}
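To make the intended call pattern for this file concrete, here is a minimal sketch based only on NewDrainOptions and DrainTheNode as defined above; the cluster and node names are placeholders, and the main() wrapper is illustrative rather than part of the commit.

package main

import (
	"github.com/golang/glog"

	"k8s.io/kops/upup/pkg/kutil"
)

func main() {
	// Passing a nil DrainCommand selects the defaults chosen in NewDrainOptions
	// (Force=true, IgnoreDaemonsets=true, GracePeriodSeconds=-1, no timeout).
	drain, err := kutil.NewDrainOptions(nil, "example.cluster.k8s.local") // placeholder cluster name
	if err != nil {
		glog.Fatalf("error creating drain options: %v", err)
	}
	// DrainTheNode cordons the node and then deletes or evicts its pods.
	if err := drain.DrainTheNode("ip-10-0-0-1.ec2.internal"); err != nil { // placeholder node name
		glog.Fatalf("drain failed: %v", err)
	}
	glog.Info("node drained")
}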


@ -0,0 +1,563 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
////
// Based off of drain in kubectl
// https://github.com/kubernetes/kubernetes/blob/master/pkg/kubectl/cmd/drain_test.go
///
////
// TODO: implement negative test cases that are commented out
////
package kutil
import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"reflect"
"strings"
"testing"
"time"
"github.com/jonboulle/clockwork"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/policy"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/restclient/fake"
cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/runtime"
)
const (
EvictionMethod = "Eviction"
DeleteMethod = "Delete"
)
var node *api.Node
var cordoned_node *api.Node
func TestMain(m *testing.M) {
// Create a node.
node = &api.Node{
ObjectMeta: api.ObjectMeta{
Name: "node",
CreationTimestamp: metav1.Time{Time: time.Now()},
},
Spec: api.NodeSpec{
ExternalID: "node",
},
Status: api.NodeStatus{},
}
clone, _ := api.Scheme.DeepCopy(node)
// A copy of the same node, but cordoned.
cordoned_node = clone.(*api.Node)
cordoned_node.Spec.Unschedulable = true
os.Exit(m.Run())
}
func TestDrain(t *testing.T) {
labels := make(map[string]string)
labels["my_key"] = "my_value"
rc := api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "rc",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now()},
Labels: labels,
SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"),
},
Spec: api.ReplicationControllerSpec{
Selector: labels,
},
}
rc_anno := make(map[string]string)
rc_anno[api.CreatedByAnnotation] = refJson(t, &rc)
rc_pod := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now()},
Labels: labels,
Annotations: rc_anno,
},
Spec: api.PodSpec{
NodeName: "node",
},
}
ds := extensions.DaemonSet{
ObjectMeta: api.ObjectMeta{
Name: "ds",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now()},
SelfLink: "/apis/extensions/v1beta1/namespaces/default/daemonsets/ds",
},
Spec: extensions.DaemonSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: labels},
},
}
ds_anno := make(map[string]string)
ds_anno[api.CreatedByAnnotation] = refJson(t, &ds)
ds_pod := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now()},
Labels: labels,
Annotations: ds_anno,
},
Spec: api.PodSpec{
NodeName: "node",
},
}
job := batch.Job{
ObjectMeta: api.ObjectMeta{
Name: "job",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now()},
SelfLink: "/apis/extensions/v1beta1/namespaces/default/jobs/job",
},
Spec: batch.JobSpec{
Selector: &metav1.LabelSelector{MatchLabels: labels},
},
}
/*
// keeping dead code, because I need to fix this for 1.5 & 1.4
job_pod := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now()},
Labels: labels,
Annotations: map[string]string{api.CreatedByAnnotation: refJson(t, &job)},
},
}
*/
rs := extensions.ReplicaSet{
ObjectMeta: api.ObjectMeta{
Name: "rs",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now()},
Labels: labels,
SelfLink: testapi.Default.SelfLink("replicasets", "rs"),
},
Spec: extensions.ReplicaSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: labels},
},
}
rs_anno := make(map[string]string)
rs_anno[api.CreatedByAnnotation] = refJson(t, &rs)
rs_pod := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now()},
Labels: labels,
Annotations: rs_anno,
},
Spec: api.PodSpec{
NodeName: "node",
},
}
naked_pod := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now()},
Labels: labels,
},
Spec: api.PodSpec{
NodeName: "node",
},
}
emptydir_pod := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "default",
CreationTimestamp: metav1.Time{Time: time.Now()},
Labels: labels,
},
Spec: api.PodSpec{
NodeName: "node",
Volumes: []api.Volume{
{
Name: "scratch",
VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{Medium: ""}},
},
},
},
}
tests := []struct {
description string
node *api.Node
expected *api.Node
pods []api.Pod
rcs []api.ReplicationController
replicaSets []extensions.ReplicaSet
args []string
expectFatal bool
expectDelete bool
}{
{
description: "RC-managed pod",
node: node,
expected: cordoned_node,
pods: []api.Pod{rc_pod},
rcs: []api.ReplicationController{rc},
args: []string{"node"},
expectFatal: false,
expectDelete: true,
},
// TODO implement a way to init with correct params
/*
{
description: "DS-managed pod",
node: node,
expected: cordoned_node,
pods: []api.Pod{ds_pod},
rcs: []api.ReplicationController{rc},
args: []string{"node"},
expectFatal: true,
expectDelete: false,
},
*/
{
description: "DS-managed pod with --ignore-daemonsets",
node: node,
expected: cordoned_node,
pods: []api.Pod{ds_pod},
rcs: []api.ReplicationController{rc},
args: []string{"node", "--ignore-daemonsets"},
expectFatal: false,
expectDelete: false,
},
/*
// FIXME I am getting -test.v -test.run ^TestDrain$ drain_test.go:483: Job-managed pod: pod never evicted
{
description: "Job-managed pod",
node: node,
expected: cordoned_node,
pods: []api.Pod{job_pod},
rcs: []api.ReplicationController{rc},
args: []string{"node"},
expectFatal: false,
expectDelete: true,
},*/
{
description: "RS-managed pod",
node: node,
expected: cordoned_node,
pods: []api.Pod{rs_pod},
replicaSets: []extensions.ReplicaSet{rs},
args: []string{"node"},
expectFatal: false,
expectDelete: true,
},
// TODO implement a way to init with correct params
/*
{
description: "naked pod",
node: node,
expected: cordoned_node,
pods: []api.Pod{naked_pod},
rcs: []api.ReplicationController{},
args: []string{"node"},
expectFatal: true,
expectDelete: false,
},*/
{
description: "naked pod with --force",
node: node,
expected: cordoned_node,
pods: []api.Pod{naked_pod},
rcs: []api.ReplicationController{},
args: []string{"node", "--force"},
expectFatal: false,
expectDelete: true,
},
// TODO implement a way to init with correct params
/*
{
description: "pod with EmptyDir",
node: node,
expected: cordoned_node,
pods: []api.Pod{emptydir_pod},
args: []string{"node", "--force"},
expectFatal: true,
expectDelete: false,
},*/
{
description: "pod with EmptyDir and --delete-local-data",
node: node,
expected: cordoned_node,
pods: []api.Pod{emptydir_pod},
args: []string{"node", "--force", "--delete-local-data=true"},
expectFatal: false,
expectDelete: true,
},
{
description: "empty node",
node: node,
expected: cordoned_node,
pods: []api.Pod{},
rcs: []api.ReplicationController{rc},
args: []string{"node"},
expectFatal: false,
expectDelete: false,
},
}
testEviction := false
for i := 0; i < 2; i++ {
testEviction = !testEviction
var currMethod string
if testEviction {
currMethod = EvictionMethod
} else {
currMethod = DeleteMethod
}
for _, test := range tests {
new_node := &api.Node{}
deleted := false
evicted := false
f, tf, codec, ns := cmdtesting.NewAPIFactory()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
m := &MyReq{req}
switch {
case req.Method == "GET" && req.URL.Path == "/api":
apiVersions := metav1.APIVersions{
Versions: []string{"v1"},
}
return genResponseWithJsonEncodedBody(apiVersions)
case req.Method == "GET" && req.URL.Path == "/apis":
groupList := metav1.APIGroupList{
Groups: []metav1.APIGroup{
{
Name: "policy",
PreferredVersion: metav1.GroupVersionForDiscovery{
GroupVersion: "policy/v1beta1",
},
},
},
}
return genResponseWithJsonEncodedBody(groupList)
case req.Method == "GET" && req.URL.Path == "/api/v1":
resourceList := metav1.APIResourceList{
GroupVersion: "v1",
}
if testEviction {
resourceList.APIResources = []metav1.APIResource{
{
Name: EvictionSubresource,
Kind: EvictionKind,
},
}
}
return genResponseWithJsonEncodedBody(resourceList)
case m.isFor("GET", "/nodes/node"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, test.node)}, nil
case m.isFor("GET", "/namespaces/default/replicationcontrollers/rc"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &test.rcs[0])}, nil
case m.isFor("GET", "/namespaces/default/daemonsets/ds"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &ds)}, nil
case m.isFor("GET", "/namespaces/default/jobs/job"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &job)}, nil
case m.isFor("GET", "/namespaces/default/replicasets/rs"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &test.replicaSets[0])}, nil
case m.isFor("GET", "/namespaces/default/pods/bar"):
return &http.Response{StatusCode: 404, Header: defaultHeader(), Body: objBody(codec, &api.Pod{})}, nil
case m.isFor("GET", "/pods"):
values, err := url.ParseQuery(req.URL.RawQuery)
if err != nil {
t.Fatalf("%s: unexpected error: %v", test.description, err)
}
get_params := make(url.Values)
get_params["fieldSelector"] = []string{"spec.nodeName=node"}
if !reflect.DeepEqual(get_params, values) {
t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, get_params, values)
}
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.PodList{Items: test.pods})}, nil
case m.isFor("GET", "/replicationcontrollers"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.ReplicationControllerList{Items: test.rcs})}, nil
case m.isFor("PUT", "/nodes/node"):
data, err := ioutil.ReadAll(req.Body)
if err != nil {
t.Fatalf("%s: unexpected error: %v", test.description, err)
}
defer req.Body.Close()
if err := runtime.DecodeInto(codec, data, new_node); err != nil {
t.Fatalf("%s: unexpected error: %v", test.description, err)
}
if !reflect.DeepEqual(test.expected.Spec, new_node.Spec) {
t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, test.expected.Spec, new_node.Spec)
}
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, new_node)}, nil
case m.isFor("DELETE", "/namespaces/default/pods/bar"):
deleted = true
return &http.Response{StatusCode: 204, Header: defaultHeader(), Body: objBody(codec, &test.pods[0])}, nil
case m.isFor("POST", "/namespaces/default/pods/bar/eviction"):
evicted = true
return &http.Response{StatusCode: 201, Header: defaultHeader(), Body: policyObjBody(&policy.Eviction{})}, nil
default:
t.Fatalf("%s: unexpected request: %v %#v\n%#v", test.description, req.Method, req.URL, req)
return nil, nil
}
}),
}
tf.ClientConfig = defaultClientConfig()
duration, _ := time.ParseDuration("0s")
cmd := &DrainOptions{
factory: f,
backOff: clockwork.NewRealClock(),
Force: true,
IgnoreDaemonsets: true,
DeleteLocalData: true,
GracePeriodSeconds: -1,
Timeout: duration,
}
saw_fatal := false
func() {
defer func() {
// Recover from the panic below.
_ = recover()
// Restore cmdutil behavior
cmdutil.DefaultBehaviorOnFatal()
}()
cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) })
cmd.SetupDrain(node.Name)
cmd.RunDrain()
}()
if test.expectFatal {
if !saw_fatal {
t.Fatalf("%s: unexpected non-error when using %s", test.description, currMethod)
}
}
if test.expectDelete {
// Test Delete
if !testEviction && !deleted {
t.Fatalf("%s: pod never deleted", test.description)
}
// Test Eviction
if testEviction && !evicted {
t.Fatalf("%s: pod never evicted", test.description)
}
}
if !test.expectDelete {
if deleted {
t.Fatalf("%s: unexpected delete when using %s", test.description, currMethod)
}
}
}
}
}
type MyReq struct {
Request *http.Request
}
func (m *MyReq) isFor(method string, path string) bool {
req := m.Request
return method == req.Method && (req.URL.Path == path ||
req.URL.Path == strings.Join([]string{"/api/v1", path}, "") ||
req.URL.Path == strings.Join([]string{"/apis/extensions/v1beta1", path}, "") ||
req.URL.Path == strings.Join([]string{"/apis/batch/v1", path}, ""))
}
func refJson(t *testing.T, o runtime.Object) string {
ref, err := api.GetReference(o)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
_, _, codec, _ := cmdtesting.NewAPIFactory()
json, err := runtime.Encode(codec, &api.SerializedReference{Reference: *ref})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
return string(json)
}
func genResponseWithJsonEncodedBody(bodyStruct interface{}) (*http.Response, error) {
jsonBytes, err := json.Marshal(bodyStruct)
if err != nil {
return nil, err
}
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bytesBody(jsonBytes)}, nil
}
func defaultHeader() http.Header {
header := http.Header{}
header.Set("Content-Type", runtime.ContentTypeJSON)
return header
}
func objBody(codec runtime.Codec, obj runtime.Object) io.ReadCloser {
return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj))))
}
func policyObjBody(obj runtime.Object) io.ReadCloser {
return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(testapi.Policy.Codec(), obj))))
}
func bytesBody(bodyBytes []byte) io.ReadCloser {
return ioutil.NopCloser(bytes.NewReader(bodyBytes))
}
func defaultClientConfig() *restclient.Config {
return &restclient.Config{
APIPath: "/api",
ContentConfig: restclient.ContentConfig{
NegotiatedSerializer: api.Codecs,
ContentType: runtime.ContentTypeJSON,
GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion,
},
}
}


@ -276,16 +276,11 @@ func (n *CloudInstanceGroup) RollingUpdate(cloud fi.Cloud, force bool, interval
if force {
update = append(update, n.Ready...)
}
for _, u := range update {
instanceID := aws.StringValue(u.ASGInstance.InstanceId)
glog.Infof("Stopping instance %q in AWS ASG %q", instanceID, n.ASGName)
// TODO: Evacuate through k8s first?
// TODO: Temporarily increase size of ASG?
// TODO: Remove from ASG first so status is immediately updated?
// TODO: Batch termination, like a rolling-update
request := &ec2.TerminateInstancesInput{


@ -0,0 +1,319 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kutil
// TODO move this business logic into a service than can be called via the api
import (
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/golang/glog"
api "k8s.io/kops/pkg/apis/kops"
validate "k8s.io/kops/pkg/validation"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
k8s_clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
)
// RollingUpdateClusterDV restarts cluster nodes, draining and validating them as it goes
type RollingUpdateClusterDV struct {
Cloud fi.Cloud
MasterInterval time.Duration
NodeInterval time.Duration
BastionInterval time.Duration
K8sClient *k8s_clientset.Clientset
ForceDrain bool
FailOnValidate bool
Force bool
CloudOnly bool
ClusterName string
}
// RollingUpdateDataDV is used to pass information to perform a rolling update
type RollingUpdateDataDV struct {
Cloud fi.Cloud
Force bool
Interval time.Duration
InstanceGroupList *api.InstanceGroupList
IsBastion bool
K8sClient *k8s_clientset.Clientset
ForceDrain bool
FailOnValidate bool
CloudOnly bool
ClusterName string
}
// TODO move retries to RollingUpdateCluster
const retries = 8
// TODO: should we check to see if api updates exist in the cluster
// TODO: for instance should we check if Petsets exist when upgrading 1.4.x -> 1.5.x
// RollingUpdateDrainValidate performs a rolling update on a Kubernetes cluster, draining and validating nodes as it goes
func (c *RollingUpdateClusterDV) RollingUpdateDrainValidate(groups map[string]*CloudInstanceGroup, instanceGroups *api.InstanceGroupList) error {
if len(groups) == 0 {
return nil
}
var resultsMutex sync.Mutex
results := make(map[string]error)
masterGroups := make(map[string]*CloudInstanceGroup)
nodeGroups := make(map[string]*CloudInstanceGroup)
bastionGroups := make(map[string]*CloudInstanceGroup)
for k, group := range groups {
switch group.InstanceGroup.Spec.Role {
case api.InstanceGroupRoleNode:
nodeGroups[k] = group
case api.InstanceGroupRoleMaster:
masterGroups[k] = group
case api.InstanceGroupRoleBastion:
bastionGroups[k] = group
default:
return fmt.Errorf("unknown group type for group %q", group.InstanceGroup.ObjectMeta.Name)
}
}
// Upgrade bastions first; if these go down we can't see anything
{
var wg sync.WaitGroup
for k, bastionGroup := range bastionGroups {
wg.Add(1)
go func(k string, group *CloudInstanceGroup) {
resultsMutex.Lock()
results[k] = fmt.Errorf("function panic")
resultsMutex.Unlock()
defer wg.Done()
rollingUpdateData := c.CreateRollingUpdateData(instanceGroups, true)
err := group.RollingUpdateDV(rollingUpdateData)
resultsMutex.Lock()
results[k] = err
resultsMutex.Unlock()
}(k, bastionGroup)
}
wg.Wait()
}
// Upgrade master next
{
var wg sync.WaitGroup
// We run master nodes in series, even if they are in separate instance groups
// typically they will be in separate instance groups, so we can force the zones,
// and we don't want to roll all the masters at the same time. See issue #284
wg.Add(1)
go func() {
for k := range masterGroups {
resultsMutex.Lock()
results[k] = fmt.Errorf("function panic")
resultsMutex.Unlock()
}
defer wg.Done()
for k, group := range masterGroups {
rollingUpdateData := c.CreateRollingUpdateData(instanceGroups, false)
err := group.RollingUpdateDV(rollingUpdateData)
resultsMutex.Lock()
results[k] = err
resultsMutex.Unlock()
// TODO: Bail on error?
}
}()
wg.Wait()
}
// Upgrade nodes, with greater parallelism
// TODO increase each instancegroups nodes by one
{
var wg sync.WaitGroup
for k, nodeGroup := range nodeGroups {
wg.Add(1)
go func(k string, group *CloudInstanceGroup) {
resultsMutex.Lock()
results[k] = fmt.Errorf("function panic")
resultsMutex.Unlock()
defer wg.Done()
rollingUpdateData := c.CreateRollingUpdateData(instanceGroups, false)
err := group.RollingUpdateDV(rollingUpdateData)
resultsMutex.Lock()
results[k] = err
resultsMutex.Unlock()
}(k, nodeGroup)
}
wg.Wait()
}
for _, err := range results {
if err != nil {
return err
}
}
glog.Info("\nRolling update completed!\n")
return nil
}
func (c *RollingUpdateClusterDV) CreateRollingUpdateData(instanceGroups *api.InstanceGroupList, isBastion bool) *RollingUpdateDataDV {
return &RollingUpdateDataDV{
Cloud: c.Cloud,
Force: c.Force,
Interval: c.NodeInterval,
InstanceGroupList: instanceGroups,
IsBastion: isBastion,
K8sClient: c.K8sClient,
FailOnValidate: c.FailOnValidate,
ForceDrain: c.ForceDrain,
CloudOnly: c.CloudOnly,
ClusterName: c.ClusterName,
}
}
// RollingUpdateDV performs a rolling update on a list of ec2 instances, draining and validating along the way.
func (n *CloudInstanceGroup) RollingUpdateDV(rollingUpdateData *RollingUpdateDataDV) error {
// we should not get here, but check defensively anyway
if rollingUpdateData == nil {
return fmt.Errorf("rollingUpdateData cannot be nil")
}
// Do not need a k8s client if you are doing cloud only
if rollingUpdateData.K8sClient == nil && !rollingUpdateData.CloudOnly {
return fmt.Errorf("RollingUpdate is missing a k8s client")
}
if rollingUpdateData.InstanceGroupList == nil {
return fmt.Errorf("RollingUpdate is missing a the InstanceGroupList")
}
c := rollingUpdateData.Cloud.(awsup.AWSCloud)
update := n.NeedUpdate
if rollingUpdateData.Force {
update = append(update, n.Ready...)
}
// TODO is this logic correct
if !rollingUpdateData.IsBastion && rollingUpdateData.FailOnValidate && !rollingUpdateData.CloudOnly {
_, err := validate.ValidateCluster(rollingUpdateData.ClusterName, rollingUpdateData.InstanceGroupList, rollingUpdateData.K8sClient)
if err != nil {
return fmt.Errorf("Cluster %s does not pass validation", rollingUpdateData.ClusterName)
}
}
for _, u := range update {
if !rollingUpdateData.IsBastion {
if rollingUpdateData.CloudOnly {
glog.Warningf("not draining nodes - cloud only is set")
} else {
drain, err := NewDrainOptions(nil, u.Node.ClusterName)
if err != nil {
glog.Warningf("Error creating drain: %v", err)
if rollingUpdateData.ForceDrain == false {
return err
}
} else {
err = drain.DrainTheNode(u.Node.Name)
if err != nil {
glog.Warningf("Error draining node %q: %v", u.Node.Name, err)
if rollingUpdateData.ForceDrain == false {
return err
}
}
}
}
}
// TODO: Temporarily increase size of ASG?
// TODO: Remove from ASG first so status is immediately updated?
// TODO: Batch termination, like a rolling-update
// TODO: check if an asg is running the correct number of instances
instanceID := aws.StringValue(u.ASGInstance.InstanceId)
glog.Infof("Stopping instance %q in AWS ASG %q", instanceID, n.ASGName)
request := &ec2.TerminateInstancesInput{
InstanceIds: []*string{u.ASGInstance.InstanceId},
}
_, err := c.EC2().TerminateInstances(request)
if err != nil {
return fmt.Errorf("error deleting instance %q: %v", instanceID, err)
}
if !rollingUpdateData.IsBastion {
// Wait for new EC2 instances to be created
time.Sleep(rollingUpdateData.Interval)
// Wait until the cluster is happy
// TODO: do we need to respect cloud only??
for i := 0; i <= retries; i++ {
if rollingUpdateData.CloudOnly {
glog.Warningf("sleeping only - not validating nodes as cloudonly flag is set")
time.Sleep(rollingUpdateData.Interval)
} else {
_, err = validate.ValidateCluster(rollingUpdateData.ClusterName, rollingUpdateData.InstanceGroupList, rollingUpdateData.K8sClient)
if err != nil {
glog.Infof("Unable to validate k8s cluster: %s.", err)
time.Sleep(rollingUpdateData.Interval / 2)
} else {
glog.Info("Cluster validated proceeding with next step in rolling update")
break
}
}
}
if rollingUpdateData.CloudOnly {
glog.Warningf("not validating nodes as cloudonly flag is set")
} else if err != nil && rollingUpdateData.FailOnValidate {
return fmt.Errorf("validation timed out while performing rolling update: %v", err)
}
}
}
return nil
}