/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package instancegroups

import (
	"bufio"
	"context"
	"fmt"
	"os"
	"strings"
	"time"

	"k8s.io/kops/upup/pkg/fi"
	"k8s.io/kops/upup/pkg/fi/cloudup"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/json"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/klog/v2"
	"k8s.io/kubectl/pkg/drain"

	api "k8s.io/kops/pkg/apis/kops"
	"k8s.io/kops/pkg/cloudinstances"
	"k8s.io/kops/pkg/validation"
)

const rollingUpdateTaintKey = "kops.k8s.io/scheduled-for-update"

// ValidationTimeoutError represents an error that occurs when
// the cluster fails to validate within the designated timeout.
type ValidationTimeoutError struct {
	operation string
	err       error
}

func (v *ValidationTimeoutError) Error() string {
	return fmt.Sprintf("error validating cluster%s: %s", v.operation, v.err.Error())
}

func (v *ValidationTimeoutError) Unwrap() error {
	return v.err
}

// Is checks that a given error is a ValidationTimeoutError.
func (v *ValidationTimeoutError) Is(err error) bool {
	// Currently all validation timeout errors are equivalent
	// If you wish to differentiate, please update the instances of `errors.Is` that check
	// this error to take that into account
	_, ok := err.(*ValidationTimeoutError)
	return ok
}

// promptInteractive asks the user to continue, mostly copied from vendor/google.golang.org/api/examples/gmail.go.
func promptInteractive(upgradedHostID, upgradedHostName string) (stopPrompting bool, err error) {
	stopPrompting = false
	scanner := bufio.NewScanner(os.Stdin)
	if upgradedHostName != "" {
		klog.Infof("Pausing after finished %q, node %q", upgradedHostID, upgradedHostName)
	} else {
		klog.Infof("Pausing after finished %q", upgradedHostID)
	}
	fmt.Print("Continue? (Y)es, (N)o, (A)lwaysYes: [Y] ")
	scanner.Scan()
	err = scanner.Err()
	if err != nil {
		klog.Infof("unable to interpret input: %v", err)
		return stopPrompting, err
	}
	val := scanner.Text()
	val = strings.TrimSpace(val)
	val = strings.ToLower(val)
	switch val {
	case "n":
		klog.Info("User signaled to stop")
		os.Exit(3)
	case "a":
		klog.Info("Always Yes, stop prompting for rest of hosts")
		stopPrompting = true
	}
	return stopPrompting, err
}

// rollingUpdateInstanceGroup performs a rolling update on the instances of a single cloud instance group.
func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, group *cloudinstances.CloudInstanceGroup, sleepAfterTerminate time.Duration) (err error) {
	isBastion := group.InstanceGroup.IsBastion()
	// A k8s client is not needed when doing a cloud-only update.
	if c.K8sClient == nil && !c.CloudOnly {
		return fmt.Errorf("rollingUpdate is missing a k8s client")
	}

	noneReady := len(group.Ready) == 0
	numInstances := len(group.Ready) + len(group.NeedUpdate)
	update := group.NeedUpdate
	if c.Force {
		update = append(update, group.Ready...)
	}

	if len(update) == 0 {
		return nil
	}

	if isBastion {
		klog.V(3).Info("Not validating the cluster as instance is a bastion.")
	} else if err = c.maybeValidate("", 1, group); err != nil {
		return err
	}

	if !c.CloudOnly {
		err = c.taintAllNeedUpdate(ctx, group, update)
		if err != nil {
			return err
		}
	}

	nonWarmPool := []*cloudinstances.CloudInstance{}
	// Run through the warm pool and delete all instances directly
	for _, instance := range update {
		if instance.State == cloudinstances.WarmPool {
			klog.Infof("deleting warm pool instance %q", instance.ID)
			err := c.Cloud.DeleteInstance(instance)
			if err != nil {
				return fmt.Errorf("failed to delete warm pool instance %q: %w", instance.ID, err)
			}
		} else {
			nonWarmPool = append(nonWarmPool, instance)
		}
	}
	update = nonWarmPool

	settings := resolveSettings(c.Cluster, group.InstanceGroup, numInstances)

	runningDrains := 0
	maxSurge := settings.MaxSurge.IntValue()

	if maxSurge > len(update) {
		maxSurge = len(update)
	}

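	// maxConcurrency bounds how many instances may be drained and terminated in parallel:
	// the surged (detached) replacements plus the number of instances allowed to be unavailable.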
	maxConcurrency := maxSurge + settings.MaxUnavailable.IntValue()

	// Karpenter cannot surge
	if group.InstanceGroup.Spec.Manager == api.InstanceManagerKarpenter {
		maxSurge = 0
	}

	if group.InstanceGroup.Spec.Role == api.InstanceGroupRoleControlPlane && maxSurge != 0 {
		// Control plane nodes are incapable of surging because they rely on registering themselves through
		// the local apiserver. That apiserver depends on the local etcd, which relies on being
		// joined to the etcd cluster.
		maxSurge = 0
		maxConcurrency = settings.MaxUnavailable.IntValue()
		if maxConcurrency == 0 {
			maxConcurrency = 1
		}
	}

	if c.Interactive {
		if maxSurge > 1 {
			maxSurge = 1
		}
		maxConcurrency = 1
	}

	update = prioritizeUpdate(update)

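	// Surging is implemented by detaching instances from their cloud instance group; the cloud
	// provider then brings up replacement instances with the new spec while the detached (old)
	// instances keep running until they are drained and terminated below.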
	if maxSurge > 0 && !c.CloudOnly {
		skippedNodes := 0
		for numSurge := 1; numSurge <= maxSurge; numSurge++ {
			u := update[len(update)-numSurge-skippedNodes]
			if u.Status != cloudinstances.CloudInstanceStatusDetached {
				if err := c.detachInstance(u); err != nil {
					// If detaching a node fails, we simply proceed to the next one instead of
					// bubbling up the error.
					klog.Errorf("Failed to detach instance %q: %v", u.ID, err)
					skippedNodes++
					numSurge--
					if maxSurge > len(update)-skippedNodes {
						maxSurge = len(update) - skippedNodes
					}
				}

				// If noneReady, wait until after one node is detached and its replacement validates
				// before detaching more in case the current spec does not result in usable nodes.
				if numSurge == maxSurge || noneReady {
					// Wait for the minimum interval
					klog.Infof("waiting for %v after detaching instance", sleepAfterTerminate)
					time.Sleep(sleepAfterTerminate)

					if err := c.maybeValidate(" after detaching instance", c.ValidateCount, group); err != nil {
						return err
					}
					noneReady = false
				}
			}
		}
	}

	if !*settings.DrainAndTerminate {
		klog.Infof("Rolling updates for InstanceGroup %s are disabled", group.InstanceGroup.Name)
		return nil
	}

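	// Each drain-and-terminate runs in its own goroutine and reports its result on terminateChan.
	// The buffer size matches maxConcurrency so that senders never block.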
	terminateChan := make(chan error, maxConcurrency)

	for uIdx, u := range update {
		go func(m *cloudinstances.CloudInstance) {
			terminateChan <- c.drainTerminateAndWait(ctx, m, sleepAfterTerminate)
		}(u)
		runningDrains++

		// Wait until after one node is deleted and its replacement validates before the concurrent draining
		// in case the current spec does not result in usable nodes.
		if runningDrains < maxConcurrency && (!noneReady || uIdx > 0) {
			continue
		}

		err = <-terminateChan
		runningDrains--
		if err != nil {
			return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
		}

		err = c.maybeValidate(" after terminating instance", c.ValidateCount, group)
		if err != nil {
			return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
		}

		if c.Interactive {
			nodeName := ""
			if u.Node != nil {
				nodeName = u.Node.Name
			}

			stopPrompting, err := promptInteractive(u.ID, nodeName)
			if err != nil {
				return err
			}
			if stopPrompting {
				// c is a pointer to the shared RollingUpdateCluster, so this disables prompting for the remaining hosts
				c.Interactive = false
			}
		}

		// Validation tends to return failures from the start of drain until the replacement is
		// fully ready, so sweep up as many completions as we can before starting the next drain.
	sweep:
		for runningDrains > 0 {
			select {
			case err = <-terminateChan:
				runningDrains--
				if err != nil {
					return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
				}
			default:
				break sweep
			}
		}
	}

	if runningDrains > 0 {
		for runningDrains > 0 {
			err = <-terminateChan
			runningDrains--
			if err != nil {
				return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
			}
		}

		err = c.maybeValidate(" after terminating instance", c.ValidateCount, group)
		if err != nil {
			return err
		}
	}

	return nil
}

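// prioritizeUpdate orders the instances so that attached instances are processed before
// detached ones, preserving the original order within each class.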
func prioritizeUpdate(update []*cloudinstances.CloudInstance) []*cloudinstances.CloudInstance {
	// The priorities are, in order:
	//   attached before detached
	//   TODO unhealthy before healthy
	//   NeedUpdate before Ready (preserve original order)
	result := make([]*cloudinstances.CloudInstance, 0, len(update))
	var detached []*cloudinstances.CloudInstance
	for _, u := range update {
		if u.Status == cloudinstances.CloudInstanceStatusDetached {
			detached = append(detached, u)
		} else {
			result = append(result, u)
		}
	}

	result = append(result, detached...)
	return result
}

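// waitForPendingBeforeReturningError waits for all in-flight drain-and-terminate goroutines to
// finish (discarding their results) before returning err, so none are left blocked writing to terminateChan.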
func waitForPendingBeforeReturningError(runningDrains int, terminateChan chan error, err error) error {
	for runningDrains > 0 {
		<-terminateChan
		runningDrains--
	}
	return err
}

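// taintAllNeedUpdate applies the scheduled-for-update taint to every schedulable node in the
// update list that does not already carry it, so that new pods prefer nodes that are not about
// to be replaced.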
func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstance) error {
	var toTaint []*corev1.Node
	for _, u := range update {
		if u.Node != nil && !u.Node.Spec.Unschedulable {
			foundTaint := false
			for _, taint := range u.Node.Spec.Taints {
				if taint.Key == rollingUpdateTaintKey {
					foundTaint = true
				}
			}
			if !foundTaint {
				toTaint = append(toTaint, u.Node)
			}
		}
	}
	if len(toTaint) > 0 {
		noun := "nodes"
		if len(toTaint) == 1 {
			noun = "node"
		}
		klog.Infof("Tainting %d %s in %q instancegroup.", len(toTaint), noun, group.InstanceGroup.Name)
		for _, n := range toTaint {
			if err := c.patchTaint(ctx, n); err != nil {
				if c.FailOnDrainError {
					return fmt.Errorf("failed to taint node %q: %v", n.Name, err)
				}
				klog.Infof("Ignoring error tainting node %q: %v", n.Name, err)
			}
		}
	}
	return nil
}

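// patchTaint adds the rollingUpdateTaintKey taint with the PreferNoSchedule effect to the node
// via a strategic merge patch. A NotFound error is ignored, as the node may already be gone.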
func (c *RollingUpdateCluster) patchTaint(ctx context.Context, node *corev1.Node) error {
	oldData, err := json.Marshal(node)
	if err != nil {
		return err
	}

	node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{
		Key:    rollingUpdateTaintKey,
		Effect: corev1.TaintEffectPreferNoSchedule,
	})

	newData, err := json.Marshal(node)
	if err != nil {
		return err
	}

	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
	if err != nil {
		return err
	}

	_, err = c.K8sClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
	if apierrors.IsNotFound(err) {
		return nil
	}
	return err
}

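// patchExcludeFromLB labels the node with corev1.LabelNodeExcludeBalancers so that external load
// balancers stop sending new traffic to it before it is drained. It is a no-op if the label is
// already present, and a NotFound error is ignored.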
func (c *RollingUpdateCluster) patchExcludeFromLB(ctx context.Context, node *corev1.Node) error {
	oldData, err := json.Marshal(node)
	if err != nil {
		return err
	}

	if node.Labels == nil {
		node.Labels = map[string]string{}
	}

	if _, ok := node.Labels[corev1.LabelNodeExcludeBalancers]; ok {
		return nil
	}
	node.Labels[corev1.LabelNodeExcludeBalancers] = ""

	newData, err := json.Marshal(node)
	if err != nil {
		return err
	}

	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
	if err != nil {
		return err
	}

	_, err = c.K8sClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
	if apierrors.IsNotFound(err) {
		return nil
	}
	return err
}

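// drainTerminateAndWait drains the node (unless the instance is a bastion or the update is
// cloud-only), deletes the Node object on providers that reuse node names, terminates the cloud
// instance, reconciles the instance group where required, and then waits sleepAfterTerminate.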
func (c *RollingUpdateCluster) drainTerminateAndWait(ctx context.Context, u *cloudinstances.CloudInstance, sleepAfterTerminate time.Duration) error {
	instanceID := u.ID

	nodeName := ""
	if u.Node != nil {
		nodeName = u.Node.Name
	}

	isBastion := u.CloudInstanceGroup.InstanceGroup.IsBastion()

	if isBastion {
		// Bastions are not part of the cluster, so there is nothing to drain or validate
	} else if c.CloudOnly {
		klog.Warning("Not draining cluster nodes as 'cloudonly' flag is set.")
	} else {
		if u.Node != nil {
			klog.Infof("Draining the node: %q.", nodeName)

			if err := c.drainNode(ctx, u); err != nil {
				if c.FailOnDrainError {
					return fmt.Errorf("failed to drain node %q: %v", nodeName, err)
				}
				klog.Infof("Ignoring error draining node %q: %v", nodeName, err)
			}
		} else {
			klog.Warningf("Skipping drain of instance %q, because it is not registered in kubernetes", instanceID)
		}
	}

	// GCE often re-uses names, so we delete the node object to prevent the new instance from using the cordoned Node object.
	// Scaleway has the same behavior.
	if (c.Cluster.GetCloudProvider() == api.CloudProviderGCE || c.Cluster.GetCloudProvider() == api.CloudProviderScaleway) &&
		!isBastion && !c.CloudOnly {
		if u.Node == nil {
			klog.Warningf("no kubernetes Node associated with %s, skipping node deletion", instanceID)
		} else {
			klog.Infof("deleting node %q from kubernetes", nodeName)
			if err := c.deleteNode(ctx, u.Node); err != nil {
				return fmt.Errorf("error deleting node %q: %v", nodeName, err)
			}
		}
	}

	if err := c.deleteInstance(u); err != nil {
		klog.Errorf("error deleting instance %q, node %q: %v", instanceID, nodeName, err)
		return err
	}

	if err := c.reconcileInstanceGroup(ctx); err != nil {
		klog.Errorf("error reconciling instance group %q: %v", u.CloudInstanceGroup.HumanName, err)
		return err
	}

	// Wait for the minimum interval
	klog.Infof("waiting for %v after terminating instance", sleepAfterTerminate)
	time.Sleep(sleepAfterTerminate)

	return nil
}

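// reconcileInstanceGroup re-runs a direct cloudup apply for cloud providers where deleting an
// instance does not automatically trigger a replacement, so that the instance group is brought
// back up to its target size before validation is awaited.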
func (c *RollingUpdateCluster) reconcileInstanceGroup(ctx context.Context) error {
	if c.Cluster.GetCloudProvider() != api.CloudProviderOpenstack &&
		c.Cluster.GetCloudProvider() != api.CloudProviderHetzner &&
		c.Cluster.GetCloudProvider() != api.CloudProviderScaleway &&
		c.Cluster.GetCloudProvider() != api.CloudProviderDO &&
		c.Cluster.GetCloudProvider() != api.CloudProviderAzure {
		return nil
	}
	rto := fi.RunTasksOptions{}
	rto.InitDefaults()
	applyCmd := &cloudup.ApplyClusterCmd{
		Cloud:              c.Cloud,
		Clientset:          c.Clientset,
		Cluster:            c.Cluster,
		DryRun:             false,
		AllowKopsDowngrade: true,
		RunTasksOptions:    &rto,
		OutDir:             "",
		Phase:              "",
		TargetName:         "direct",
		LifecycleOverrides: map[string]fi.Lifecycle{},

		DeletionProcessing: fi.DeletionProcessingModeDeleteIfNotDeferrred,
	}

	_, err := applyCmd.Run(ctx)
	return err
}

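// maybeValidate validates the cluster (unless the update is cloud-only), requiring validateCount
// consecutive successful validations. The operation string (for example " after terminating
// instance") is included in log and error messages. A validation failure only fails the update
// when FailOnValidate is set.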
func (c *RollingUpdateCluster) maybeValidate(operation string, validateCount int, group *cloudinstances.CloudInstanceGroup) error {
	if c.CloudOnly {
		klog.Warningf("Not validating cluster as cloudonly flag is set.")
	} else {
		klog.Info("Validating the cluster.")

		if err := c.validateClusterWithTimeout(validateCount, group); err != nil {

			if c.FailOnValidate {
				klog.Errorf("Cluster did not validate within %s", c.ValidationTimeout)
				return &ValidationTimeoutError{
					operation: operation,
					err:       err,
				}
			}

			klog.Warningf("Cluster validation failed%s, proceeding since fail-on-validate is set to false: %v", operation, err)
		}
	}
	return nil
}

// validateClusterWithTimeout runs validation.ValidateCluster until either we get a positive result or the timeout expires.
func (c *RollingUpdateCluster) validateClusterWithTimeout(validateCount int, group *cloudinstances.CloudInstanceGroup) error {
	ctx, cancel := context.WithTimeout(context.Background(), c.ValidationTimeout)
	defer cancel()

	if validateCount == 0 {
		klog.Warningf("skipping cluster validation because validate-count was 0")
		return nil
	}

	successCount := 0

	for {
		// Note that we validate at least once before checking the timeout, in case the cluster is healthy with a short timeout
		result, err := c.ClusterValidator.Validate(ctx)
		if err == nil && !hasFailureRelevantToGroup(result.Failures, group) {
			successCount++
			if successCount >= validateCount {
				klog.Info("Cluster validated.")
				return nil
			}
			klog.Infof("Cluster validated; revalidating in %s to make sure it does not flap.", c.ValidateSuccessDuration)
			time.Sleep(c.ValidateSuccessDuration)
			continue
		}

		if err != nil {
			if ctx.Err() != nil {
				klog.Infof("Cluster did not validate within deadline: %v.", err)
				break
			}
			klog.Infof("Cluster did not validate, will retry in %q: %v.", c.ValidateTickDuration, err)
		} else if len(result.Failures) > 0 {
			messages := []string{}
			for _, failure := range result.Failures {
				messages = append(messages, failure.Message)
			}
			if ctx.Err() != nil {
				klog.Infof("Cluster did not pass validation within deadline: %s.", strings.Join(messages, ", "))
				break
			}
			klog.Infof("Cluster did not pass validation, will retry in %q: %s.", c.ValidateTickDuration, strings.Join(messages, ", "))
		}

		// Reset the success count; we want N consecutive successful validations
		successCount = 0

		// Wait before retrying in some cases
		// TODO: Should we check if we have enough time left before the deadline?
		time.Sleep(c.ValidateTickDuration)
	}

	return fmt.Errorf("cluster did not validate within a duration of %q", c.ValidationTimeout)
}

// hasFailureRelevantToGroup checks whether any of the validation failures returned by cluster
// validation are relevant to the instance group currently being rolled.
func hasFailureRelevantToGroup(failures []*validation.ValidationError, group *cloudinstances.CloudInstanceGroup) bool {
	// Ignore non-critical validation errors in other instance groups, such as below-target-size errors
	for _, failure := range failures {
		// Certain failures, like a system-critical pod failure or DNS-server-related failures,
		// set their InstanceGroup to nil, since we cannot associate the failure with any one group
		if failure.InstanceGroup == nil {
			return true
		}

		// a failure in the same instance group, or one with cluster-wide impact, is relevant
		if (failure.InstanceGroup.IsControlPlane()) || (failure.InstanceGroup == group.InstanceGroup) {
			return true
		}
	}

	return false
}

// detachInstance detaches a Cloud Instance
func (c *RollingUpdateCluster) detachInstance(u *cloudinstances.CloudInstance) error {
	id := u.ID
	nodeName := ""
	if u.Node != nil {
		nodeName = u.Node.Name
	}
	if nodeName != "" {
		klog.Infof("Detaching instance %q, node %q, in group %q.", id, nodeName, u.CloudInstanceGroup.HumanName)
	} else {
		klog.Infof("Detaching instance %q, in group %q.", id, u.CloudInstanceGroup.HumanName)
	}

	if err := c.Cloud.DetachInstance(u); err != nil {
		if nodeName != "" {
			return fmt.Errorf("error detaching instance %q, node %q: %v", id, nodeName, err)
		}
		return fmt.Errorf("error detaching instance %q: %v", id, err)
	}

	return nil
}

// deleteInstance deletes a cloud instance.
func (c *RollingUpdateCluster) deleteInstance(u *cloudinstances.CloudInstance) error {
	id := u.ID
	nodeName := ""
	if u.Node != nil {
		nodeName = u.Node.Name
	}
	if nodeName != "" {
		klog.Infof("Stopping instance %q, node %q, in group %q (this may take a while).", id, nodeName, u.CloudInstanceGroup.HumanName)
	} else {
		klog.Infof("Stopping instance %q, in group %q (this may take a while).", id, u.CloudInstanceGroup.HumanName)
	}

	if err := c.Cloud.DeleteInstance(u); err != nil {
		if nodeName != "" {
			return fmt.Errorf("error deleting instance %q, node %q: %v", id, nodeName, err)
		}
		return fmt.Errorf("error deleting instance %q: %v", id, err)
	}

	return nil
}

// drainNode drains a K8s node.
func (c *RollingUpdateCluster) drainNode(ctx context.Context, u *cloudinstances.CloudInstance) error {
	if c.K8sClient == nil {
		return fmt.Errorf("K8sClient not set")
	}

	if u.Node == nil {
		return fmt.Errorf("node not set")
	}

	if u.Node.Name == "" {
		return fmt.Errorf("node name not set")
	}

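	// The drain settings mirror `kubectl drain --force --ignore-daemonsets --delete-emptydir-data`:
	// Force also evicts pods that are not managed by a controller, and a GracePeriodSeconds of -1
	// means each pod's own termination grace period is honored.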
	helper := &drain.Helper{
		Ctx:                 ctx,
		Client:              c.K8sClient,
		Force:               true,
		GracePeriodSeconds:  -1,
		IgnoreAllDaemonSets: true,
		Out:                 os.Stdout,
		ErrOut:              os.Stderr,
		Timeout:             c.DrainTimeout,

		// We want to proceed even when pods are using emptyDir volumes
		DeleteEmptyDirData: true,
	}

	if err := drain.RunCordonOrUncordon(helper, u.Node, true); err != nil {
		if apierrors.IsNotFound(err) {
			return nil
		}
		return fmt.Errorf("error cordoning node: %v", err)
	}

	if err := c.patchExcludeFromLB(ctx, u.Node); err != nil {
		if apierrors.IsNotFound(err) {
			return nil
		}
		return fmt.Errorf("error excluding node from load balancer: %v", err)
	}

	shouldDeregister := true
	if !c.Options.DeregisterControlPlaneNodes {
		if u.CloudInstanceGroup != nil && u.CloudInstanceGroup.InstanceGroup != nil {
			role := u.CloudInstanceGroup.InstanceGroup.Spec.Role
			switch role {
			case api.InstanceGroupRoleAPIServer, api.InstanceGroupRoleControlPlane:
				klog.Infof("skipping deregistration of instance %q, as part of instancegroup with role %q", u.ID, role)
				shouldDeregister = false
			}
		}
	}

	if shouldDeregister {
		if err := c.Cloud.DeregisterInstance(u); err != nil {
			return fmt.Errorf("error deregistering instance %q, node %q: %w", u.ID, u.Node.Name, err)
		}
	}

	if err := drain.RunNodeDrain(helper, u.Node.Name); err != nil {
		if apierrors.IsNotFound(err) {
			return nil
		}
		return fmt.Errorf("error draining node: %v", err)
	}

	if c.PostDrainDelay > 0 {
		klog.Infof("Waiting for %s for pods to stabilize after draining.", c.PostDrainDelay)
		time.Sleep(c.PostDrainDelay)
	}

	return nil
}

// deleteNode deletes a node from the k8s API. It does not delete the underlying instance.
func (c *RollingUpdateCluster) deleteNode(ctx context.Context, node *corev1.Node) error {
	var options metav1.DeleteOptions
	err := c.K8sClient.CoreV1().Nodes().Delete(ctx, node.Name, options)
	if err != nil {
		if apierrors.IsNotFound(err) {
			return nil
		}

		return fmt.Errorf("error deleting node: %v", err)
	}

	return nil
}

// UpdateSingleInstance performs a rolling update on a single instance
func (c *RollingUpdateCluster) UpdateSingleInstance(ctx context.Context, cloudMember *cloudinstances.CloudInstance, detach bool) error {
	if detach {
		if cloudMember.CloudInstanceGroup.InstanceGroup.IsControlPlane() {
			klog.Warning("cannot detach control-plane instances. Assuming --surge=false")
		} else if cloudMember.CloudInstanceGroup.InstanceGroup.Spec.Manager != api.InstanceManagerKarpenter {
			err := c.detachInstance(cloudMember)
			if err != nil {
				return fmt.Errorf("failed to detach instance: %v", err)
			}
			if err := c.maybeValidate(" after detaching instance", c.ValidateCount, cloudMember.CloudInstanceGroup); err != nil {
				return err
			}
		}
	}

	return c.drainTerminateAndWait(ctx, cloudMember, 0)
}