Merge pull request #8709 from johngmyers/rolling-refactor

Refactor/simplify rolling update
Kubernetes Prow Robot 2020-03-09 22:35:36 -07:00 committed by GitHub
commit fc844a23c8
2 changed files with 79 additions and 131 deletions
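
In short, as a reading aid (this note is not part of the commit): the per-group wrapper RollingUpdateInstanceGroup and its NewRollingUpdateInstanceGroup constructor are deleted, and their methods become unexported methods on RollingUpdateCluster that take the CloudInstanceGroup as a parameter. The central signature change, excerpted from the first file's diff below:

// Before: callers constructed a wrapper per group and threaded the cluster-wide
// settings through every call as rollingUpdateData.
func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpdateCluster, cluster *api.Cluster, isBastion bool, sleepAfterTerminate time.Duration, validationTimeout time.Duration) (err error)

// After: the same logic hangs off RollingUpdateCluster; the group is an argument, and
// settings such as K8sClient, Cloud, FailOnValidate, and ValidateTickDuration come from the receiver.
func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(cluster *api.Cluster, group *cloudinstances.CloudInstanceGroup, isBastion bool, sleepAfterTerminate time.Duration, validationTimeout time.Duration) (err error)

Call sites in the second file drop the construct-then-call dance and invoke c.rollingUpdateInstanceGroup directly.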

View File

@@ -33,44 +33,10 @@ import (
 api "k8s.io/kops/pkg/apis/kops"
 "k8s.io/kops/pkg/cloudinstances"
 "k8s.io/kops/pkg/drain"
-"k8s.io/kops/upup/pkg/fi"
 )
 const rollingUpdateTaintKey = "kops.k8s.io/scheduled-for-update"
-// RollingUpdateInstanceGroup is the AWS ASG backing an InstanceGroup.
-type RollingUpdateInstanceGroup struct {
-// Cloud is the kops cloud provider
-Cloud fi.Cloud
-// CloudGroup is the kops cloud provider groups
-CloudGroup *cloudinstances.CloudInstanceGroup
-// TODO should remove the need to have rollingupdate struct and add:
-// TODO - the kubernetes client
-// TODO - the cluster name
-// TODO - the client config
-// TODO - fail on validate
-// TODO - fail on drain
-// TODO - cloudonly
-}
-// NewRollingUpdateInstanceGroup creates a new struct
-func NewRollingUpdateInstanceGroup(cloud fi.Cloud, cloudGroup *cloudinstances.CloudInstanceGroup) (*RollingUpdateInstanceGroup, error) {
-if cloud == nil {
-return nil, fmt.Errorf("cloud provider is required")
-}
-if cloudGroup == nil {
-return nil, fmt.Errorf("cloud group is required")
-}
-// TODO check more values in cloudGroup that they are set properly
-return &RollingUpdateInstanceGroup{
-Cloud: cloud,
-CloudGroup: cloudGroup,
-}, nil
-}
 // promptInteractive asks the user to continue, mostly copied from vendor/google.golang.org/api/examples/gmail.go.
 func promptInteractive(upgradedHostId, upgradedHostName string) (stopPrompting bool, err error) {
 stopPrompting = false
@@ -102,23 +68,17 @@ func promptInteractive(upgradedHostId, upgradedHostName string) (stopPrompting b
 }
 // RollingUpdate performs a rolling update on a list of instances.
-func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpdateCluster, cluster *api.Cluster, isBastion bool, sleepAfterTerminate time.Duration, validationTimeout time.Duration) (err error) {
+func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(cluster *api.Cluster, group *cloudinstances.CloudInstanceGroup, isBastion bool, sleepAfterTerminate time.Duration, validationTimeout time.Duration) (err error) {
-// we should not get here, but hey I am going to check.
-if rollingUpdateData == nil {
-return fmt.Errorf("rollingUpdate cannot be nil")
-}
 // Do not need a k8s client if you are doing cloudonly.
-if rollingUpdateData.K8sClient == nil && !rollingUpdateData.CloudOnly {
+if c.K8sClient == nil && !c.CloudOnly {
 return fmt.Errorf("rollingUpdate is missing a k8s client")
 }
-noneReady := len(r.CloudGroup.Ready) == 0
+noneReady := len(group.Ready) == 0
-numInstances := len(r.CloudGroup.Ready) + len(r.CloudGroup.NeedUpdate)
+numInstances := len(group.Ready) + len(group.NeedUpdate)
-update := r.CloudGroup.NeedUpdate
+update := group.NeedUpdate
-if rollingUpdateData.Force {
+if c.Force {
-update = append(update, r.CloudGroup.Ready...)
+update = append(update, group.Ready...)
 }
 if len(update) == 0 {
@@ -127,11 +87,11 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 if isBastion {
 klog.V(3).Info("Not validating the cluster as instance is a bastion.")
-} else if rollingUpdateData.CloudOnly {
+} else if c.CloudOnly {
 klog.V(3).Info("Not validating cluster as validation is turned off via the cloud-only flag.")
 } else {
-if err = r.validateCluster(rollingUpdateData, cluster); err != nil {
+if err = c.validateCluster(); err != nil {
-if rollingUpdateData.FailOnValidate {
+if c.FailOnValidate {
 return err
 }
 klog.V(2).Infof("Ignoring cluster validation error: %v", err)
@@ -139,14 +99,14 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 }
 }
-if !rollingUpdateData.CloudOnly {
+if !c.CloudOnly {
-err = r.taintAllNeedUpdate(update, rollingUpdateData)
+err = c.taintAllNeedUpdate(group, update)
 if err != nil {
 return err
 }
 }
-settings := resolveSettings(cluster, r.CloudGroup.InstanceGroup, numInstances)
+settings := resolveSettings(cluster, group.InstanceGroup, numInstances)
 runningDrains := 0
 maxSurge := settings.MaxSurge.IntValue()
@@ -156,11 +116,11 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 maxConcurrency := maxSurge + settings.MaxUnavailable.IntValue()
 if maxConcurrency == 0 {
-klog.Infof("Rolling updates for InstanceGroup %s are disabled", r.CloudGroup.InstanceGroup.Name)
+klog.Infof("Rolling updates for InstanceGroup %s are disabled", group.InstanceGroup.Name)
 return nil
 }
-if r.CloudGroup.InstanceGroup.Spec.Role == api.InstanceGroupRoleMaster && maxSurge != 0 {
+if group.InstanceGroup.Spec.Role == api.InstanceGroupRoleMaster && maxSurge != 0 {
 // Masters are incapable of surging because they rely on registering themselves through
 // the local apiserver. That apiserver depends on the local etcd, which relies on being
 // joined to the etcd cluster.
@@ -171,7 +131,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 }
 }
-if rollingUpdateData.Interactive {
+if c.Interactive {
 if maxSurge > 1 {
 maxSurge = 1
 }
@@ -180,11 +140,11 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 update = prioritizeUpdate(update)
-if maxSurge > 0 && !rollingUpdateData.CloudOnly {
+if maxSurge > 0 && !c.CloudOnly {
 for numSurge := 1; numSurge <= maxSurge; numSurge++ {
 u := update[len(update)-numSurge]
 if !u.Detached {
-if err := r.detachInstance(u); err != nil {
+if err := c.detachInstance(u); err != nil {
 return err
 }
@@ -195,7 +155,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 klog.Infof("waiting for %v after detaching instance", sleepAfterTerminate)
 time.Sleep(sleepAfterTerminate)
-if err := r.maybeValidate(rollingUpdateData, validationTimeout, "detaching"); err != nil {
+if err := c.maybeValidate(validationTimeout, "detaching"); err != nil {
 return err
 }
 noneReady = false
@@ -208,7 +168,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 for uIdx, u := range update {
 go func(m *cloudinstances.CloudInstanceGroupMember) {
-terminateChan <- r.drainTerminateAndWait(m, rollingUpdateData, isBastion, sleepAfterTerminate)
+terminateChan <- c.drainTerminateAndWait(m, isBastion, sleepAfterTerminate)
 }(u)
 runningDrains++
@@ -224,12 +184,12 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
 }
-err = r.maybeValidate(rollingUpdateData, validationTimeout, "removing")
+err = c.maybeValidate(validationTimeout, "removing")
 if err != nil {
 return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
 }
-if rollingUpdateData.Interactive {
+if c.Interactive {
 nodeName := ""
 if u.Node != nil {
 nodeName = u.Node.Name
@@ -241,7 +201,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 }
 if stopPrompting {
 // Is a pointer to a struct, changes here push back into the original
-rollingUpdateData.Interactive = false
+c.Interactive = false
 }
 }
@@ -270,7 +230,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 }
 }
-err = r.maybeValidate(rollingUpdateData, validationTimeout, "removing")
+err = c.maybeValidate(validationTimeout, "removing")
 if err != nil {
 return err
 }
@@ -306,7 +266,7 @@ func waitForPendingBeforeReturningError(runningDrains int, terminateChan chan er
 return err
 }
-func (r *RollingUpdateInstanceGroup) taintAllNeedUpdate(update []*cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster) error {
+func (c *RollingUpdateCluster) taintAllNeedUpdate(group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstanceGroupMember) error {
 var toTaint []*corev1.Node
 for _, u := range update {
 if u.Node != nil && !u.Node.Spec.Unschedulable {
@@ -326,10 +286,10 @@ func (r *RollingUpdateInstanceGroup) taintAllNeedUpdate(update []*cloudinstances
 if len(toTaint) == 1 {
 noun = "node"
 }
-klog.Infof("Tainting %d %s in %q instancegroup.", len(toTaint), noun, r.CloudGroup.InstanceGroup.Name)
+klog.Infof("Tainting %d %s in %q instancegroup.", len(toTaint), noun, group.InstanceGroup.Name)
 for _, n := range toTaint {
-if err := r.patchTaint(rollingUpdateData, n); err != nil {
+if err := c.patchTaint(n); err != nil {
-if rollingUpdateData.FailOnDrainError {
+if c.FailOnDrainError {
 return fmt.Errorf("failed to taint node %q: %v", n, err)
 }
 klog.Infof("Ignoring error tainting node %q: %v", n, err)
@@ -339,7 +299,7 @@ func (r *RollingUpdateInstanceGroup) taintAllNeedUpdate(update []*cloudinstances
 return nil
 }
-func (r *RollingUpdateInstanceGroup) patchTaint(rollingUpdateData *RollingUpdateCluster, node *corev1.Node) error {
+func (c *RollingUpdateCluster) patchTaint(node *corev1.Node) error {
 oldData, err := json.Marshal(node)
 if err != nil {
 return err
@@ -360,11 +320,11 @@ func (r *RollingUpdateInstanceGroup) patchTaint(rollingUpdateData *RollingUpdate
 return err
 }
-_, err = rollingUpdateData.K8sClient.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes)
+_, err = c.K8sClient.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes)
 return err
 }
-func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster, isBastion bool, sleepAfterTerminate time.Duration) error {
+func (c *RollingUpdateCluster) drainTerminateAndWait(u *cloudinstances.CloudInstanceGroupMember, isBastion bool, sleepAfterTerminate time.Duration) error {
 instanceId := u.ID
 nodeName := ""
@@ -374,7 +334,7 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo
 if isBastion {
 // We don't want to validate for bastions - they aren't part of the cluster
-} else if rollingUpdateData.CloudOnly {
+} else if c.CloudOnly {
 klog.Warning("Not draining cluster nodes as 'cloudonly' flag is set.")
@@ -383,8 +343,8 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo
 if u.Node != nil {
 klog.Infof("Draining the node: %q.", nodeName)
-if err := r.DrainNode(u, rollingUpdateData); err != nil {
+if err := c.drainNode(u); err != nil {
-if rollingUpdateData.FailOnDrainError {
+if c.FailOnDrainError {
 return fmt.Errorf("failed to drain node %q: %v", nodeName, err)
 }
 klog.Infof("Ignoring error draining node %q: %v", nodeName, err)
@@ -396,18 +356,18 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo
 // We unregister the node before deleting it; if the replacement comes up with the same name it would otherwise still be cordoned
 // (It often seems like GCE tries to re-use names)
-if !isBastion && !rollingUpdateData.CloudOnly {
+if !isBastion && !c.CloudOnly {
 if u.Node == nil {
 klog.Warningf("no kubernetes Node associated with %s, skipping node deletion", instanceId)
 } else {
 klog.Infof("deleting node %q from kubernetes", nodeName)
-if err := r.deleteNode(u.Node, rollingUpdateData); err != nil {
+if err := c.deleteNode(u.Node); err != nil {
 return fmt.Errorf("error deleting node %q: %v", nodeName, err)
 }
 }
 }
-if err := r.DeleteInstance(u); err != nil {
+if err := c.deleteInstance(u); err != nil {
 klog.Errorf("error deleting instance %q, node %q: %v", instanceId, nodeName, err)
 return err
 }
@@ -419,16 +379,16 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo
 return nil
 }
-func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpdateCluster, validationTimeout time.Duration, operation string) error {
+func (c *RollingUpdateCluster) maybeValidate(validationTimeout time.Duration, operation string) error {
-if rollingUpdateData.CloudOnly {
+if c.CloudOnly {
 klog.Warningf("Not validating cluster as cloudonly flag is set.")
 } else {
 klog.Info("Validating the cluster.")
-if err := r.validateClusterWithDuration(rollingUpdateData, validationTimeout); err != nil {
+if err := c.validateClusterWithDuration(validationTimeout); err != nil {
-if rollingUpdateData.FailOnValidate {
+if c.FailOnValidate {
 klog.Errorf("Cluster did not validate within %s", validationTimeout)
 return fmt.Errorf("error validating cluster after %s a node: %v", operation, err)
 }
@@ -440,15 +400,15 @@ func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpd
 }
 // validateClusterWithDuration runs validation.ValidateCluster until either we get positive result or the timeout expires
-func (r *RollingUpdateInstanceGroup) validateClusterWithDuration(rollingUpdateData *RollingUpdateCluster, duration time.Duration) error {
+func (c *RollingUpdateCluster) validateClusterWithDuration(duration time.Duration) error {
 // Try to validate cluster at least once, this will handle durations that are lower
 // than our tick time
-if r.tryValidateCluster(rollingUpdateData, duration, rollingUpdateData.ValidateTickDuration) {
+if c.tryValidateCluster(duration) {
 return nil
 }
 timeout := time.After(duration)
-ticker := time.NewTicker(rollingUpdateData.ValidateTickDuration)
+ticker := time.NewTicker(c.ValidateTickDuration)
 defer ticker.Stop()
 // Keep trying until we're timed out or got a result or got an error
 for {
@@ -458,7 +418,7 @@ func (r *RollingUpdateInstanceGroup) validateClusterWithDuration(rollingUpdateDa
 return fmt.Errorf("cluster did not validate within a duration of %q", duration)
 case <-ticker.C:
 // Got a tick, validate cluster
-if r.tryValidateCluster(rollingUpdateData, duration, rollingUpdateData.ValidateTickDuration) {
+if c.tryValidateCluster(duration) {
 return nil
 }
 // ValidateCluster didn't work yet, so let's try again
@@ -467,24 +427,24 @@ func (r *RollingUpdateInstanceGroup) validateClusterWithDuration(rollingUpdateDa
 }
 }
-func (r *RollingUpdateInstanceGroup) tryValidateCluster(rollingUpdateData *RollingUpdateCluster, duration time.Duration, tickDuration time.Duration) bool {
+func (c *RollingUpdateCluster) tryValidateCluster(duration time.Duration) bool {
-result, err := rollingUpdateData.ClusterValidator.Validate()
+result, err := c.ClusterValidator.Validate()
-if err == nil && len(result.Failures) == 0 && rollingUpdateData.ValidateSuccessDuration > 0 {
+if err == nil && len(result.Failures) == 0 && c.ValidateSuccessDuration > 0 {
-klog.Infof("Cluster validated; revalidating in %s to make sure it does not flap.", rollingUpdateData.ValidateSuccessDuration)
+klog.Infof("Cluster validated; revalidating in %s to make sure it does not flap.", c.ValidateSuccessDuration)
-time.Sleep(rollingUpdateData.ValidateSuccessDuration)
+time.Sleep(c.ValidateSuccessDuration)
-result, err = rollingUpdateData.ClusterValidator.Validate()
+result, err = c.ClusterValidator.Validate()
 }
 if err != nil {
-klog.Infof("Cluster did not validate, will try again in %q until duration %q expires: %v.", tickDuration, duration, err)
+klog.Infof("Cluster did not validate, will try again in %q until duration %q expires: %v.", c.ValidateTickDuration, duration, err)
 return false
 } else if len(result.Failures) > 0 {
 messages := []string{}
 for _, failure := range result.Failures {
 messages = append(messages, failure.Message)
 }
-klog.Infof("Cluster did not pass validation, will try again in %q until duration %q expires: %s.", tickDuration, duration, strings.Join(messages, ", "))
+klog.Infof("Cluster did not pass validation, will try again in %q until duration %q expires: %s.", c.ValidateTickDuration, duration, strings.Join(messages, ", "))
 return false
 } else {
 klog.Info("Cluster validated.")
@@ -493,37 +453,36 @@ func (r *RollingUpdateInstanceGroup) tryValidateCluster(rollingUpdateData *Rolli
 }
 // validateCluster runs our validation methods on the K8s Cluster.
-func (r *RollingUpdateInstanceGroup) validateCluster(rollingUpdateData *RollingUpdateCluster, cluster *api.Cluster) error {
+func (c *RollingUpdateCluster) validateCluster() error {
-result, err := rollingUpdateData.ClusterValidator.Validate()
+result, err := c.ClusterValidator.Validate()
 if err != nil {
-return fmt.Errorf("cluster %q did not validate: %v", cluster.Name, err)
+return fmt.Errorf("cluster %q did not validate: %v", c.ClusterName, err)
 }
 if len(result.Failures) > 0 {
 messages := []string{}
 for _, failure := range result.Failures {
 messages = append(messages, failure.Message)
 }
-return fmt.Errorf("cluster %q did not pass validation: %s", cluster.Name, strings.Join(messages, ", "))
+return fmt.Errorf("cluster %q did not pass validation: %s", c.ClusterName, strings.Join(messages, ", "))
 }
 return nil
 }
 // detachInstance detaches a Cloud Instance
-func (r *RollingUpdateInstanceGroup) detachInstance(u *cloudinstances.CloudInstanceGroupMember) error {
+func (c *RollingUpdateCluster) detachInstance(u *cloudinstances.CloudInstanceGroupMember) error {
 id := u.ID
 nodeName := ""
 if u.Node != nil {
 nodeName = u.Node.Name
 }
 if nodeName != "" {
-klog.Infof("Detaching instance %q, node %q, in group %q.", id, nodeName, r.CloudGroup.HumanName)
+klog.Infof("Detaching instance %q, node %q, in group %q.", id, nodeName, u.CloudInstanceGroup.HumanName)
 } else {
-klog.Infof("Detaching instance %q, in group %q.", id, r.CloudGroup.HumanName)
+klog.Infof("Detaching instance %q, in group %q.", id, u.CloudInstanceGroup.HumanName)
 }
-if err := r.Cloud.DetachInstance(u); err != nil {
+if err := c.Cloud.DetachInstance(u); err != nil {
 if nodeName != "" {
 return fmt.Errorf("error detaching instance %q, node %q: %v", id, nodeName, err)
 } else {
@@ -534,20 +493,20 @@ func (r *RollingUpdateInstanceGroup) detachInstance(u *cloudinstances.CloudInsta
 return nil
 }
-// DeleteInstance deletes an Cloud Instance.
+// deleteInstance deletes an Cloud Instance.
-func (r *RollingUpdateInstanceGroup) DeleteInstance(u *cloudinstances.CloudInstanceGroupMember) error {
+func (c *RollingUpdateCluster) deleteInstance(u *cloudinstances.CloudInstanceGroupMember) error {
 id := u.ID
 nodeName := ""
 if u.Node != nil {
 nodeName = u.Node.Name
 }
 if nodeName != "" {
-klog.Infof("Stopping instance %q, node %q, in group %q (this may take a while).", id, nodeName, r.CloudGroup.HumanName)
+klog.Infof("Stopping instance %q, node %q, in group %q (this may take a while).", id, nodeName, u.CloudInstanceGroup.HumanName)
 } else {
-klog.Infof("Stopping instance %q, in group %q (this may take a while).", id, r.CloudGroup.HumanName)
+klog.Infof("Stopping instance %q, in group %q (this may take a while).", id, u.CloudInstanceGroup.HumanName)
 }
-if err := r.Cloud.DeleteInstance(u); err != nil {
+if err := c.Cloud.DeleteInstance(u); err != nil {
 if nodeName != "" {
 return fmt.Errorf("error deleting instance %q, node %q: %v", id, nodeName, err)
 } else {
@@ -556,12 +515,11 @@ func (r *RollingUpdateInstanceGroup) DeleteInstance(u *cloudinstances.CloudInsta
 }
 return nil
 }
-// DrainNode drains a K8s node.
+// drainNode drains a K8s node.
-func (r *RollingUpdateInstanceGroup) DrainNode(u *cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster) error {
+func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstanceGroupMember) error {
-if rollingUpdateData.K8sClient == nil {
+if c.K8sClient == nil {
 return fmt.Errorf("K8sClient not set")
 }
@@ -574,7 +532,7 @@ func (r *RollingUpdateInstanceGroup) DrainNode(u *cloudinstances.CloudInstanceGr
 }
 helper := &drain.Helper{
-Client: rollingUpdateData.K8sClient,
+Client: c.K8sClient,
 Force: true,
 GracePeriodSeconds: -1,
 IgnoreAllDaemonSets: true,
@@ -596,19 +554,18 @@ func (r *RollingUpdateInstanceGroup) DrainNode(u *cloudinstances.CloudInstanceGr
 return fmt.Errorf("error draining node: %v", err)
 }
-if rollingUpdateData.PostDrainDelay > 0 {
+if c.PostDrainDelay > 0 {
-klog.Infof("Waiting for %s for pods to stabilize after draining.", rollingUpdateData.PostDrainDelay)
+klog.Infof("Waiting for %s for pods to stabilize after draining.", c.PostDrainDelay)
-time.Sleep(rollingUpdateData.PostDrainDelay)
+time.Sleep(c.PostDrainDelay)
 }
 return nil
 }
-// DeleteNode deletes a node from the k8s API. It does not delete the underlying instance.
+// deleteNode deletes a node from the k8s API. It does not delete the underlying instance.
-func (r *RollingUpdateInstanceGroup) deleteNode(node *corev1.Node, rollingUpdateData *RollingUpdateCluster) error {
+func (c *RollingUpdateCluster) deleteNode(node *corev1.Node) error {
-k8sclient := rollingUpdateData.K8sClient
 var options metav1.DeleteOptions
-err := k8sclient.CoreV1().Nodes().Delete(node.Name, &options)
+err := c.K8sClient.CoreV1().Nodes().Delete(node.Name, &options)
 if err != nil {
 if apierrors.IsNotFound(err) {
 return nil
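
A reviewer's aside on the validateClusterWithDuration and tryValidateCluster hunks above: the control flow is untouched by this PR (only the receiver and the now-redundant parameters change). It validates once immediately, then keeps retrying on a ticker until an overall timeout expires. A minimal standalone sketch of that pattern, using hypothetical names rather than code from this repository:

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitUntilValid runs check once right away (covering timeouts shorter than one tick),
// then retries on a ticker until the overall timeout expires.
func waitUntilValid(check func() error, timeout, tick time.Duration) error {
	if check() == nil {
		return nil
	}
	deadline := time.After(timeout)
	ticker := time.NewTicker(tick)
	defer ticker.Stop()
	for {
		select {
		case <-deadline:
			return fmt.Errorf("did not validate within %v", timeout)
		case <-ticker.C:
			if check() == nil {
				return nil
			}
		}
	}
}

func main() {
	start := time.Now()
	// Fake check that starts passing after 300ms, standing in for ClusterValidator.Validate().
	check := func() error {
		if time.Since(start) < 300*time.Millisecond {
			return errors.New("not ready yet")
		}
		return nil
	}
	fmt.Println(waitUntilValid(check, 2*time.Second, 100*time.Millisecond)) // prints <nil>
}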

View File

@@ -108,10 +108,7 @@ func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.C
 defer wg.Done()
-g, err := NewRollingUpdateInstanceGroup(c.Cloud, group)
-if err == nil {
-err = g.RollingUpdate(c, cluster, true, c.BastionInterval, c.ValidationTimeout)
-}
+err := c.rollingUpdateInstanceGroup(cluster, group, true, c.BastionInterval, c.ValidationTimeout)
 resultsMutex.Lock()
 results[k] = err
@@ -136,10 +133,7 @@ func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.C
 // and we don't want to roll all the masters at the same time. See issue #284
 for _, group := range masterGroups {
-g, err := NewRollingUpdateInstanceGroup(c.Cloud, group)
-if err == nil {
-err = g.RollingUpdate(c, cluster, false, c.MasterInterval, c.ValidationTimeout)
-}
+err := c.rollingUpdateInstanceGroup(cluster, group, false, c.MasterInterval, c.ValidationTimeout)
 // Do not continue update if master(s) failed, cluster is potentially in an unhealthy state
 if err != nil {
@@ -161,10 +155,7 @@ func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.C
 }
 for k, group := range nodeGroups {
-g, err := NewRollingUpdateInstanceGroup(c.Cloud, group)
-if err == nil {
-err = g.RollingUpdate(c, cluster, false, c.NodeInterval, c.ValidationTimeout)
-}
+err := c.rollingUpdateInstanceGroup(cluster, group, false, c.NodeInterval, c.ValidationTimeout)
 results[k] = err