/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package instancegroups

import (
	"bufio"
	"context"
	"fmt"
	"os"
	"strings"
	"time"

	"k8s.io/kops/upup/pkg/fi"
	"k8s.io/kops/upup/pkg/fi/cloudup"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/json"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/klog/v2"
	"k8s.io/kubectl/pkg/drain"

	api "k8s.io/kops/pkg/apis/kops"
	"k8s.io/kops/pkg/cloudinstances"
	"k8s.io/kops/pkg/validation"
)

const rollingUpdateTaintKey = "kops.k8s.io/scheduled-for-update"

// ValidationTimeoutError represents an error that occurs when
// the cluster fails to validate within the designated timeout.
type ValidationTimeoutError struct {
	operation string
	err       error
}

func (v *ValidationTimeoutError) Error() string {
	return fmt.Sprintf("error validating cluster%s: %s", v.operation, v.err.Error())
}

func (v *ValidationTimeoutError) Unwrap() error {
	return v.err
}

// Is checks that a given error is a ValidationTimeoutError.
func (v *ValidationTimeoutError) Is(err error) bool {
	// Currently all validation timeout errors are equivalent.
	// If you wish to differentiate, please update the instances of `errors.Is` that check
	// this error to take that into account.
	_, ok := err.(*ValidationTimeoutError)
	return ok
}

// promptInteractive asks the user to continue; mostly copied from vendor/google.golang.org/api/examples/gmail.go.
func promptInteractive(upgradedHostID, upgradedHostName string) (stopPrompting bool, err error) {
	stopPrompting = false
	scanner := bufio.NewScanner(os.Stdin)
	if upgradedHostName != "" {
		klog.Infof("Pausing after finished %q, node %q", upgradedHostID, upgradedHostName)
	} else {
		klog.Infof("Pausing after finished %q", upgradedHostID)
	}
	fmt.Print("Continue? (Y)es, (N)o, (A)lwaysYes: [Y] ")
	scanner.Scan()
	err = scanner.Err()
	if err != nil {
		klog.Infof("unable to interpret input: %v", err)
		return stopPrompting, err
	}
	val := scanner.Text()
	val = strings.TrimSpace(val)
	val = strings.ToLower(val)
	switch val {
	case "n":
		klog.Info("User signaled to stop")
		os.Exit(3)
	case "a":
		klog.Info("Always Yes, stop prompting for rest of hosts")
		stopPrompting = true
	}
	return stopPrompting, err
}

// rollingUpdateInstanceGroup performs a rolling update on a list of instances.
func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, group *cloudinstances.CloudInstanceGroup, sleepAfterTerminate time.Duration) (err error) {
	isBastion := group.InstanceGroup.IsBastion()

	// We do not need a k8s client when running cloudonly.
	if c.K8sClient == nil && !c.CloudOnly {
		return fmt.Errorf("rollingUpdate is missing a k8s client")
	}

	noneReady := len(group.Ready) == 0
	numInstances := len(group.Ready) + len(group.NeedUpdate)
	update := group.NeedUpdate
	if c.Force {
		update = append(update, group.Ready...)
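		// With Force set, instances that are already up to date are replaced as well.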
	}

	if len(update) == 0 {
		return nil
	}

	if isBastion {
		klog.V(3).Info("Not validating the cluster as instance is a bastion.")
	} else if err = c.maybeValidate("", 1, group); err != nil {
		return err
	}

	if !c.CloudOnly {
		err = c.taintAllNeedUpdate(ctx, group, update)
		if err != nil {
			return err
		}
	}

	nonWarmPool := []*cloudinstances.CloudInstance{}
	// Run through the warm pool and delete all instances directly
	for _, instance := range update {
		if instance.State == cloudinstances.WarmPool {
			klog.Infof("deleting warm pool instance %q", instance.ID)
			err := c.Cloud.DeleteInstance(instance)
			if err != nil {
				return fmt.Errorf("failed to delete warm pool instance %q: %w", instance.ID, err)
			}
		} else {
			nonWarmPool = append(nonWarmPool, instance)
		}
	}
	update = nonWarmPool

	settings := resolveSettings(c.Cluster, group.InstanceGroup, numInstances)

	runningDrains := 0
	maxSurge := settings.MaxSurge.IntValue()
	if maxSurge > len(update) {
		maxSurge = len(update)
	}
	maxConcurrency := maxSurge + settings.MaxUnavailable.IntValue()

	// Karpenter cannot surge
	if group.InstanceGroup.Spec.Manager == api.InstanceManagerKarpenter {
		maxSurge = 0
	}

	if group.InstanceGroup.Spec.Role == api.InstanceGroupRoleControlPlane && maxSurge != 0 {
		// Control plane nodes are incapable of surging because they rely on registering themselves through
		// the local apiserver. That apiserver depends on the local etcd, which relies on being
		// joined to the etcd cluster.
		maxSurge = 0
		maxConcurrency = settings.MaxUnavailable.IntValue()
		if maxConcurrency == 0 {
			maxConcurrency = 1
		}
	}

	if c.Interactive {
		if maxSurge > 1 {
			maxSurge = 1
		}
		maxConcurrency = 1
	}

	update = prioritizeUpdate(update)

	if maxSurge > 0 && !c.CloudOnly {
		skippedNodes := 0
		for numSurge := 1; numSurge <= maxSurge; numSurge++ {
			u := update[len(update)-numSurge-skippedNodes]
			if u.Status != cloudinstances.CloudInstanceStatusDetached {
				if err := c.detachInstance(u); err != nil {
					// If detaching a node fails, we simply proceed to the next one instead of
					// bubbling up the error.
					klog.Errorf("Failed to detach instance %q: %v", u.ID, err)
					skippedNodes++
					numSurge--
					if maxSurge > len(update)-skippedNodes {
						maxSurge = len(update) - skippedNodes
					}
				}

				// If noneReady, wait until after one node is detached and its replacement validates
				// before detaching more in case the current spec does not result in usable nodes.
				if numSurge == maxSurge || noneReady {
					// Wait for the minimum interval
					klog.Infof("waiting for %v after detaching instance", sleepAfterTerminate)
					time.Sleep(sleepAfterTerminate)

					if err := c.maybeValidate(" after detaching instance", c.ValidateCount, group); err != nil {
						return err
					}
					noneReady = false
				}
			}
		}
	}

	if !*settings.DrainAndTerminate {
		klog.Infof("Rolling updates for InstanceGroup %s are disabled", group.InstanceGroup.Name)
		return nil
	}

	terminateChan := make(chan error, maxConcurrency)

	for uIdx, u := range update {
		go func(m *cloudinstances.CloudInstance) {
			terminateChan <- c.drainTerminateAndWait(ctx, m, sleepAfterTerminate)
		}(u)
		runningDrains++

		// Wait until after one node is deleted and its replacement validates before the concurrent draining
		// in case the current spec does not result in usable nodes.
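		// (When no instances were Ready before the update began, the first drain below is
		// completed and validated before any further drains run concurrently.)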
		if runningDrains < maxConcurrency && (!noneReady || uIdx > 0) {
			continue
		}

		err = <-terminateChan
		runningDrains--
		if err != nil {
			return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
		}

		err = c.maybeValidate(" after terminating instance", c.ValidateCount, group)
		if err != nil {
			return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
		}

		if c.Interactive {
			nodeName := ""
			if u.Node != nil {
				nodeName = u.Node.Name
			}
			stopPrompting, err := promptInteractive(u.ID, nodeName)
			if err != nil {
				return err
			}
			if stopPrompting {
				// Is a pointer to a struct, changes here push back into the original
				c.Interactive = false
			}
		}

		// Validation tends to return failures from the start of drain until the replacement is
		// fully ready, so sweep up as many completions as we can before starting the next drain.
	sweep:
		for runningDrains > 0 {
			select {
			case err = <-terminateChan:
				runningDrains--
				if err != nil {
					return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
				}
			default:
				break sweep
			}
		}
	}

	if runningDrains > 0 {
		for runningDrains > 0 {
			err = <-terminateChan
			runningDrains--
			if err != nil {
				return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
			}
		}

		err = c.maybeValidate(" after terminating instance", c.ValidateCount, group)
		if err != nil {
			return err
		}
	}

	return nil
}

func prioritizeUpdate(update []*cloudinstances.CloudInstance) []*cloudinstances.CloudInstance {
	// The priorities are, in order:
	//   attached before detached
	//   TODO unhealthy before healthy
	//   NeedUpdate before Ready (preserve original order)
	result := make([]*cloudinstances.CloudInstance, 0, len(update))
	var detached []*cloudinstances.CloudInstance
	for _, u := range update {
		if u.Status == cloudinstances.CloudInstanceStatusDetached {
			detached = append(detached, u)
		} else {
			result = append(result, u)
		}
	}
	result = append(result, detached...)
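	// Attached instances come first so that already-detached (surged) instances are replaced last.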
	return result
}

// waitForPendingBeforeReturningError waits for any remaining drain goroutines to report
// on terminateChan before returning the original error.
func waitForPendingBeforeReturningError(runningDrains int, terminateChan chan error, err error) error {
	for runningDrains > 0 {
		<-terminateChan
		runningDrains--
	}
	return err
}

// taintAllNeedUpdate adds the scheduled-for-update taint to every node in the update list
// that is schedulable and does not already carry the taint.
func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstance) error {
	var toTaint []*corev1.Node

	for _, u := range update {
		if u.Node != nil && !u.Node.Spec.Unschedulable {
			foundTaint := false
			for _, taint := range u.Node.Spec.Taints {
				if taint.Key == rollingUpdateTaintKey {
					foundTaint = true
				}
			}
			if !foundTaint {
				toTaint = append(toTaint, u.Node)
			}
		}
	}

	if len(toTaint) > 0 {
		noun := "nodes"
		if len(toTaint) == 1 {
			noun = "node"
		}
		klog.Infof("Tainting %d %s in %q instancegroup.", len(toTaint), noun, group.InstanceGroup.Name)
		for _, n := range toTaint {
			if err := c.patchTaint(ctx, n); err != nil {
				if c.FailOnDrainError {
					return fmt.Errorf("failed to taint node %q: %v", n.Name, err)
				}
				klog.Infof("Ignoring error tainting node %q: %v", n.Name, err)
			}
		}
	}
	return nil
}

// patchTaint applies the rolling-update PreferNoSchedule taint to the node using a strategic merge patch.
func (c *RollingUpdateCluster) patchTaint(ctx context.Context, node *corev1.Node) error {
	oldData, err := json.Marshal(node)
	if err != nil {
		return err
	}

	node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{
		Key:    rollingUpdateTaintKey,
		Effect: corev1.TaintEffectPreferNoSchedule,
	})

	newData, err := json.Marshal(node)
	if err != nil {
		return err
	}

	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
	if err != nil {
		return err
	}

	_, err = c.K8sClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
	if apierrors.IsNotFound(err) {
		return nil
	}
	return err
}

// patchExcludeFromLB labels the node so that it is excluded from load balancers.
func (c *RollingUpdateCluster) patchExcludeFromLB(ctx context.Context, node *corev1.Node) error {
	oldData, err := json.Marshal(node)
	if err != nil {
		return err
	}

	if node.Labels == nil {
		node.Labels = map[string]string{}
	}
	if _, ok := node.Labels[corev1.LabelNodeExcludeBalancers]; ok {
		return nil
	}
	node.Labels[corev1.LabelNodeExcludeBalancers] = ""

	newData, err := json.Marshal(node)
	if err != nil {
		return err
	}

	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
	if err != nil {
		return err
	}

	_, err = c.K8sClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
	if apierrors.IsNotFound(err) {
		return nil
	}
	return err
}

// drainTerminateAndWait drains the instance's node (unless running cloudonly), deletes the
// instance, reconciles the instance group where necessary, and then sleeps for the minimum interval.
func (c *RollingUpdateCluster) drainTerminateAndWait(ctx context.Context, u *cloudinstances.CloudInstance, sleepAfterTerminate time.Duration) error {
	instanceID := u.ID

	nodeName := ""
	if u.Node != nil {
		nodeName = u.Node.Name
	}

	isBastion := u.CloudInstanceGroup.InstanceGroup.IsBastion()
	if isBastion {
		// We don't want to validate for bastions - they aren't part of the cluster
	} else if c.CloudOnly {
		klog.Warning("Not draining cluster nodes as 'cloudonly' flag is set.")
	} else {
		if u.Node != nil {
			klog.Infof("Draining the node: %q.", nodeName)

			if err := c.drainNode(ctx, u); err != nil {
				if c.FailOnDrainError {
					return fmt.Errorf("failed to drain node %q: %v", nodeName, err)
				}
				klog.Infof("Ignoring error draining node %q: %v", nodeName, err)
			}
		} else {
			klog.Warningf("Skipping drain of instance %q, because it is not registered in kubernetes", instanceID)
		}
	}

	// GCE often re-uses names, so we delete the node object to prevent the new instance from using the cordoned Node object
	// Scaleway has the same behavior
	if (c.Cluster.GetCloudProvider() == api.CloudProviderGCE || c.Cluster.GetCloudProvider() == api.CloudProviderScaleway) && !isBastion &&
		!c.CloudOnly {
		if u.Node == nil {
			klog.Warningf("no kubernetes Node associated with %s, skipping node deletion", instanceID)
		} else {
			klog.Infof("deleting node %q from kubernetes", nodeName)

			if err := c.deleteNode(ctx, u.Node); err != nil {
				return fmt.Errorf("error deleting node %q: %v", nodeName, err)
			}
		}
	}

	if err := c.deleteInstance(u); err != nil {
		klog.Errorf("error deleting instance %q, node %q: %v", instanceID, nodeName, err)
		return err
	}

	if err := c.reconcileInstanceGroup(ctx); err != nil {
		klog.Errorf("error reconciling instance group %q: %v", u.CloudInstanceGroup.HumanName, err)
		return err
	}

	// Wait for the minimum interval
	klog.Infof("waiting for %v after terminating instance", sleepAfterTerminate)
	time.Sleep(sleepAfterTerminate)

	return nil
}

// reconcileInstanceGroup re-applies the cluster configuration for certain cloud providers
// (OpenStack, Hetzner, Scaleway, DO, Azure) after an instance is deleted.
func (c *RollingUpdateCluster) reconcileInstanceGroup(ctx context.Context) error {
	if c.Cluster.GetCloudProvider() != api.CloudProviderOpenstack &&
		c.Cluster.GetCloudProvider() != api.CloudProviderHetzner &&
		c.Cluster.GetCloudProvider() != api.CloudProviderScaleway &&
		c.Cluster.GetCloudProvider() != api.CloudProviderDO &&
		c.Cluster.GetCloudProvider() != api.CloudProviderAzure {
		return nil
	}
	rto := fi.RunTasksOptions{}
	rto.InitDefaults()
	applyCmd := &cloudup.ApplyClusterCmd{
		Cloud:              c.Cloud,
		Clientset:          c.Clientset,
		Cluster:            c.Cluster,
		DryRun:             false,
		AllowKopsDowngrade: true,
		RunTasksOptions:    &rto,
		OutDir:             "",
		Phase:              "",
		TargetName:         "direct",
		LifecycleOverrides: map[string]fi.Lifecycle{},
		DeletionProcessing: fi.DeletionProcessingModeDeleteIfNotDeferrred,
	}

	_, err := applyCmd.Run(ctx)
	return err
}

// maybeValidate validates the cluster (unless running cloudonly); when fail-on-validate is set,
// a validation failure is returned as a ValidationTimeoutError.
func (c *RollingUpdateCluster) maybeValidate(operation string, validateCount int, group *cloudinstances.CloudInstanceGroup) error {
	if c.CloudOnly {
		klog.Warningf("Not validating cluster as cloudonly flag is set.")
	} else {
		klog.Info("Validating the cluster.")

		if err := c.validateClusterWithTimeout(validateCount, group); err != nil {
			if c.FailOnValidate {
				klog.Errorf("Cluster did not validate within %s", c.ValidationTimeout)
				return &ValidationTimeoutError{
					operation: operation,
					err:       err,
				}
			}

			klog.Warningf("Cluster validation failed%s, proceeding since fail-on-validate is set to false: %v", operation, err)
		}
	}
	return nil
}

// validateClusterWithTimeout runs the cluster validator until either we get a positive result or the timeout expires.
func (c *RollingUpdateCluster) validateClusterWithTimeout(validateCount int, group *cloudinstances.CloudInstanceGroup) error {
	ctx, cancel := context.WithTimeout(context.Background(), c.ValidationTimeout)
	defer cancel()

	if validateCount == 0 {
		klog.Warningf("skipping cluster validation because validate-count was 0")
		return nil
	}

	successCount := 0

	for {
		// Note that we validate at least once before checking the timeout, in case the cluster is healthy with a short timeout
		result, err := c.ClusterValidator.Validate(ctx)
		if err == nil && !hasFailureRelevantToGroup(result.Failures, group) {
			successCount++
			if successCount >= validateCount {
				klog.Info("Cluster validated.")
				return nil
			}
			klog.Infof("Cluster validated; revalidating in %s to make sure it does not flap.", c.ValidateSuccessDuration)
			time.Sleep(c.ValidateSuccessDuration)
			continue
		}

		if err != nil {
			if ctx.Err() != nil {
				klog.Infof("Cluster did not validate within deadline: %v.", err)
				break
			}
			klog.Infof("Cluster did not validate, will retry in %q: %v.", c.ValidateTickDuration, err)
		} else if len(result.Failures) > 0 {
			messages := []string{}
			for _, failure := range result.Failures {
				messages = append(messages, failure.Message)
			}
			if ctx.Err() != nil {
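				// The overall validation timeout has expired; log the outstanding failures and stop retrying.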
klog.Infof("Cluster did not pass validation within deadline: %s.", strings.Join(messages, ", ")) break } klog.Infof("Cluster did not pass validation, will retry in %q: %s.", c.ValidateTickDuration, strings.Join(messages, ", ")) } // Reset the success count; we want N consecutive successful validations successCount = 0 // Wait before retrying in some cases // TODO: Should we check if we have enough time left before the deadline? time.Sleep(c.ValidateTickDuration) } return fmt.Errorf("cluster did not validate within a duration of %q", c.ValidationTimeout) } // checks if the validation failures returned after cluster validation are relevant to the current // instance group whose rolling update is occurring func hasFailureRelevantToGroup(failures []*validation.ValidationError, group *cloudinstances.CloudInstanceGroup) bool { // Ignore non critical validation errors in other instance groups like below target size errors for _, failure := range failures { // Certain failures like a system-critical-pod failure and dns server related failures // set their InstanceGroup to nil, since we cannot associate the failure to any one group if failure.InstanceGroup == nil { return true } // if there is a failure in the same instance group or a failure which has cluster wide impact if (failure.InstanceGroup.IsControlPlane()) || (failure.InstanceGroup == group.InstanceGroup) { return true } } return false } // detachInstance detaches a Cloud Instance func (c *RollingUpdateCluster) detachInstance(u *cloudinstances.CloudInstance) error { id := u.ID nodeName := "" if u.Node != nil { nodeName = u.Node.Name } if nodeName != "" { klog.Infof("Detaching instance %q, node %q, in group %q.", id, nodeName, u.CloudInstanceGroup.HumanName) } else { klog.Infof("Detaching instance %q, in group %q.", id, u.CloudInstanceGroup.HumanName) } if err := c.Cloud.DetachInstance(u); err != nil { if nodeName != "" { return fmt.Errorf("error detaching instance %q, node %q: %v", id, nodeName, err) } return fmt.Errorf("error detaching instance %q: %v", id, err) } return nil } // deleteInstance deletes an Cloud Instance. func (c *RollingUpdateCluster) deleteInstance(u *cloudinstances.CloudInstance) error { id := u.ID nodeName := "" if u.Node != nil { nodeName = u.Node.Name } if nodeName != "" { klog.Infof("Stopping instance %q, node %q, in group %q (this may take a while).", id, nodeName, u.CloudInstanceGroup.HumanName) } else { klog.Infof("Stopping instance %q, in group %q (this may take a while).", id, u.CloudInstanceGroup.HumanName) } if err := c.Cloud.DeleteInstance(u); err != nil { if nodeName != "" { return fmt.Errorf("error deleting instance %q, node %q: %v", id, nodeName, err) } return fmt.Errorf("error deleting instance %q: %v", id, err) } return nil } // drainNode drains a K8s node. 
func (c *RollingUpdateCluster) drainNode(ctx context.Context, u *cloudinstances.CloudInstance) error {
	if c.K8sClient == nil {
		return fmt.Errorf("K8sClient not set")
	}

	if u.Node == nil {
		return fmt.Errorf("node not set")
	}

	if u.Node.Name == "" {
		return fmt.Errorf("node name not set")
	}

	helper := &drain.Helper{
		Ctx:                 ctx,
		Client:              c.K8sClient,
		Force:               true,
		GracePeriodSeconds:  -1,
		IgnoreAllDaemonSets: true,
		Out:                 os.Stdout,
		ErrOut:              os.Stderr,
		Timeout:             c.DrainTimeout,

		// We want to proceed even when pods are using emptyDir volumes
		DeleteEmptyDirData: true,
	}

	if err := drain.RunCordonOrUncordon(helper, u.Node, true); err != nil {
		if apierrors.IsNotFound(err) {
			return nil
		}
		return fmt.Errorf("error cordoning node: %v", err)
	}

	if err := c.patchExcludeFromLB(ctx, u.Node); err != nil {
		if apierrors.IsNotFound(err) {
			return nil
		}
		return fmt.Errorf("error excluding node from load balancer: %v", err)
	}

	shouldDeregister := true
	if !c.Options.DeregisterControlPlaneNodes {
		if u.CloudInstanceGroup != nil && u.CloudInstanceGroup.InstanceGroup != nil {
			role := u.CloudInstanceGroup.InstanceGroup.Spec.Role
			switch role {
			case api.InstanceGroupRoleAPIServer, api.InstanceGroupRoleControlPlane:
				klog.Infof("skipping deregistration of instance %q, as part of instancegroup with role %q", u.ID, role)
				shouldDeregister = false
			}
		}
	}

	if shouldDeregister {
		if err := c.Cloud.DeregisterInstance(u); err != nil {
			return fmt.Errorf("error deregistering instance %q, node %q: %w", u.ID, u.Node.Name, err)
		}
	}

	if err := drain.RunNodeDrain(helper, u.Node.Name); err != nil {
		if apierrors.IsNotFound(err) {
			return nil
		}
		return fmt.Errorf("error draining node: %v", err)
	}

	if c.PostDrainDelay > 0 {
		klog.Infof("Waiting for %s for pods to stabilize after draining.", c.PostDrainDelay)
		time.Sleep(c.PostDrainDelay)
	}

	return nil
}

// deleteNode deletes a node from the k8s API. It does not delete the underlying instance.
func (c *RollingUpdateCluster) deleteNode(ctx context.Context, node *corev1.Node) error {
	var options metav1.DeleteOptions
	err := c.K8sClient.CoreV1().Nodes().Delete(ctx, node.Name, options)
	if err != nil {
		if apierrors.IsNotFound(err) {
			return nil
		}
		return fmt.Errorf("error deleting node: %v", err)
	}

	return nil
}

// UpdateSingleInstance performs a rolling update on a single instance.
func (c *RollingUpdateCluster) UpdateSingleInstance(ctx context.Context, cloudMember *cloudinstances.CloudInstance, detach bool) error {
	if detach {
		if cloudMember.CloudInstanceGroup.InstanceGroup.IsControlPlane() {
			klog.Warning("cannot detach control-plane instances. Assuming --surge=false")
		} else if cloudMember.CloudInstanceGroup.InstanceGroup.Spec.Manager != api.InstanceManagerKarpenter {
			err := c.detachInstance(cloudMember)
			if err != nil {
				return fmt.Errorf("failed to detach instance: %v", err)
			}
			if err := c.maybeValidate(" after detaching instance", c.ValidateCount, cloudMember.CloudInstanceGroup); err != nil {
				return err
			}
		}
	}

	return c.drainTerminateAndWait(ctx, cloudMember, 0)
}
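// The sketch below is illustrative only (hypothetical values, not part of this package's API):
// it restates the surge/concurrency arithmetic used by rollingUpdateInstanceGroup above.
// maxConcurrency bounds the number of concurrent drainTerminateAndWait goroutines, while
// maxSurge controls how many replacement instances are detached up front. Control-plane and
// Karpenter-managed groups never surge, and interactive mode forces one instance at a time.
//
//	maxSurge := 1                               // hypothetical resolved rollingUpdate.maxSurge
//	maxUnavailable := 2                         // hypothetical resolved rollingUpdate.maxUnavailable
//	maxConcurrency := maxSurge + maxUnavailable // up to 3 instances in flight
//	if isKarpenter {
//		maxSurge = 0
//	}
//	if isControlPlane && maxSurge != 0 {
//		maxSurge = 0
//		maxConcurrency = maxUnavailable // bumped to 1 if maxUnavailable is 0
//	}
//	if interactive {
//		maxSurge, maxConcurrency = min(maxSurge, 1), 1 // one instance at a time
//	}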