kops/pkg/instancegroups/instancegroups.go

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package instancegroups

import (
	"bufio"
	"context"
	"fmt"
	"os"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/json"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/klog/v2"
	"k8s.io/kubectl/pkg/drain"

	api "k8s.io/kops/pkg/apis/kops"
	"k8s.io/kops/pkg/cloudinstances"
	"k8s.io/kops/pkg/validation"
	"k8s.io/kops/upup/pkg/fi"
	"k8s.io/kops/upup/pkg/fi/cloudup"
)
const rollingUpdateTaintKey = "kops.k8s.io/scheduled-for-update"
// ValidationTimeoutError represents an error that occurs when
// the cluster fails to validate within the designated timeout.
type ValidationTimeoutError struct {
operation string
err error
}
func (v *ValidationTimeoutError) Error() string {
return fmt.Sprintf("error validating cluster%s: %s", v.operation, v.err.Error())
}
func (v *ValidationTimeoutError) Unwrap() error {
return v.err
}
// Is checks that a given error is a ValidationTimeoutError.
func (v *ValidationTimeoutError) Is(err error) bool {
// Currently all validation timeout errors are equivalent
// If you wish to differentiate, please update the instances of `errors.Is` that check
// this error to take that into account
_, ok := err.(*ValidationTimeoutError)
return ok
}
// promptInteractive asks the user to continue, mostly copied from vendor/google.golang.org/api/examples/gmail.go.
func promptInteractive(upgradedHostID, upgradedHostName string) (stopPrompting bool, err error) {
stopPrompting = false
scanner := bufio.NewScanner(os.Stdin)
if upgradedHostName != "" {
klog.Infof("Pausing after finished %q, node %q", upgradedHostID, upgradedHostName)
} else {
klog.Infof("Pausing after finished %q", upgradedHostID)
}
fmt.Print("Continue? (Y)es, (N)o, (A)lwaysYes: [Y] ")
scanner.Scan()
err = scanner.Err()
if err != nil {
klog.Infof("unable to interpret input: %v", err)
return stopPrompting, err
}
val := scanner.Text()
val = strings.TrimSpace(val)
val = strings.ToLower(val)
switch val {
case "n":
klog.Info("User signaled to stop")
os.Exit(3)
case "a":
klog.Info("Always Yes, stop prompting for rest of hosts")
stopPrompting = true
}
return stopPrompting, err
}
// rollingUpdateInstanceGroup performs a rolling update on the instances of a single cloud instance group.
func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, group *cloudinstances.CloudInstanceGroup, sleepAfterTerminate time.Duration) (err error) {
isBastion := group.InstanceGroup.IsBastion()
// A k8s client is not needed when doing a cloud-only update.
if c.K8sClient == nil && !c.CloudOnly {
return fmt.Errorf("rollingUpdate is missing a k8s client")
}
noneReady := len(group.Ready) == 0
numInstances := len(group.Ready) + len(group.NeedUpdate)
update := group.NeedUpdate
if c.Force {
update = append(update, group.Ready...)
}
if len(update) == 0 {
return nil
}
if isBastion {
klog.V(3).Info("Not validating the cluster as instance is a bastion.")
} else if err = c.maybeValidate("", 1, group); err != nil {
return err
}
if !c.CloudOnly {
err = c.taintAllNeedUpdate(ctx, group, update)
if err != nil {
return err
}
}
nonWarmPool := []*cloudinstances.CloudInstance{}
// Delete warm pool instances directly; only the remaining instances go through drain and terminate
for _, instance := range update {
if instance.State == cloudinstances.WarmPool {
klog.Infof("deleting warm pool instance %q", instance.ID)
err := c.Cloud.DeleteInstance(instance)
if err != nil {
return fmt.Errorf("failed to delete warm pool instance %q: %w", instance.ID, err)
}
} else {
nonWarmPool = append(nonWarmPool, instance)
}
}
update = nonWarmPool
settings := resolveSettings(c.Cluster, group.InstanceGroup, numInstances)
runningDrains := 0
maxSurge := settings.MaxSurge.IntValue()
if maxSurge > len(update) {
maxSurge = len(update)
}
maxConcurrency := maxSurge + settings.MaxUnavailable.IntValue()
// Karpenter cannot surge
if group.InstanceGroup.Spec.Manager == api.InstanceManagerKarpenter {
maxSurge = 0
}
if group.InstanceGroup.Spec.Role == api.InstanceGroupRoleControlPlane && maxSurge != 0 {
// Control plane nodes are incapable of surging because they rely on registering themselves through
// the local apiserver. That apiserver depends on the local etcd, which relies on being
// joined to the etcd cluster.
maxSurge = 0
maxConcurrency = settings.MaxUnavailable.IntValue()
if maxConcurrency == 0 {
maxConcurrency = 1
}
}
if c.Interactive {
if maxSurge > 1 {
maxSurge = 1
}
maxConcurrency = 1
}
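// At this point the concurrency settings are final. For example, maxSurge=1 with maxUnavailable=1
// gives maxConcurrency=2: one instance is detached up front (when surging is possible) and up to
// two instances are drained and replaced concurrently, while a control-plane group has maxSurge
// forced to 0 and replaces at least one instance at a time.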
update = prioritizeUpdate(update)
if maxSurge > 0 && !c.CloudOnly {
skippedNodes := 0
for numSurge := 1; numSurge <= maxSurge; numSurge++ {
u := update[len(update)-numSurge-skippedNodes]
if u.Status != cloudinstances.CloudInstanceStatusDetached {
if err := c.detachInstance(u); err != nil {
// If detaching a node fails, we simply proceed to the next one instead of
// bubbling up the error.
klog.Errorf("Failed to detach instance %q: %v", u.ID, err)
skippedNodes++
numSurge--
if maxSurge > len(update)-skippedNodes {
maxSurge = len(update) - skippedNodes
}
}
// If noneReady, wait until after one node is detached and its replacement validates
// before detaching more in case the current spec does not result in usable nodes.
if numSurge == maxSurge || noneReady {
// Wait for the minimum interval
klog.Infof("waiting for %v after detaching instance", sleepAfterTerminate)
time.Sleep(sleepAfterTerminate)
if err := c.maybeValidate(" after detaching instance", c.ValidateCount, group); err != nil {
return err
}
noneReady = false
}
}
}
}
if !*settings.DrainAndTerminate {
klog.Infof("Rolling updates for InstanceGroup %s are disabled", group.InstanceGroup.Name)
return nil
}
terminateChan := make(chan error, maxConcurrency)
for uIdx, u := range update {
go func(m *cloudinstances.CloudInstance) {
terminateChan <- c.drainTerminateAndWait(ctx, m, sleepAfterTerminate)
}(u)
runningDrains++
// Wait until after one node is deleted and its replacement validates before the concurrent draining
// in case the current spec does not result in usable nodes.
if runningDrains < maxConcurrency && (!noneReady || uIdx > 0) {
continue
}
err = <-terminateChan
runningDrains--
if err != nil {
return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
}
err = c.maybeValidate(" after terminating instance", c.ValidateCount, group)
if err != nil {
return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
}
if c.Interactive {
nodeName := ""
if u.Node != nil {
nodeName = u.Node.Name
}
stopPrompting, err := promptInteractive(u.ID, nodeName)
if err != nil {
return err
}
if stopPrompting {
// c is a pointer, so clearing Interactive here carries over to the remaining hosts and instance groups
c.Interactive = false
}
}
// Validation tends to return failures from the start of drain until the replacement is
// fully ready, so sweep up as many completions as we can before starting the next drain.
sweep:
for runningDrains > 0 {
select {
case err = <-terminateChan:
runningDrains--
if err != nil {
return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
}
default:
break sweep
}
}
}
if runningDrains > 0 {
for runningDrains > 0 {
err = <-terminateChan
runningDrains--
if err != nil {
return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
}
}
err = c.maybeValidate(" after terminating instance", c.ValidateCount, group)
if err != nil {
return err
}
}
return nil
}
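// prioritizeUpdate returns the update list reordered so that attached instances are replaced
// before detached ones, preserving the original order within each bucket. For example,
// [attachedA, detachedB, attachedC] becomes [attachedA, attachedC, detachedB].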
func prioritizeUpdate(update []*cloudinstances.CloudInstance) []*cloudinstances.CloudInstance {
// The priorities are, in order:
// attached before detached
// TODO unhealthy before healthy
// NeedUpdate before Ready (preserve original order)
result := make([]*cloudinstances.CloudInstance, 0, len(update))
var detached []*cloudinstances.CloudInstance
for _, u := range update {
if u.Status == cloudinstances.CloudInstanceStatusDetached {
detached = append(detached, u)
} else {
result = append(result, u)
}
}
result = append(result, detached...)
return result
}
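// waitForPendingBeforeReturningError waits for the outstanding drain/terminate goroutines to
// finish and discards their results before returning the original error.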
func waitForPendingBeforeReturningError(runningDrains int, terminateChan chan error, err error) error {
for runningDrains > 0 {
<-terminateChan
runningDrains--
}
return err
}
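// taintAllNeedUpdate applies the kops.k8s.io/scheduled-for-update taint to every node in the
// update list that is schedulable and not already tainted, so that the scheduler prefers placing
// new pods on nodes that are not about to be replaced.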
func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstance) error {
var toTaint []*corev1.Node
for _, u := range update {
if u.Node != nil && !u.Node.Spec.Unschedulable {
foundTaint := false
for _, taint := range u.Node.Spec.Taints {
if taint.Key == rollingUpdateTaintKey {
foundTaint = true
}
}
if !foundTaint {
toTaint = append(toTaint, u.Node)
}
}
}
if len(toTaint) > 0 {
noun := "nodes"
if len(toTaint) == 1 {
noun = "node"
}
klog.Infof("Tainting %d %s in %q instancegroup.", len(toTaint), noun, group.InstanceGroup.Name)
for _, n := range toTaint {
if err := c.patchTaint(ctx, n); err != nil {
if c.FailOnDrainError {
return fmt.Errorf("failed to taint node %q: %v", n, err)
}
klog.Infof("Ignoring error tainting node %q: %v", n, err)
}
}
}
return nil
}
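// patchTaint adds the rolling-update taint to a node using a strategic merge patch built from the
// before/after serializations of the Node, e.g. a body along the lines of
// {"spec":{"taints":[{"effect":"PreferNoSchedule","key":"kops.k8s.io/scheduled-for-update"}]}}.
// A NotFound error is ignored: a node that no longer exists does not need tainting.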
func (c *RollingUpdateCluster) patchTaint(ctx context.Context, node *corev1.Node) error {
oldData, err := json.Marshal(node)
if err != nil {
return err
}
node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{
Key: rollingUpdateTaintKey,
Effect: corev1.TaintEffectPreferNoSchedule,
})
newData, err := json.Marshal(node)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
if err != nil {
return err
}
_, err = c.K8sClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if apierrors.IsNotFound(err) {
return nil
}
return err
}
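// patchExcludeFromLB adds the corev1.LabelNodeExcludeBalancers label
// (node.kubernetes.io/exclude-from-external-load-balancers) so that service load balancers stop
// targeting the node before it is drained. Nodes that already have the label, or that no longer
// exist, are left as they are.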
func (c *RollingUpdateCluster) patchExcludeFromLB(ctx context.Context, node *corev1.Node) error {
oldData, err := json.Marshal(node)
if err != nil {
return err
}
if node.Labels == nil {
node.Labels = map[string]string{}
}
if _, ok := node.Labels[corev1.LabelNodeExcludeBalancers]; ok {
return nil
}
node.Labels[corev1.LabelNodeExcludeBalancers] = ""
newData, err := json.Marshal(node)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
if err != nil {
return err
}
_, err = c.K8sClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if apierrors.IsNotFound(err) {
return nil
}
return err
}
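// drainTerminateAndWait drains the instance's node (unless the update is cloud-only or the
// instance is a bastion), deletes the Node object on providers that reuse node names, deletes the
// cloud instance, reconciles the instance group where the provider requires it, and finally waits
// sleepAfterTerminate before returning.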
func (c *RollingUpdateCluster) drainTerminateAndWait(ctx context.Context, u *cloudinstances.CloudInstance, sleepAfterTerminate time.Duration) error {
instanceID := u.ID
nodeName := ""
if u.Node != nil {
nodeName = u.Node.Name
}
isBastion := u.CloudInstanceGroup.InstanceGroup.IsBastion()
if isBastion {
// Bastions are not part of the cluster, so there is nothing to drain
} else if c.CloudOnly {
klog.Warning("Not draining cluster nodes as 'cloudonly' flag is set.")
} else {
if u.Node != nil {
klog.Infof("Draining the node: %q.", nodeName)
if err := c.drainNode(ctx, u); err != nil {
if c.FailOnDrainError {
return fmt.Errorf("failed to drain node %q: %v", nodeName, err)
}
klog.Infof("Ignoring error draining node %q: %v", nodeName, err)
}
} else {
klog.Warningf("Skipping drain of instance %q, because it is not registered in kubernetes", instanceID)
}
}
// GCE often re-uses names, so we delete the node object to prevent the new instance from using the cordoned Node object
// Scaleway has the same behavior
if (c.Cluster.GetCloudProvider() == api.CloudProviderGCE || c.Cluster.GetCloudProvider() == api.CloudProviderScaleway) &&
!isBastion && !c.CloudOnly {
if u.Node == nil {
klog.Warningf("no kubernetes Node associated with %s, skipping node deletion", instanceID)
} else {
klog.Infof("deleting node %q from kubernetes", nodeName)
if err := c.deleteNode(ctx, u.Node); err != nil {
return fmt.Errorf("error deleting node %q: %v", nodeName, err)
}
}
}
if err := c.deleteInstance(u); err != nil {
klog.Errorf("error deleting instance %q, node %q: %v", instanceID, nodeName, err)
return err
}
if err := c.reconcileInstanceGroup(ctx); err != nil {
klog.Errorf("error reconciling instance group %q: %v", u.CloudInstanceGroup.HumanName, err)
return err
}
// Wait for the minimum interval
klog.Infof("waiting for %v after terminating instance", sleepAfterTerminate)
time.Sleep(sleepAfterTerminate)
return nil
}
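// reconcileInstanceGroup re-runs a direct cloudup apply after an instance has been deleted. It is
// only needed on providers (OpenStack, Hetzner, Scaleway, DigitalOcean, Azure) where kops itself
// creates the replacement instances; on all other providers it is a no-op.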
func (c *RollingUpdateCluster) reconcileInstanceGroup(ctx context.Context) error {
if c.Cluster.GetCloudProvider() != api.CloudProviderOpenstack &&
c.Cluster.GetCloudProvider() != api.CloudProviderHetzner &&
c.Cluster.GetCloudProvider() != api.CloudProviderScaleway &&
c.Cluster.GetCloudProvider() != api.CloudProviderDO &&
c.Cluster.GetCloudProvider() != api.CloudProviderAzure {
return nil
}
rto := fi.RunTasksOptions{}
rto.InitDefaults()
applyCmd := &cloudup.ApplyClusterCmd{
Cloud: c.Cloud,
Clientset: c.Clientset,
Cluster: c.Cluster,
DryRun: false,
AllowKopsDowngrade: true,
RunTasksOptions: &rto,
OutDir: "",
Phase: "",
TargetName: "direct",
LifecycleOverrides: map[string]fi.Lifecycle{},
DeletionProcessing: fi.DeletionProcessingModeDeleteIfNotDeferrred,
}
_, err := applyCmd.Run(ctx)
return err
}
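// maybeValidate validates the cluster unless the update is cloud-only. A validation failure is
// returned as a ValidationTimeoutError when FailOnValidate is set; otherwise it is logged and the
// rolling update continues.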
func (c *RollingUpdateCluster) maybeValidate(operation string, validateCount int, group *cloudinstances.CloudInstanceGroup) error {
if c.CloudOnly {
klog.Warningf("Not validating cluster as cloudonly flag is set.")
} else {
klog.Info("Validating the cluster.")
if err := c.validateClusterWithTimeout(validateCount, group); err != nil {
if c.FailOnValidate {
klog.Errorf("Cluster did not validate within %s", c.ValidationTimeout)
return &ValidationTimeoutError{
operation: operation,
err: err,
}
}
klog.Warningf("Cluster validation failed%s, proceeding since fail-on-validate is set to false: %v", operation, err)
}
}
return nil
}
// validateClusterWithTimeout runs validation.ValidateCluster until either we get a positive result or the timeout expires
func (c *RollingUpdateCluster) validateClusterWithTimeout(validateCount int, group *cloudinstances.CloudInstanceGroup) error {
ctx, cancel := context.WithTimeout(context.Background(), c.ValidationTimeout)
defer cancel()
if validateCount == 0 {
klog.Warningf("skipping cluster validation because validate-count was 0")
return nil
}
successCount := 0
for {
// Note that we validate at least once before checking the timeout, in case the cluster is healthy with a short timeout
result, err := c.ClusterValidator.Validate(ctx)
if err == nil && !hasFailureRelevantToGroup(result.Failures, group) {
successCount++
if successCount >= validateCount {
klog.Info("Cluster validated.")
return nil
}
klog.Infof("Cluster validated; revalidating in %s to make sure it does not flap.", c.ValidateSuccessDuration)
time.Sleep(c.ValidateSuccessDuration)
continue
}
if err != nil {
if ctx.Err() != nil {
klog.Infof("Cluster did not validate within deadline: %v.", err)
break
}
klog.Infof("Cluster did not validate, will retry in %q: %v.", c.ValidateTickDuration, err)
} else if len(result.Failures) > 0 {
messages := []string{}
for _, failure := range result.Failures {
messages = append(messages, failure.Message)
}
if ctx.Err() != nil {
klog.Infof("Cluster did not pass validation within deadline: %s.", strings.Join(messages, ", "))
break
}
klog.Infof("Cluster did not pass validation, will retry in %q: %s.", c.ValidateTickDuration, strings.Join(messages, ", "))
}
// Reset the success count; we want N consecutive successful validations
successCount = 0
// Wait before retrying in some cases
// TODO: Should we check if we have enough time left before the deadline?
time.Sleep(c.ValidateTickDuration)
}
return fmt.Errorf("cluster did not validate within a duration of %q", c.ValidationTimeout)
}
// hasFailureRelevantToGroup checks whether any of the validation failures returned by cluster
// validation are relevant to the instance group whose rolling update is in progress.
func hasFailureRelevantToGroup(failures []*validation.ValidationError, group *cloudinstances.CloudInstanceGroup) bool {
// Ignore non critical validation errors in other instance groups like below target size errors
for _, failure := range failures {
// Certain failures like a system-critical-pod failure and dns server related failures
// set their InstanceGroup to nil, since we cannot associate the failure to any one group
if failure.InstanceGroup == nil {
return true
}
// if there is a failure in the same instance group or a failure which has cluster wide impact
if (failure.InstanceGroup.IsControlPlane()) || (failure.InstanceGroup == group.InstanceGroup) {
return true
}
}
return false
}
// detachInstance detaches a cloud instance.
func (c *RollingUpdateCluster) detachInstance(u *cloudinstances.CloudInstance) error {
id := u.ID
nodeName := ""
if u.Node != nil {
nodeName = u.Node.Name
}
if nodeName != "" {
klog.Infof("Detaching instance %q, node %q, in group %q.", id, nodeName, u.CloudInstanceGroup.HumanName)
} else {
klog.Infof("Detaching instance %q, in group %q.", id, u.CloudInstanceGroup.HumanName)
}
if err := c.Cloud.DetachInstance(u); err != nil {
if nodeName != "" {
return fmt.Errorf("error detaching instance %q, node %q: %v", id, nodeName, err)
}
return fmt.Errorf("error detaching instance %q: %v", id, err)
}
return nil
}
// deleteInstance deletes a cloud instance.
func (c *RollingUpdateCluster) deleteInstance(u *cloudinstances.CloudInstance) error {
id := u.ID
nodeName := ""
if u.Node != nil {
nodeName = u.Node.Name
}
if nodeName != "" {
klog.Infof("Stopping instance %q, node %q, in group %q (this may take a while).", id, nodeName, u.CloudInstanceGroup.HumanName)
} else {
klog.Infof("Stopping instance %q, in group %q (this may take a while).", id, u.CloudInstanceGroup.HumanName)
}
if err := c.Cloud.DeleteInstance(u); err != nil {
if nodeName != "" {
return fmt.Errorf("error deleting instance %q, node %q: %v", id, nodeName, err)
}
return fmt.Errorf("error deleting instance %q: %v", id, err)
}
return nil
}
// drainNode drains a K8s node.
func (c *RollingUpdateCluster) drainNode(ctx context.Context, u *cloudinstances.CloudInstance) error {
if c.K8sClient == nil {
return fmt.Errorf("K8sClient not set")
}
if u.Node == nil {
return fmt.Errorf("node not set")
}
if u.Node.Name == "" {
return fmt.Errorf("node name not set")
}
helper := &drain.Helper{
Ctx: ctx,
Client: c.K8sClient,
Force: true,
GracePeriodSeconds: -1,
IgnoreAllDaemonSets: true,
Out: os.Stdout,
ErrOut: os.Stderr,
Timeout: c.DrainTimeout,
// We want to proceed even when pods are using emptyDir volumes
DeleteEmptyDirData: true,
}
if err := drain.RunCordonOrUncordon(helper, u.Node, true); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("error cordoning node: %v", err)
}
if err := c.patchExcludeFromLB(ctx, u.Node); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("error excluding node from load balancer: %v", err)
}
shouldDeregister := true
if !c.Options.DeregisterControlPlaneNodes {
if u.CloudInstanceGroup != nil && u.CloudInstanceGroup.InstanceGroup != nil {
role := u.CloudInstanceGroup.InstanceGroup.Spec.Role
switch role {
case api.InstanceGroupRoleAPIServer, api.InstanceGroupRoleControlPlane:
klog.Infof("skipping deregistration of instance %q, as part of instancegroup with role %q", u.ID, role)
shouldDeregister = false
}
}
}
if shouldDeregister {
if err := c.Cloud.DeregisterInstance(u); err != nil {
return fmt.Errorf("error deregistering instance %q, node %q: %w", u.ID, u.Node.Name, err)
}
}
if err := drain.RunNodeDrain(helper, u.Node.Name); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("error draining node: %v", err)
}
if c.PostDrainDelay > 0 {
klog.Infof("Waiting for %s for pods to stabilize after draining.", c.PostDrainDelay)
time.Sleep(c.PostDrainDelay)
}
return nil
}
// deleteNode deletes a node from the k8s API. It does not delete the underlying instance.
func (c *RollingUpdateCluster) deleteNode(ctx context.Context, node *corev1.Node) error {
var options metav1.DeleteOptions
err := c.K8sClient.CoreV1().Nodes().Delete(ctx, node.Name, options)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("error deleting node: %v", err)
}
return nil
}
// UpdateSingleInstance performs a rolling update on a single instance
func (c *RollingUpdateCluster) UpdateSingleInstance(ctx context.Context, cloudMember *cloudinstances.CloudInstance, detach bool) error {
if detach {
if cloudMember.CloudInstanceGroup.InstanceGroup.IsControlPlane() {
klog.Warning("cannot detach control-plane instances. Assuming --surge=false")
} else if cloudMember.CloudInstanceGroup.InstanceGroup.Spec.Manager != api.InstanceManagerKarpenter {
err := c.detachInstance(cloudMember)
if err != nil {
return fmt.Errorf("failed to detach instance: %v", err)
}
if err := c.maybeValidate(" after detaching instance", c.ValidateCount, cloudMember.CloudInstanceGroup); err != nil {
return err
}
}
}
return c.drainTerminateAndWait(ctx, cloudMember, 0)
}