mirror of https://github.com/kubernetes/kops.git
Refactor/simplify rolling update
parent faf57d67b3
commit 03eb8246c7
@@ -33,44 +33,10 @@ import (
    api "k8s.io/kops/pkg/apis/kops"
    "k8s.io/kops/pkg/cloudinstances"
    "k8s.io/kops/pkg/drain"
    "k8s.io/kops/upup/pkg/fi"
)

const rollingUpdateTaintKey = "kops.k8s.io/scheduled-for-update"

-// RollingUpdateInstanceGroup is the AWS ASG backing an InstanceGroup.
-type RollingUpdateInstanceGroup struct {
-    // Cloud is the kops cloud provider
-    Cloud fi.Cloud
-    // CloudGroup is the kops cloud provider groups
-    CloudGroup *cloudinstances.CloudInstanceGroup
-
-    // TODO should remove the need to have rollingupdate struct and add:
-    // TODO - the kubernetes client
-    // TODO - the cluster name
-    // TODO - the client config
-    // TODO - fail on validate
-    // TODO - fail on drain
-    // TODO - cloudonly
-}
-
-// NewRollingUpdateInstanceGroup creates a new struct
-func NewRollingUpdateInstanceGroup(cloud fi.Cloud, cloudGroup *cloudinstances.CloudInstanceGroup) (*RollingUpdateInstanceGroup, error) {
-    if cloud == nil {
-        return nil, fmt.Errorf("cloud provider is required")
-    }
-    if cloudGroup == nil {
-        return nil, fmt.Errorf("cloud group is required")
-    }
-
-    // TODO check more values in cloudGroup that they are set properly
-
-    return &RollingUpdateInstanceGroup{
-        Cloud:      cloud,
-        CloudGroup: cloudGroup,
-    }, nil
-}
-
// promptInteractive asks the user to continue, mostly copied from vendor/google.golang.org/api/examples/gmail.go.
func promptInteractive(upgradedHostId, upgradedHostName string) (stopPrompting bool, err error) {
    stopPrompting = false
@@ -102,23 +68,17 @@ func promptInteractive(upgradedHostId, upgradedHostName string) (stopPrompting b
}

// RollingUpdate performs a rolling update on a list of instances.
-func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpdateCluster, cluster *api.Cluster, isBastion bool, sleepAfterTerminate time.Duration, validationTimeout time.Duration) (err error) {
-
-    // we should not get here, but hey I am going to check.
-    if rollingUpdateData == nil {
-        return fmt.Errorf("rollingUpdate cannot be nil")
-    }
-
+func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(cluster *api.Cluster, group *cloudinstances.CloudInstanceGroup, isBastion bool, sleepAfterTerminate time.Duration, validationTimeout time.Duration) (err error) {
    // Do not need a k8s client if you are doing cloudonly.
-    if rollingUpdateData.K8sClient == nil && !rollingUpdateData.CloudOnly {
+    if c.K8sClient == nil && !c.CloudOnly {
        return fmt.Errorf("rollingUpdate is missing a k8s client")
    }

-    noneReady := len(r.CloudGroup.Ready) == 0
-    numInstances := len(r.CloudGroup.Ready) + len(r.CloudGroup.NeedUpdate)
-    update := r.CloudGroup.NeedUpdate
-    if rollingUpdateData.Force {
-        update = append(update, r.CloudGroup.Ready...)
+    noneReady := len(group.Ready) == 0
+    numInstances := len(group.Ready) + len(group.NeedUpdate)
+    update := group.NeedUpdate
+    if c.Force {
+        update = append(update, group.Ready...)
    }

    if len(update) == 0 {
@@ -127,11 +87,11 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd

    if isBastion {
        klog.V(3).Info("Not validating the cluster as instance is a bastion.")
-    } else if rollingUpdateData.CloudOnly {
+    } else if c.CloudOnly {
        klog.V(3).Info("Not validating cluster as validation is turned off via the cloud-only flag.")
    } else {
-        if err = r.validateCluster(rollingUpdateData, cluster); err != nil {
-            if rollingUpdateData.FailOnValidate {
+        if err = c.validateCluster(); err != nil {
+            if c.FailOnValidate {
                return err
            }
            klog.V(2).Infof("Ignoring cluster validation error: %v", err)
@@ -139,14 +99,14 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
        }
    }

-    if !rollingUpdateData.CloudOnly {
-        err = r.taintAllNeedUpdate(update, rollingUpdateData)
+    if !c.CloudOnly {
+        err = c.taintAllNeedUpdate(group, update)
        if err != nil {
            return err
        }
    }

-    settings := resolveSettings(cluster, r.CloudGroup.InstanceGroup, numInstances)
+    settings := resolveSettings(cluster, group.InstanceGroup, numInstances)

    runningDrains := 0
    maxSurge := settings.MaxSurge.IntValue()
@@ -156,11 +116,11 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
    maxConcurrency := maxSurge + settings.MaxUnavailable.IntValue()

    if maxConcurrency == 0 {
-        klog.Infof("Rolling updates for InstanceGroup %s are disabled", r.CloudGroup.InstanceGroup.Name)
+        klog.Infof("Rolling updates for InstanceGroup %s are disabled", group.InstanceGroup.Name)
        return nil
    }

-    if r.CloudGroup.InstanceGroup.Spec.Role == api.InstanceGroupRoleMaster && maxSurge != 0 {
+    if group.InstanceGroup.Spec.Role == api.InstanceGroupRoleMaster && maxSurge != 0 {
        // Masters are incapable of surging because they rely on registering themselves through
        // the local apiserver. That apiserver depends on the local etcd, which relies on being
        // joined to the etcd cluster.
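As an aside (not part of the commit): maxConcurrency above is simply the resolved maxSurge plus maxUnavailable, a value of 0 disables the rollout, and the comment explains why masters must not surge. A minimal, hypothetical Go sketch of that arithmetic follows; the master clamp is an assumption, not code from this diff.

// Illustrative sketch only, not kops code; names are hypothetical.
package main

import "fmt"

// concurrency mirrors the arithmetic above: surge plus unavailable instances may
// be in flight at once, and masters are assumed never to surge.
func concurrency(maxSurge, maxUnavailable int, isMaster bool) int {
    if isMaster {
        // A new master can only register through its own apiserver/etcd, so the old
        // instance has to go away first - no surging.
        maxSurge = 0
    }
    return maxSurge + maxUnavailable
}

func main() {
    fmt.Println(concurrency(1, 1, false)) // 2: one surge instance plus one unavailable
    fmt.Println(concurrency(1, 1, true))  // 1: the surge is dropped for masters in this sketch
}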
@@ -171,7 +131,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
        }
    }

-    if rollingUpdateData.Interactive {
+    if c.Interactive {
        if maxSurge > 1 {
            maxSurge = 1
        }
@@ -180,11 +140,11 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd

    update = prioritizeUpdate(update)

-    if maxSurge > 0 && !rollingUpdateData.CloudOnly {
+    if maxSurge > 0 && !c.CloudOnly {
        for numSurge := 1; numSurge <= maxSurge; numSurge++ {
            u := update[len(update)-numSurge]
            if !u.Detached {
-                if err := r.detachInstance(u); err != nil {
+                if err := c.detachInstance(u); err != nil {
                    return err
                }

@@ -195,7 +155,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
            klog.Infof("waiting for %v after detaching instance", sleepAfterTerminate)
            time.Sleep(sleepAfterTerminate)

-            if err := r.maybeValidate(rollingUpdateData, validationTimeout, "detaching"); err != nil {
+            if err := c.maybeValidate(validationTimeout, "detaching"); err != nil {
                return err
            }
            noneReady = false

@@ -208,7 +168,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd

    for uIdx, u := range update {
        go func(m *cloudinstances.CloudInstanceGroupMember) {
-            terminateChan <- r.drainTerminateAndWait(m, rollingUpdateData, isBastion, sleepAfterTerminate)
+            terminateChan <- c.drainTerminateAndWait(m, isBastion, sleepAfterTerminate)
        }(u)
        runningDrains++

@@ -224,12 +184,12 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
            return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
        }

-        err = r.maybeValidate(rollingUpdateData, validationTimeout, "removing")
+        err = c.maybeValidate(validationTimeout, "removing")
        if err != nil {
            return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
        }

-        if rollingUpdateData.Interactive {
+        if c.Interactive {
            nodeName := ""
            if u.Node != nil {
                nodeName = u.Node.Name
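For readers unfamiliar with the pattern: the loop above starts one goroutine per instance, pushes each drain-and-terminate result onto terminateChan, and waitForPendingBeforeReturningError empties that channel before returning. A self-contained sketch of the pattern (hypothetical names, not code from this commit):

// Illustrative sketch only, not kops code.
package main

import (
    "errors"
    "fmt"
)

func drainAndTerminate(id int) error {
    if id == 2 {
        return errors.New("drain failed")
    }
    return nil
}

func main() {
    instances := []int{1, 2, 3}
    terminateChan := make(chan error, len(instances))
    runningDrains := 0

    // Fan out: one goroutine per instance, result reported on the channel.
    for _, id := range instances {
        go func(id int) { terminateChan <- drainAndTerminate(id) }(id)
        runningDrains++
    }

    // Wait for every pending drain before returning, keeping the first error.
    var firstErr error
    for ; runningDrains > 0; runningDrains-- {
        if err := <-terminateChan; err != nil && firstErr == nil {
            firstErr = err
        }
    }
    fmt.Println("result:", firstErr)
}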
@@ -241,7 +201,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
            }
            if stopPrompting {
                // Is a pointer to a struct, changes here push back into the original
-                rollingUpdateData.Interactive = false
+                c.Interactive = false
            }
        }

@@ -270,7 +230,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
        }
    }

-    err = r.maybeValidate(rollingUpdateData, validationTimeout, "removing")
+    err = c.maybeValidate(validationTimeout, "removing")
    if err != nil {
        return err
    }

@@ -306,7 +266,7 @@ func waitForPendingBeforeReturningError(runningDrains int, terminateChan chan er
    return err
}

-func (r *RollingUpdateInstanceGroup) taintAllNeedUpdate(update []*cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster) error {
+func (c *RollingUpdateCluster) taintAllNeedUpdate(group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstanceGroupMember) error {
    var toTaint []*corev1.Node
    for _, u := range update {
        if u.Node != nil && !u.Node.Spec.Unschedulable {

@@ -326,10 +286,10 @@ func (r *RollingUpdateInstanceGroup) taintAllNeedUpdate(update []*cloudinstances
    if len(toTaint) == 1 {
        noun = "node"
    }
-    klog.Infof("Tainting %d %s in %q instancegroup.", len(toTaint), noun, r.CloudGroup.InstanceGroup.Name)
+    klog.Infof("Tainting %d %s in %q instancegroup.", len(toTaint), noun, group.InstanceGroup.Name)
    for _, n := range toTaint {
-        if err := r.patchTaint(rollingUpdateData, n); err != nil {
-            if rollingUpdateData.FailOnDrainError {
+        if err := c.patchTaint(n); err != nil {
+            if c.FailOnDrainError {
                return fmt.Errorf("failed to taint node %q: %v", n, err)
            }
            klog.Infof("Ignoring error tainting node %q: %v", n, err)

@@ -339,7 +299,7 @@ func (r *RollingUpdateInstanceGroup) taintAllNeedUpdate(update []*cloudinstances
    return nil
}

-func (r *RollingUpdateInstanceGroup) patchTaint(rollingUpdateData *RollingUpdateCluster, node *corev1.Node) error {
+func (c *RollingUpdateCluster) patchTaint(node *corev1.Node) error {
    oldData, err := json.Marshal(node)
    if err != nil {
        return err

@@ -360,11 +320,11 @@ func (r *RollingUpdateInstanceGroup) patchTaint(rollingUpdateData *RollingUpdate
        return err
    }

-    _, err = rollingUpdateData.K8sClient.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes)
+    _, err = c.K8sClient.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes)
    return err
}

-func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster, isBastion bool, sleepAfterTerminate time.Duration) error {
+func (c *RollingUpdateCluster) drainTerminateAndWait(u *cloudinstances.CloudInstanceGroupMember, isBastion bool, sleepAfterTerminate time.Duration) error {
    instanceId := u.ID

    nodeName := ""
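patchTaint above marshals the node before and after adding the rollingUpdateTaintKey taint and submits the difference as a strategic merge patch. A standalone sketch of building such a patch; the taint effect used here is an assumption, and the final Patch call is only indicated in a comment rather than executed:

// Illustrative sketch only, not kops code.
package main

import (
    "encoding/json"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
    node := &corev1.Node{}
    node.Name = "node-1"

    // Snapshot the node before modification.
    oldData, err := json.Marshal(node)
    if err != nil {
        panic(err)
    }

    // Add the taint that marks the node as scheduled for update (effect is assumed).
    node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{
        Key:    "kops.k8s.io/scheduled-for-update",
        Effect: corev1.TaintEffectPreferNoSchedule,
    })

    newData, err := json.Marshal(node)
    if err != nil {
        panic(err)
    }

    // Compute the strategic merge patch between the two snapshots.
    patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
    if err != nil {
        panic(err)
    }

    // In the code above this would be sent with:
    // c.K8sClient.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes)
    fmt.Println(string(patchBytes))
}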
@@ -374,7 +334,7 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo

    if isBastion {
        // We don't want to validate for bastions - they aren't part of the cluster
-    } else if rollingUpdateData.CloudOnly {
+    } else if c.CloudOnly {

        klog.Warning("Not draining cluster nodes as 'cloudonly' flag is set.")

@@ -383,8 +343,8 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo
        if u.Node != nil {
            klog.Infof("Draining the node: %q.", nodeName)

-            if err := r.DrainNode(u, rollingUpdateData); err != nil {
-                if rollingUpdateData.FailOnDrainError {
+            if err := c.drainNode(u); err != nil {
+                if c.FailOnDrainError {
                    return fmt.Errorf("failed to drain node %q: %v", nodeName, err)
                }
                klog.Infof("Ignoring error draining node %q: %v", nodeName, err)

@@ -396,18 +356,18 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo

    // We unregister the node before deleting it; if the replacement comes up with the same name it would otherwise still be cordoned
    // (It often seems like GCE tries to re-use names)
-    if !isBastion && !rollingUpdateData.CloudOnly {
+    if !isBastion && !c.CloudOnly {
        if u.Node == nil {
            klog.Warningf("no kubernetes Node associated with %s, skipping node deletion", instanceId)
        } else {
            klog.Infof("deleting node %q from kubernetes", nodeName)
-            if err := r.deleteNode(u.Node, rollingUpdateData); err != nil {
+            if err := c.deleteNode(u.Node); err != nil {
                return fmt.Errorf("error deleting node %q: %v", nodeName, err)
            }
        }
    }

-    if err := r.DeleteInstance(u); err != nil {
+    if err := c.deleteInstance(u); err != nil {
        klog.Errorf("error deleting instance %q, node %q: %v", instanceId, nodeName, err)
        return err
    }

@@ -419,16 +379,16 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo
    return nil
}

-func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpdateCluster, validationTimeout time.Duration, operation string) error {
-    if rollingUpdateData.CloudOnly {
+func (c *RollingUpdateCluster) maybeValidate(validationTimeout time.Duration, operation string) error {
+    if c.CloudOnly {
        klog.Warningf("Not validating cluster as cloudonly flag is set.")

    } else {
        klog.Info("Validating the cluster.")

-        if err := r.validateClusterWithDuration(rollingUpdateData, validationTimeout); err != nil {
+        if err := c.validateClusterWithDuration(validationTimeout); err != nil {

-            if rollingUpdateData.FailOnValidate {
+            if c.FailOnValidate {
                klog.Errorf("Cluster did not validate within %s", validationTimeout)
                return fmt.Errorf("error validating cluster after %s a node: %v", operation, err)
            }

@@ -440,15 +400,15 @@ func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpd
}

// validateClusterWithDuration runs validation.ValidateCluster until either we get positive result or the timeout expires
-func (r *RollingUpdateInstanceGroup) validateClusterWithDuration(rollingUpdateData *RollingUpdateCluster, duration time.Duration) error {
+func (c *RollingUpdateCluster) validateClusterWithDuration(duration time.Duration) error {
    // Try to validate cluster at least once, this will handle durations that are lower
    // than our tick time
-    if r.tryValidateCluster(rollingUpdateData, duration, rollingUpdateData.ValidateTickDuration) {
+    if c.tryValidateCluster(duration) {
        return nil
    }

    timeout := time.After(duration)
-    ticker := time.NewTicker(rollingUpdateData.ValidateTickDuration)
+    ticker := time.NewTicker(c.ValidateTickDuration)
    defer ticker.Stop()
    // Keep trying until we're timed out or got a result or got an error
    for {

@@ -458,7 +418,7 @@ func (r *RollingUpdateInstanceGroup) validateClusterWithDuration(rollingUpdateDa
            return fmt.Errorf("cluster did not validate within a duration of %q", duration)
        case <-ticker.C:
            // Got a tick, validate cluster
-            if r.tryValidateCluster(rollingUpdateData, duration, rollingUpdateData.ValidateTickDuration) {
+            if c.tryValidateCluster(duration) {
                return nil
            }
            // ValidateCluster didn't work yet, so let's try again
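validateClusterWithDuration above is the usual try-once-then-tick-until-timeout loop. A generic, self-contained sketch of that pattern with a hypothetical validate function (not code from this commit):

// Illustrative sketch only, not kops code.
package main

import (
    "errors"
    "fmt"
    "time"
)

func pollUntil(duration, tick time.Duration, validate func() bool) error {
    // Try at least once, so durations shorter than the tick still get a chance.
    if validate() {
        return nil
    }

    timeout := time.After(duration)
    ticker := time.NewTicker(tick)
    defer ticker.Stop()

    for {
        select {
        case <-timeout:
            return errors.New("did not validate within the duration")
        case <-ticker.C:
            if validate() {
                return nil
            }
            // Not valid yet; keep ticking until the timeout fires.
        }
    }
}

func main() {
    attempts := 0
    err := pollUntil(200*time.Millisecond, 20*time.Millisecond, func() bool {
        attempts++
        return attempts >= 3 // succeeds on the third try
    })
    fmt.Println(err)
}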
@@ -467,24 +427,24 @@ func (r *RollingUpdateInstanceGroup) validateClusterWithDuration(rollingUpdateDa
        }
    }

-func (r *RollingUpdateInstanceGroup) tryValidateCluster(rollingUpdateData *RollingUpdateCluster, duration time.Duration, tickDuration time.Duration) bool {
-    result, err := rollingUpdateData.ClusterValidator.Validate()
+func (c *RollingUpdateCluster) tryValidateCluster(duration time.Duration) bool {
+    result, err := c.ClusterValidator.Validate()

-    if err == nil && len(result.Failures) == 0 && rollingUpdateData.ValidateSuccessDuration > 0 {
-        klog.Infof("Cluster validated; revalidating in %s to make sure it does not flap.", rollingUpdateData.ValidateSuccessDuration)
-        time.Sleep(rollingUpdateData.ValidateSuccessDuration)
-        result, err = rollingUpdateData.ClusterValidator.Validate()
+    if err == nil && len(result.Failures) == 0 && c.ValidateSuccessDuration > 0 {
+        klog.Infof("Cluster validated; revalidating in %s to make sure it does not flap.", c.ValidateSuccessDuration)
+        time.Sleep(c.ValidateSuccessDuration)
+        result, err = c.ClusterValidator.Validate()
    }

    if err != nil {
-        klog.Infof("Cluster did not validate, will try again in %q until duration %q expires: %v.", tickDuration, duration, err)
+        klog.Infof("Cluster did not validate, will try again in %q until duration %q expires: %v.", c.ValidateTickDuration, duration, err)
        return false
    } else if len(result.Failures) > 0 {
        messages := []string{}
        for _, failure := range result.Failures {
            messages = append(messages, failure.Message)
        }
-        klog.Infof("Cluster did not pass validation, will try again in %q until duration %q expires: %s.", tickDuration, duration, strings.Join(messages, ", "))
+        klog.Infof("Cluster did not pass validation, will try again in %q until duration %q expires: %s.", c.ValidateTickDuration, duration, strings.Join(messages, ", "))
        return false
    } else {
        klog.Info("Cluster validated.")

@@ -493,37 +453,36 @@ func (r *RollingUpdateInstanceGroup) tryValidateCluster(rollingUpdateData *Rolli
}

// validateCluster runs our validation methods on the K8s Cluster.
-func (r *RollingUpdateInstanceGroup) validateCluster(rollingUpdateData *RollingUpdateCluster, cluster *api.Cluster) error {
-    result, err := rollingUpdateData.ClusterValidator.Validate()
+func (c *RollingUpdateCluster) validateCluster() error {
+    result, err := c.ClusterValidator.Validate()
    if err != nil {
-        return fmt.Errorf("cluster %q did not validate: %v", cluster.Name, err)
+        return fmt.Errorf("cluster %q did not validate: %v", c.ClusterName, err)
    }
    if len(result.Failures) > 0 {
        messages := []string{}
        for _, failure := range result.Failures {
            messages = append(messages, failure.Message)
        }
-        return fmt.Errorf("cluster %q did not pass validation: %s", cluster.Name, strings.Join(messages, ", "))
+        return fmt.Errorf("cluster %q did not pass validation: %s", c.ClusterName, strings.Join(messages, ", "))
    }

    return nil
-
}

// detachInstance detaches a Cloud Instance
-func (r *RollingUpdateInstanceGroup) detachInstance(u *cloudinstances.CloudInstanceGroupMember) error {
+func (c *RollingUpdateCluster) detachInstance(u *cloudinstances.CloudInstanceGroupMember) error {
    id := u.ID
    nodeName := ""
    if u.Node != nil {
        nodeName = u.Node.Name
    }
    if nodeName != "" {
-        klog.Infof("Detaching instance %q, node %q, in group %q.", id, nodeName, r.CloudGroup.HumanName)
+        klog.Infof("Detaching instance %q, node %q, in group %q.", id, nodeName, u.CloudInstanceGroup.HumanName)
    } else {
-        klog.Infof("Detaching instance %q, in group %q.", id, r.CloudGroup.HumanName)
+        klog.Infof("Detaching instance %q, in group %q.", id, u.CloudInstanceGroup.HumanName)
    }

-    if err := r.Cloud.DetachInstance(u); err != nil {
+    if err := c.Cloud.DetachInstance(u); err != nil {
        if nodeName != "" {
            return fmt.Errorf("error detaching instance %q, node %q: %v", id, nodeName, err)
        } else {

@@ -534,20 +493,20 @@ func (r *RollingUpdateInstanceGroup) detachInstance(u *cloudinstances.CloudInsta
    return nil
}

-// DeleteInstance deletes an Cloud Instance.
-func (r *RollingUpdateInstanceGroup) DeleteInstance(u *cloudinstances.CloudInstanceGroupMember) error {
+// deleteInstance deletes an Cloud Instance.
+func (c *RollingUpdateCluster) deleteInstance(u *cloudinstances.CloudInstanceGroupMember) error {
    id := u.ID
    nodeName := ""
    if u.Node != nil {
        nodeName = u.Node.Name
    }
    if nodeName != "" {
-        klog.Infof("Stopping instance %q, node %q, in group %q (this may take a while).", id, nodeName, r.CloudGroup.HumanName)
+        klog.Infof("Stopping instance %q, node %q, in group %q (this may take a while).", id, nodeName, u.CloudInstanceGroup.HumanName)
    } else {
-        klog.Infof("Stopping instance %q, in group %q (this may take a while).", id, r.CloudGroup.HumanName)
+        klog.Infof("Stopping instance %q, in group %q (this may take a while).", id, u.CloudInstanceGroup.HumanName)
    }

-    if err := r.Cloud.DeleteInstance(u); err != nil {
+    if err := c.Cloud.DeleteInstance(u); err != nil {
        if nodeName != "" {
            return fmt.Errorf("error deleting instance %q, node %q: %v", id, nodeName, err)
        } else {

@@ -556,12 +515,11 @@ func (r *RollingUpdateInstanceGroup) DeleteInstance(u *cloudinstances.CloudInsta
    }

    return nil
-
}

-// DrainNode drains a K8s node.
-func (r *RollingUpdateInstanceGroup) DrainNode(u *cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster) error {
-    if rollingUpdateData.K8sClient == nil {
+// drainNode drains a K8s node.
+func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstanceGroupMember) error {
+    if c.K8sClient == nil {
        return fmt.Errorf("K8sClient not set")
    }

@@ -574,7 +532,7 @@ func (r *RollingUpdateInstanceGroup) DrainNode(u *cloudinstances.CloudInstanceGr
    }

    helper := &drain.Helper{
-        Client:              rollingUpdateData.K8sClient,
+        Client:              c.K8sClient,
        Force:               true,
        GracePeriodSeconds:  -1,
        IgnoreAllDaemonSets: true,
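The drain.Helper configured above mirrors kubectl's drain package. Assuming the kubectl-style RunCordonOrUncordon and RunNodeDrain helpers that package provides, a drain call looks roughly like this; the fake clientset and settings are hypothetical, not code from this commit:

// Illustrative sketch only, not kops code.
package main

import (
    "fmt"
    "os"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/fake"
    "k8s.io/kubectl/pkg/drain"
)

func main() {
    node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
    client := fake.NewSimpleClientset(node)

    helper := &drain.Helper{
        Client:              client,
        Force:               true, // also evict pods without a controller
        GracePeriodSeconds:  -1,   // use each pod's own grace period
        IgnoreAllDaemonSets: true,
        DeleteLocalData:     true,
        Timeout:             5 * time.Minute,
        Out:                 os.Stdout,
        ErrOut:              os.Stderr,
    }

    // Cordon first, then evict or delete the pods running on the node.
    if err := drain.RunCordonOrUncordon(helper, node, true); err != nil {
        fmt.Println("cordon failed:", err)
        return
    }
    if err := drain.RunNodeDrain(helper, node.Name); err != nil {
        fmt.Println("drain failed:", err)
        return
    }
    fmt.Println("node drained")
}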
@@ -596,19 +554,18 @@ func (r *RollingUpdateInstanceGroup) DrainNode(u *cloudinstances.CloudInstanceGr
        return fmt.Errorf("error draining node: %v", err)
    }

-    if rollingUpdateData.PostDrainDelay > 0 {
-        klog.Infof("Waiting for %s for pods to stabilize after draining.", rollingUpdateData.PostDrainDelay)
-        time.Sleep(rollingUpdateData.PostDrainDelay)
+    if c.PostDrainDelay > 0 {
+        klog.Infof("Waiting for %s for pods to stabilize after draining.", c.PostDrainDelay)
+        time.Sleep(c.PostDrainDelay)
    }

    return nil
}

-// DeleteNode deletes a node from the k8s API. It does not delete the underlying instance.
-func (r *RollingUpdateInstanceGroup) deleteNode(node *corev1.Node, rollingUpdateData *RollingUpdateCluster) error {
-    k8sclient := rollingUpdateData.K8sClient
+// deleteNode deletes a node from the k8s API. It does not delete the underlying instance.
+func (c *RollingUpdateCluster) deleteNode(node *corev1.Node) error {
    var options metav1.DeleteOptions
-    err := k8sclient.CoreV1().Nodes().Delete(node.Name, &options)
+    err := c.K8sClient.CoreV1().Nodes().Delete(node.Name, &options)
    if err != nil {
        if apierrors.IsNotFound(err) {
            return nil
@@ -108,10 +108,7 @@ func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.C

            defer wg.Done()

-            g, err := NewRollingUpdateInstanceGroup(c.Cloud, group)
-            if err == nil {
-                err = g.RollingUpdate(c, cluster, true, c.BastionInterval, c.ValidationTimeout)
-            }
+            err := c.rollingUpdateInstanceGroup(cluster, group, true, c.BastionInterval, c.ValidationTimeout)

            resultsMutex.Lock()
            results[k] = err

@@ -136,10 +133,7 @@ func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.C
        // and we don't want to roll all the masters at the same time. See issue #284

        for _, group := range masterGroups {
-            g, err := NewRollingUpdateInstanceGroup(c.Cloud, group)
-            if err == nil {
-                err = g.RollingUpdate(c, cluster, false, c.MasterInterval, c.ValidationTimeout)
-            }
+            err := c.rollingUpdateInstanceGroup(cluster, group, false, c.MasterInterval, c.ValidationTimeout)

            // Do not continue update if master(s) failed, cluster is potentially in an unhealthy state
            if err != nil {

@@ -161,10 +155,7 @@ func (c *RollingUpdateCluster) RollingUpdate(groups map[string]*cloudinstances.C
        }

        for k, group := range nodeGroups {
-            g, err := NewRollingUpdateInstanceGroup(c.Cloud, group)
-            if err == nil {
-                err = g.RollingUpdate(c, cluster, false, c.NodeInterval, c.ValidationTimeout)
-            }
+            err := c.rollingUpdateInstanceGroup(cluster, group, false, c.NodeInterval, c.ValidationTimeout)

            results[k] = err

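After this refactor every helper hangs off *RollingUpdateCluster, so the options that RollingUpdateInstanceGroup used to carry live in one place. A sketch of that struct as inferred from the fields referenced in the diff; the real definition lives elsewhere in the package and may differ:

// Illustrative sketch only, inferred from field references in this diff.
package instancegroups

import (
    "time"

    "k8s.io/client-go/kubernetes"
    "k8s.io/kops/pkg/validation"
    "k8s.io/kops/upup/pkg/fi"
)

// RollingUpdateCluster carries everything a rolling update needs, so the
// per-instance-group helpers only take the group and a few durations.
type RollingUpdateCluster struct {
    Cloud            fi.Cloud
    K8sClient        kubernetes.Interface
    ClusterValidator validation.ClusterValidator
    ClusterName      string

    Force       bool
    CloudOnly   bool
    Interactive bool

    FailOnDrainError bool
    FailOnValidate   bool

    MasterInterval    time.Duration
    NodeInterval      time.Duration
    BastionInterval   time.Duration
    ValidationTimeout time.Duration

    PostDrainDelay          time.Duration
    ValidateTickDuration    time.Duration
    ValidateSuccessDuration time.Duration
}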