// Source: kops/pkg/instancegroups/instancegroups.go (579 lines, 16 KiB, Go)

/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package instancegroups
import (
"bufio"
"fmt"
"os"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/klog"
api "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kubectl/pkg/drain"
)
// rollingUpdateTaintKey is the key of the taint that patchTaint adds to a node
// selected for update; taintAllNeedUpdate skips nodes that already carry it.
const rollingUpdateTaintKey = "kops.k8s.io/scheduled-for-update"
// promptInteractive pauses after an instance has been replaced and asks on
// stdout whether to carry on, reading one line of reply from stdin. It is
// mostly copied from vendor/google.golang.org/api/examples/gmail.go.
//
// The reply is trimmed and lower-cased before being interpreted:
//   - "n" terminates the process with exit status 3;
//   - "a" returns stopPrompting=true so the caller can stop asking;
//   - anything else (including an empty line) means "continue".
//
// A scanner error is logged and returned with stopPrompting=false.
func promptInteractive(upgradedHostId, upgradedHostName string) (stopPrompting bool, err error) {
	if upgradedHostName == "" {
		klog.Infof("Pausing after finished %q", upgradedHostId)
	} else {
		klog.Infof("Pausing after finished %q, node %q", upgradedHostId, upgradedHostName)
	}
	fmt.Print("Continue? (Y)es, (N)o, (A)lwaysYes: [Y] ")

	input := bufio.NewScanner(os.Stdin)
	input.Scan()
	if scanErr := input.Err(); scanErr != nil {
		klog.Infof("unable to interpret input: %v", scanErr)
		return false, scanErr
	}

	answer := strings.ToLower(strings.TrimSpace(input.Text()))
	if answer == "n" {
		klog.Info("User signaled to stop")
		os.Exit(3)
	}
	if answer == "a" {
		klog.Info("Always Yes, stop prompting for rest of hosts")
		return true, nil
	}
	return false, nil
}
// rollingUpdateInstanceGroup performs a rolling update of the instances in a single instance group.
//
// The instances to replace are group.NeedUpdate, plus group.Ready when c.Force is set.
// Unless c.CloudOnly is set, the cluster is validated first (skipped for bastions) and the
// selected nodes are tainted. Up to maxSurge instances are then detached, after which the
// instances are drained and terminated with at most maxConcurrency operations in flight.
//
// isBastion skips the initial validation and is passed through to drainTerminateAndWait.
// sleepAfterTerminate is the pause used after detaching and is passed to drainTerminateAndWait.
// validationTimeout is passed to maybeValidate.
//
// It returns nil when there is nothing to update or when the resolved settings disable
// rolling updates. On failure it returns the error that stopped the update, after waiting
// for any drains that are still running.
func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(cluster *api.Cluster, group *cloudinstances.CloudInstanceGroup, isBastion bool, sleepAfterTerminate time.Duration, validationTimeout time.Duration) (err error) {
// Do not need a k8s client if you are doing cloudonly.
if c.K8sClient == nil && !c.CloudOnly {
return fmt.Errorf("rollingUpdate is missing a k8s client")
}
// noneReady is true when the group currently has no ready instance; it makes the
// code below wait for one replacement to validate before working concurrently.
noneReady := len(group.Ready) == 0
numInstances := len(group.Ready) + len(group.NeedUpdate)
update := group.NeedUpdate
// With Force, instances that are already up to date are replaced as well.
if c.Force {
update = append(update, group.Ready...)
}
if len(update) == 0 {
return nil
}
if isBastion {
klog.V(3).Info("Not validating the cluster as instance is a bastion.")
} else if c.CloudOnly {
klog.V(3).Info("Not validating cluster as validation is turned off via the cloud-only flag.")
} else {
// A failed pre-update validation only aborts when FailOnValidate is set.
if err = c.validateCluster(); err != nil {
if c.FailOnValidate {
return err
}
klog.V(2).Infof("Ignoring cluster validation error: %v", err)
klog.Info("Cluster validation failed, but proceeding since fail-on-validate-error is set to false")
}
}
if !c.CloudOnly {
err = c.taintAllNeedUpdate(group, update)
if err != nil {
return err
}
}
settings := resolveSettings(cluster, group.InstanceGroup, numInstances)
runningDrains := 0
// Never surge by more than the number of instances being replaced.
maxSurge := settings.MaxSurge.IntValue()
if maxSurge > len(update) {
maxSurge = len(update)
}
maxConcurrency := maxSurge + settings.MaxUnavailable.IntValue()
if maxConcurrency == 0 {
klog.Infof("Rolling updates for InstanceGroup %s are disabled", group.InstanceGroup.Name)
return nil
}
if group.InstanceGroup.Spec.Role == api.InstanceGroupRoleMaster && maxSurge != 0 {
// Masters are incapable of surging because they rely on registering themselves through
// the local apiserver. That apiserver depends on the local etcd, which relies on being
// joined to the etcd cluster.
maxSurge = 0
maxConcurrency = settings.MaxUnavailable.IntValue()
// Fall back to one at a time so the masters are still updated.
if maxConcurrency == 0 {
maxConcurrency = 1
}
}
// Interactive mode handles one instance at a time, since the user is prompted after each.
if c.Interactive {
if maxSurge > 1 {
maxSurge = 1
}
maxConcurrency = 1
}
update = prioritizeUpdate(update)
if maxSurge > 0 && !c.CloudOnly {
// Surge candidates are taken from the tail of the list, which is where
// prioritizeUpdate places instances that are already detached; those are skipped.
for numSurge := 1; numSurge <= maxSurge; numSurge++ {
u := update[len(update)-numSurge]
if !u.Detached {
if err := c.detachInstance(u); err != nil {
return err
}
// If noneReady, wait until after one node is detached and its replacement validates
// before detaching more in case the current spec does not result in usable nodes.
if numSurge == maxSurge || noneReady {
// Wait for the minimum interval
klog.Infof("waiting for %v after detaching instance", sleepAfterTerminate)
time.Sleep(sleepAfterTerminate)
if err := c.maybeValidate(validationTimeout, "detaching"); err != nil {
return err
}
noneReady = false
}
}
}
}
// Each drain goroutine reports its result on terminateChan; runningDrains counts the
// results that have not been received yet. The buffer holds maxConcurrency results.
terminateChan := make(chan error, maxConcurrency)
for uIdx, u := range update {
go func(m *cloudinstances.CloudInstanceGroupMember) {
terminateChan <- c.drainTerminateAndWait(m, isBastion, sleepAfterTerminate)
}(u)
runningDrains++
// Wait until after one node is deleted and its replacement validates before the concurrent draining
// in case the current spec does not result in usable nodes.
if runningDrains < maxConcurrency && (!noneReady || uIdx > 0) {
continue
}
// The concurrency limit is reached (or the first replacement is unproven): block for one result.
err = <-terminateChan
runningDrains--
if err != nil {
return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
}
err = c.maybeValidate(validationTimeout, "removing")
if err != nil {
return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
}
if c.Interactive {
nodeName := ""
if u.Node != nil {
nodeName = u.Node.Name
}
// Note: this err shadows the named result for the scope of the if block.
stopPrompting, err := promptInteractive(u.ID, nodeName)
if err != nil {
return err
}
if stopPrompting {
// c is a pointer, so clearing the flag here also changes the caller's RollingUpdateCluster.
c.Interactive = false
}
}
// Validation tends to return failures from the start of drain until the replacement is
// fully ready, so sweep up as many completions as we can before starting the next drain.
sweep:
for runningDrains > 0 {
select {
case err = <-terminateChan:
runningDrains--
if err != nil {
return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
}
default:
// Nothing else has finished yet; go start the next drain.
break sweep
}
}
}
// All drains have been started; collect the remaining results, then validate once more.
if runningDrains > 0 {
for runningDrains > 0 {
err = <-terminateChan
runningDrains--
if err != nil {
return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
}
}
err = c.maybeValidate(validationTimeout, "removing")
if err != nil {
return err
}
}
return nil
}
// prioritizeUpdate returns a new slice holding the members of update ordered
// by replacement priority. The priorities are, in order:
//
//	attached before detached
//	TODO unhealthy before healthy
//	NeedUpdate before Ready (preserve original order)
//
// The partition is stable: members keep their relative order within the
// attached and the detached halves. The input slice is not modified.
func prioritizeUpdate(update []*cloudinstances.CloudInstanceGroupMember) []*cloudinstances.CloudInstanceGroupMember {
	ordered := make([]*cloudinstances.CloudInstanceGroupMember, 0, len(update))

	// First pass: everything still attached, in original order.
	for _, member := range update {
		if !member.Detached {
			ordered = append(ordered, member)
		}
	}
	// Second pass: the detached members go to the tail.
	for _, member := range update {
		if member.Detached {
			ordered = append(ordered, member)
		}
	}
	return ordered
}
// waitForPendingBeforeReturningError receives and discards runningDrains
// results from terminateChan, blocking until each one arrives, and then
// returns err unchanged. A non-positive runningDrains receives nothing.
func waitForPendingBeforeReturningError(runningDrains int, terminateChan chan error, err error) error {
	for pending := runningDrains; pending > 0; pending-- {
		<-terminateChan
	}
	return err
}
// taintAllNeedUpdate applies the rolling-update taint to every member of update
// that has a registered Node, is still schedulable, and does not already carry
// a taint with key rollingUpdateTaintKey.
//
// group is used only to name the instance group in the log output.
//
// A failure to patch a node aborts with an error when c.FailOnDrainError is
// set; otherwise it is logged and the remaining nodes are still processed.
func (c *RollingUpdateCluster) taintAllNeedUpdate(group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstanceGroupMember) error {
	var toTaint []*corev1.Node
	for _, u := range update {
		if u.Node == nil || u.Node.Spec.Unschedulable {
			continue
		}
		foundTaint := false
		for _, taint := range u.Node.Spec.Taints {
			if taint.Key == rollingUpdateTaintKey {
				foundTaint = true
				break
			}
		}
		if !foundTaint {
			toTaint = append(toTaint, u.Node)
		}
	}
	if len(toTaint) == 0 {
		return nil
	}

	noun := "nodes"
	if len(toTaint) == 1 {
		noun = "node"
	}
	klog.Infof("Tainting %d %s in %q instancegroup.", len(toTaint), noun, group.InstanceGroup.Name)

	for _, n := range toTaint {
		if err := c.patchTaint(n); err != nil {
			// Report the node by name; formatting the Node object itself would
			// dump the whole object into the message.
			if c.FailOnDrainError {
				return fmt.Errorf("failed to taint node %q: %v", n.Name, err)
			}
			klog.Infof("Ignoring error tainting node %q: %v", n.Name, err)
		}
	}
	return nil
}
// patchTaint appends a PreferNoSchedule taint keyed by rollingUpdateTaintKey to
// node and sends the change to the API server as a strategic merge patch
// computed from the node's JSON before and after the change.
//
// The in-memory node is modified before the patch is sent, so it keeps the
// added taint even when a later step fails. Any marshalling, patch-creation or
// API error is returned as-is.
func (c *RollingUpdateCluster) patchTaint(node *corev1.Node) error {
	before, err := json.Marshal(node)
	if err != nil {
		return err
	}

	updateTaint := corev1.Taint{
		Key:    rollingUpdateTaintKey,
		Effect: corev1.TaintEffectPreferNoSchedule,
	}
	node.Spec.Taints = append(node.Spec.Taints, updateTaint)

	after, err := json.Marshal(node)
	if err != nil {
		return err
	}

	patch, err := strategicpatch.CreateTwoWayMergePatch(before, after, node)
	if err != nil {
		return err
	}

	if _, err := c.K8sClient.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patch); err != nil {
		return err
	}
	return nil
}
// drainTerminateAndWait replaces a single instance: it drains the instance's
// node, removes the node object from kubernetes, deletes the cloud instance,
// and finally sleeps for sleepAfterTerminate.
//
// Draining and node deletion are skipped for bastions and when c.CloudOnly is
// set, and also when the instance has no registered Node. A drain failure is
// fatal only when c.FailOnDrainError is set; node-deletion and
// instance-deletion failures are always returned.
func (c *RollingUpdateCluster) drainTerminateAndWait(u *cloudinstances.CloudInstanceGroupMember, isBastion bool, sleepAfterTerminate time.Duration) error {
	instanceID := u.ID
	var nodeName string
	if u.Node != nil {
		nodeName = u.Node.Name
	}

	// Step 1: drain.
	switch {
	case isBastion:
		// Bastions aren't part of the cluster, so there is nothing to drain.
	case c.CloudOnly:
		klog.Warning("Not draining cluster nodes as 'cloudonly' flag is set.")
	case u.Node == nil:
		klog.Warningf("Skipping drain of instance %q, because it is not registered in kubernetes", instanceID)
	default:
		klog.Infof("Draining the node: %q.", nodeName)
		if err := c.drainNode(u); err != nil {
			if c.FailOnDrainError {
				return fmt.Errorf("failed to drain node %q: %v", nodeName, err)
			}
			klog.Infof("Ignoring error draining node %q: %v", nodeName, err)
		}
	}

	// Step 2: unregister the node before deleting the instance; if the
	// replacement comes up with the same name it would otherwise still be
	// cordoned. (It often seems like GCE tries to re-use names.)
	switch {
	case isBastion || c.CloudOnly:
		// No kubernetes interaction for these.
	case u.Node == nil:
		klog.Warningf("no kubernetes Node associated with %s, skipping node deletion", instanceID)
	default:
		klog.Infof("deleting node %q from kubernetes", nodeName)
		if err := c.deleteNode(u.Node); err != nil {
			return fmt.Errorf("error deleting node %q: %v", nodeName, err)
		}
	}

	// Step 3: terminate the cloud instance.
	if err := c.deleteInstance(u); err != nil {
		klog.Errorf("error deleting instance %q, node %q: %v", instanceID, nodeName, err)
		return err
	}

	// Step 4: wait for the minimum interval.
	klog.Infof("waiting for %v after terminating instance", sleepAfterTerminate)
	time.Sleep(sleepAfterTerminate)
	return nil
}
// maybeValidate validates the cluster after an instance operation unless
// c.CloudOnly is set, in which case it only logs a warning.
//
// validationTimeout is handed to validateClusterWithDuration; operation is the
// verb used in log and error text (callers pass "detaching" or "removing").
// A validation failure is returned only when c.FailOnValidate is set;
// otherwise it is logged as a warning and nil is returned.
func (c *RollingUpdateCluster) maybeValidate(validationTimeout time.Duration, operation string) error {
	if c.CloudOnly {
		klog.Warningf("Not validating cluster as cloudonly flag is set.")
		return nil
	}

	klog.Info("Validating the cluster.")
	validationErr := c.validateClusterWithDuration(validationTimeout)
	if validationErr == nil {
		return nil
	}

	if !c.FailOnValidate {
		klog.Warningf("Cluster validation failed after %s instance, proceeding since fail-on-validate is set to false: %v", operation, validationErr)
		return nil
	}

	klog.Errorf("Cluster did not validate within %s", validationTimeout)
	return fmt.Errorf("error validating cluster after %s a node: %v", operation, validationErr)
}
// validateClusterWithDuration keeps calling tryValidateCluster until it
// succeeds or duration has elapsed.
//
// One attempt is always made up front, so a duration shorter than
// c.ValidateTickDuration still gets a try; the timeout clock starts only after
// that first attempt. Later attempts run once per tick. It returns nil on the
// first success and an error once the timeout fires.
func (c *RollingUpdateCluster) validateClusterWithDuration(duration time.Duration) error {
	if c.tryValidateCluster(duration) {
		return nil
	}

	deadline := time.After(duration)
	retry := time.NewTicker(c.ValidateTickDuration)
	defer retry.Stop()

	for {
		select {
		case <-retry.C:
			// Time for another attempt; on failure fall through to the next iteration.
			if c.tryValidateCluster(duration) {
				return nil
			}
		case <-deadline:
			// Out of time: give up with a timeout error.
			return fmt.Errorf("cluster did not validate within a duration of %q", duration)
		}
	}
}
// tryValidateCluster runs one validation attempt and reports whether the
// cluster validated cleanly.
//
// When the first run is clean and c.ValidateSuccessDuration is positive, it
// sleeps for that long and validates a second time to guard against flapping;
// the second result is the one that counts. duration is used only in the log
// messages printed on failure.
func (c *RollingUpdateCluster) tryValidateCluster(duration time.Duration) bool {
	result, err := c.ClusterValidator.Validate()

	firstRunClean := err == nil && len(result.Failures) == 0
	if firstRunClean && c.ValidateSuccessDuration > 0 {
		klog.Infof("Cluster validated; revalidating in %s to make sure it does not flap.", c.ValidateSuccessDuration)
		time.Sleep(c.ValidateSuccessDuration)
		result, err = c.ClusterValidator.Validate()
	}

	switch {
	case err != nil:
		klog.Infof("Cluster did not validate, will try again in %q until duration %q expires: %v.", c.ValidateTickDuration, duration, err)
		return false
	case len(result.Failures) > 0:
		reasons := make([]string, len(result.Failures))
		for i, failure := range result.Failures {
			reasons[i] = failure.Message
		}
		klog.Infof("Cluster did not pass validation, will try again in %q until duration %q expires: %s.", c.ValidateTickDuration, duration, strings.Join(reasons, ", "))
		return false
	default:
		klog.Info("Cluster validated.")
		return true
	}
}
// validateCluster runs the cluster validator once and converts the outcome
// into an error: either the validator's own error, or the comma-joined
// messages of all reported failures. It returns nil when validation passes.
func (c *RollingUpdateCluster) validateCluster() error {
	result, err := c.ClusterValidator.Validate()
	if err != nil {
		return fmt.Errorf("cluster %q did not validate: %v", c.ClusterName, err)
	}

	if len(result.Failures) == 0 {
		return nil
	}

	reasons := make([]string, len(result.Failures))
	for i, failure := range result.Failures {
		reasons[i] = failure.Message
	}
	return fmt.Errorf("cluster %q did not pass validation: %s", c.ClusterName, strings.Join(reasons, ", "))
}
// detachInstance asks the cloud provider to detach the given instance from its
// group, logging the instance ID (and the node name, when one is known) first.
// A provider error is wrapped with the same identifying details.
func (c *RollingUpdateCluster) detachInstance(u *cloudinstances.CloudInstanceGroupMember) error {
	instanceID := u.ID
	var nodeName string
	if u.Node != nil {
		nodeName = u.Node.Name
	}
	hasNode := nodeName != ""

	if hasNode {
		klog.Infof("Detaching instance %q, node %q, in group %q.", instanceID, nodeName, u.CloudInstanceGroup.HumanName)
	} else {
		klog.Infof("Detaching instance %q, in group %q.", instanceID, u.CloudInstanceGroup.HumanName)
	}

	err := c.Cloud.DetachInstance(u)
	switch {
	case err == nil:
		return nil
	case hasNode:
		return fmt.Errorf("error detaching instance %q, node %q: %v", instanceID, nodeName, err)
	default:
		return fmt.Errorf("error detaching instance %q: %v", instanceID, err)
	}
}
// deleteInstance asks the cloud provider to delete the given instance, logging
// the instance ID (and the node name, when one is known) first. A provider
// error is wrapped with the same identifying details.
func (c *RollingUpdateCluster) deleteInstance(u *cloudinstances.CloudInstanceGroupMember) error {
	instanceID := u.ID
	var nodeName string
	if u.Node != nil {
		nodeName = u.Node.Name
	}
	hasNode := nodeName != ""

	if hasNode {
		klog.Infof("Stopping instance %q, node %q, in group %q (this may take a while).", instanceID, nodeName, u.CloudInstanceGroup.HumanName)
	} else {
		klog.Infof("Stopping instance %q, in group %q (this may take a while).", instanceID, u.CloudInstanceGroup.HumanName)
	}

	err := c.Cloud.DeleteInstance(u)
	switch {
	case err == nil:
		return nil
	case hasNode:
		return fmt.Errorf("error deleting instance %q, node %q: %v", instanceID, nodeName, err)
	default:
		return fmt.Errorf("error deleting instance %q: %v", instanceID, err)
	}
}
// drainNode cordons the member's kubernetes node and then drains it using the
// kubectl drain helper, writing progress to stdout/stderr. When
// c.PostDrainDelay is positive it sleeps for that long after the drain.
//
// It fails up front if the k8s client, the node, or the node's name is missing,
// and wraps any cordon or drain error.
func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstanceGroupMember) error {
	switch {
	case c.K8sClient == nil:
		return fmt.Errorf("K8sClient not set")
	case u.Node == nil:
		return fmt.Errorf("node not set")
	case u.Node.Name == "":
		return fmt.Errorf("node name not set")
	}

	drainer := &drain.Helper{
		Client:              c.K8sClient,
		Force:               true,
		GracePeriodSeconds:  -1,
		IgnoreAllDaemonSets: true,
		Out:                 os.Stdout,
		ErrOut:              os.Stderr,

		// We want to proceed even when pods are using local data (emptyDir)
		DeleteLocalData: true,

		// Other options we might want to set:
		// Timeout?
	}

	cordonErr := drain.RunCordonOrUncordon(drainer, u.Node, true)
	if cordonErr != nil {
		return fmt.Errorf("error cordoning node: %v", cordonErr)
	}

	drainErr := drain.RunNodeDrain(drainer, u.Node.Name)
	if drainErr != nil {
		return fmt.Errorf("error draining node: %v", drainErr)
	}

	if delay := c.PostDrainDelay; delay > 0 {
		klog.Infof("Waiting for %s for pods to stabilize after draining.", delay)
		time.Sleep(delay)
	}
	return nil
}
// deleteNode removes the node object from the k8s API using default delete
// options. It does not delete the underlying instance. A NotFound response is
// treated as success; any other error is wrapped and returned.
func (c *RollingUpdateCluster) deleteNode(node *corev1.Node) error {
	err := c.K8sClient.CoreV1().Nodes().Delete(node.Name, &metav1.DeleteOptions{})
	if err == nil || apierrors.IsNotFound(err) {
		return nil
	}
	return fmt.Errorf("error deleting node: %v", err)
}