Merge pull request #8313 from johngmyers/surge

Option to surge during rolling update
Kubernetes Prow Robot 2020-03-04 10:21:47 -08:00 committed by GitHub
commit a5dabf58dc
33 changed files with 1067 additions and 104 deletions
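For orientation before the diffs: enabling surge is a one-field change on the RollingUpdate spec. A minimal sketch in Go, mirroring the tests in this PR (the surrounding Cluster value and import paths are assumed from the kops tree):

package example

import (
	"k8s.io/apimachinery/pkg/util/intstr"
	kopsapi "k8s.io/kops/pkg/apis/kops"
)

// enableSurge allows up to 30% extra nodes during a rolling update.
// With a non-zero maxSurge, maxUnavailable now defaults to 0, so the
// update creates replacements before draining old nodes.
func enableSurge(cluster *kopsapi.Cluster) {
	maxSurge := intstr.FromString("30%")
	cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
		MaxSurge: &maxSurge,
	}
}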

View File

@ -5,6 +5,7 @@ go_library(
srcs = [
"api.go",
"attach.go",
"ec2shim.go",
"group.go",
"launchconfigurations.go",
"tags.go",
@ -16,6 +17,8 @@ go_library(
"//vendor/github.com/aws/aws-sdk-go/aws/request:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/autoscaling:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/ec2/ec2iface:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -0,0 +1,52 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mockautoscaling
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
)
type ec2Shim struct {
ec2iface.EC2API
mockAutoscaling *MockAutoscaling
}
func (m *MockAutoscaling) GetEC2Shim(e ec2iface.EC2API) ec2iface.EC2API {
return &ec2Shim{
EC2API: e,
mockAutoscaling: m,
}
}
func (e *ec2Shim) TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error) {
if input.DryRun != nil && *input.DryRun {
return &ec2.TerminateInstancesOutput{}, nil
}
for _, id := range input.InstanceIds {
request := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: id,
ShouldDecrementDesiredCapacity: aws.Bool(false),
}
if _, err := e.mockAutoscaling.TerminateInstanceInAutoScalingGroup(request); err != nil {
return nil, err
}
}
return &ec2.TerminateInstancesOutput{}, nil
}

View File

@ -2937,6 +2937,20 @@ spec:
description: RollingUpdate defines the default rolling-update settings
for instance groups
properties:
maxSurge:
anyOf:
- type: string
- type: integer
description: 'MaxSurge is the maximum number of extra nodes that
can be created during the update. The value can be an absolute
number (for example 5) or a percentage of desired nodes (for
example 10%). The absolute number is calculated from a percentage
by rounding up. A value of 0 for both this and MaxUnavailable
disables rolling updates. Has no effect on instance groups with
role "Master". Defaults to 0. Example: when this is set to 30%,
the InstanceGroup can be scaled up immediately when the rolling
update starts, such that the total number of old and new nodes
does not exceed 130% of desired nodes.'
maxUnavailable:
anyOf:
- type: string
@ -2945,12 +2959,13 @@ spec:
can be unavailable during the update. The value can be an absolute
number (for example 5) or a percentage of desired nodes (for example
10%). The absolute number is calculated from a percentage by rounding
down. A value of 0 disables rolling updates. Defaults to 1. Example:
when this is set to 30%, the InstanceGroup can be scaled down
to 70% of desired nodes immediately when the rolling update starts.
Once new nodes are ready, more old nodes can be drained, ensuring
that the total number of nodes available at all times during the
update is at least 70% of desired nodes.'
down. A value of 0 for both this and MaxSurge disables rolling
updates. Defaults to 1 if MaxSurge is 0, otherwise defaults to
0. Example: when this is set to 30%, the InstanceGroup can be
scaled down to 70% of desired nodes immediately when the rolling
update starts. Once new nodes are ready, more old nodes can be
drained, ensuring that the total number of nodes available at
all times during the update is at least 70% of desired nodes.'
type: object
secretStore:
description: SecretStore is the VFS path to where secrets are stored
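
The asymmetric rounding in these descriptions (maxSurge rounds up, maxUnavailable rounds down) is just the roundUp flag of intstr.GetValueFromIntOrPercent, the helper the diffs below call. A small runnable sketch, with values chosen for illustration:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	desired := 10
	surge := intstr.FromString("31%")
	unavailable := intstr.FromString("31%")

	// maxSurge rounds up: 31% of 10 desired nodes -> 4 extra nodes.
	s, _ := intstr.GetValueFromIntOrPercent(&surge, desired, true)
	// maxUnavailable rounds down: 31% of 10 -> 3 nodes at once.
	u, _ := intstr.GetValueFromIntOrPercent(&unavailable, desired, false)
	fmt.Println(s, u) // 4 3
}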

View File

@ -630,6 +630,20 @@ spec:
rollingUpdate:
description: RollingUpdate defines the rolling-update behavior
properties:
maxSurge:
anyOf:
- type: string
- type: integer
description: 'MaxSurge is the maximum number of extra nodes that
can be created during the update. The value can be an absolute
number (for example 5) or a percentage of desired nodes (for
example 10%). The absolute number is calculated from a percentage
by rounding up. A value of 0 for both this and MaxUnavailable
disables rolling updates. Has no effect on instance groups with
role "Master". Defaults to 0. Example: when this is set to 30%,
the InstanceGroup can be scaled up immediately when the rolling
update starts, such that the total number of old and new nodes
does not exceed 130% of desired nodes.'
maxUnavailable:
anyOf:
- type: string
@ -638,12 +652,13 @@ spec:
can be unavailable during the update. The value can be an absolute
number (for example 5) or a percentage of desired nodes (for example
10%). The absolute number is calculated from a percentage by rounding
down. A value of 0 disables rolling updates. Defaults to 1. Example:
when this is set to 30%, the InstanceGroup can be scaled down
to 70% of desired nodes immediately when the rolling update starts.
Once new nodes are ready, more old nodes can be drained, ensuring
that the total number of nodes available at all times during the
update is at least 70% of desired nodes.'
down. A value of 0 for both this and MaxSurge disables rolling
updates. Defaults to 1 if MaxSurge is 0, otherwise defaults to
0. Example: when this is set to 30%, the InstanceGroup can be
scaled down to 70% of desired nodes immediately when the rolling
update starts. Once new nodes are ready, more old nodes can be
drained, ensuring that the total number of nodes available at
all times during the update is at least 70% of desired nodes.'
type: object
rootVolumeDeleteOnTermination:
description: 'RootVolumeDeleteOnTermination configures root volume retention

View File

@ -684,8 +684,8 @@ type RollingUpdate struct {
// The value can be an absolute number (for example 5) or a percentage of desired
// nodes (for example 10%).
// The absolute number is calculated from a percentage by rounding down.
// A value of 0 disables rolling updates.
// Defaults to 1.
// A value of 0 for both this and MaxSurge disables rolling updates.
// Defaults to 1 if MaxSurge is 0, otherwise defaults to 0.
// Example: when this is set to 30%, the InstanceGroup can be scaled
// down to 70% of desired nodes immediately when the rolling update
// starts. Once new nodes are ready, more old nodes can be drained,
@ -693,4 +693,18 @@ type RollingUpdate struct {
// during the update is at least 70% of desired nodes.
// +optional
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
// MaxSurge is the maximum number of extra nodes that can be created
// during the update.
// The value can be an absolute number (for example 5) or a percentage of
// desired nodes (for example 10%).
// The absolute number is calculated from a percentage by rounding up.
// A value of 0 for both this and MaxUnavailable disables rolling updates.
// Has no effect on instance groups with role "Master".
// Defaults to 0.
// Example: when this is set to 30%, the InstanceGroup can be scaled
// up immediately when the rolling update starts, such that the total
// number of old and new nodes does not exceed 130% of desired
// nodes.
// +optional
MaxSurge *intstr.IntOrString `json:"maxSurge,omitempty"`
}

View File

@ -566,8 +566,8 @@ type RollingUpdate struct {
// The value can be an absolute number (for example 5) or a percentage of desired
// nodes (for example 10%).
// The absolute number is calculated from a percentage by rounding down.
// A value of 0 disables rolling updates.
// Defaults to 1.
// A value of 0 for both this and MaxSurge disables rolling updates.
// Defaults to 1 if MaxSurge is 0, otherwise defaults to 0.
// Example: when this is set to 30%, the InstanceGroup can be scaled
// down to 70% of desired nodes immediately when the rolling update
// starts. Once new nodes are ready, more old nodes can be drained,
@ -575,4 +575,18 @@ type RollingUpdate struct {
// during the update is at least 70% of desired nodes.
// +optional
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
// MaxSurge is the maximum number of extra nodes that can be created
// during the update.
// The value can be an absolute number (for example 5) or a percentage of
// desired nodes (for example 10%).
// The absolute number is calculated from a percentage by rounding up.
// A value of 0 for both this and MaxUnavailable disables rolling updates.
// Has no effect on instance groups with role "Master".
// Defaults to 0.
// Example: when this is set to 30%, the InstanceGroup can be scaled
// up immediately when the rolling update starts, such that the total
// number of old and new nodes does not exceed 130% of desired
// nodes.
// +optional
MaxSurge *intstr.IntOrString `json:"maxSurge,omitempty"`
}

View File

@ -4720,6 +4720,7 @@ func Convert_kops_RBACAuthorizationSpec_To_v1alpha1_RBACAuthorizationSpec(in *ko
func autoConvert_v1alpha1_RollingUpdate_To_kops_RollingUpdate(in *RollingUpdate, out *kops.RollingUpdate, s conversion.Scope) error {
out.MaxUnavailable = in.MaxUnavailable
out.MaxSurge = in.MaxSurge
return nil
}
@ -4730,6 +4731,7 @@ func Convert_v1alpha1_RollingUpdate_To_kops_RollingUpdate(in *RollingUpdate, out
func autoConvert_kops_RollingUpdate_To_v1alpha1_RollingUpdate(in *kops.RollingUpdate, out *RollingUpdate, s conversion.Scope) error {
out.MaxUnavailable = in.MaxUnavailable
out.MaxSurge = in.MaxSurge
return nil
}

View File

@ -3303,6 +3303,11 @@ func (in *RollingUpdate) DeepCopyInto(out *RollingUpdate) {
*out = new(intstr.IntOrString)
**out = **in
}
if in.MaxSurge != nil {
in, out := &in.MaxSurge, &out.MaxSurge
*out = new(intstr.IntOrString)
**out = **in
}
return
}

View File

@ -579,8 +579,8 @@ type RollingUpdate struct {
// The value can be an absolute number (for example 5) or a percentage of desired
// nodes (for example 10%).
// The absolute number is calculated from a percentage by rounding down.
// A value of 0 disables rolling updates.
// Defaults to 1.
// A value of 0 for both this and MaxSurge disables rolling updates.
// Defaults to 1 if MaxSurge is 0, otherwise defaults to 0.
// Example: when this is set to 30%, the InstanceGroup can be scaled
// down to 70% of desired nodes immediately when the rolling update
// starts. Once new nodes are ready, more old nodes can be drained,
@ -588,4 +588,18 @@ type RollingUpdate struct {
// during the update is at least 70% of desired nodes.
// +optional
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
// MaxSurge is the maximum number of extra nodes that can be created
// during the update.
// The value can be an absolute number (for example 5) or a percentage of
// desired nodes (for example 10%).
// The absolute number is calculated from a percentage by rounding up.
// A value of 0 for both this and MaxUnavailable disables rolling updates.
// Has no effect on instance groups with role "Master".
// Defaults to 0.
// Example: when this is set to 30%, the InstanceGroup can be scaled
// up immediately when the rolling update starts, such that the total
// number of old and new nodes does not exceed 130% of desired
// nodes.
// +optional
MaxSurge *intstr.IntOrString `json:"maxSurge,omitempty"`
}

View File

@ -4990,6 +4990,7 @@ func Convert_kops_RBACAuthorizationSpec_To_v1alpha2_RBACAuthorizationSpec(in *ko
func autoConvert_v1alpha2_RollingUpdate_To_kops_RollingUpdate(in *RollingUpdate, out *kops.RollingUpdate, s conversion.Scope) error {
out.MaxUnavailable = in.MaxUnavailable
out.MaxSurge = in.MaxSurge
return nil
}
@ -5000,6 +5001,7 @@ func Convert_v1alpha2_RollingUpdate_To_kops_RollingUpdate(in *RollingUpdate, out
func autoConvert_kops_RollingUpdate_To_v1alpha2_RollingUpdate(in *kops.RollingUpdate, out *RollingUpdate, s conversion.Scope) error {
out.MaxUnavailable = in.MaxUnavailable
out.MaxSurge = in.MaxSurge
return nil
}

View File

@ -3374,6 +3374,11 @@ func (in *RollingUpdate) DeepCopyInto(out *RollingUpdate) {
*out = new(intstr.IntOrString)
**out = **in
}
if in.MaxSurge != nil {
in, out := &in.MaxSurge, &out.MaxSurge
*out = new(intstr.IntOrString)
**out = **in
}
return
}

View File

@ -118,7 +118,7 @@ func ValidateInstanceGroup(g *kops.InstanceGroup) field.ErrorList {
allErrs = append(allErrs, validateInstanceProfile(g.Spec.IAM, field.NewPath("spec", "iam"))...)
if g.Spec.RollingUpdate != nil {
allErrs = append(allErrs, validateRollingUpdate(g.Spec.RollingUpdate, field.NewPath("spec", "rollingUpdate"))...)
allErrs = append(allErrs, validateRollingUpdate(g.Spec.RollingUpdate, field.NewPath("spec", "rollingUpdate"), g.Spec.Role == kops.InstanceGroupRoleMaster)...)
}
return allErrs

View File

@ -123,7 +123,7 @@ func validateClusterSpec(spec *kops.ClusterSpec, fieldPath *field.Path) field.Er
}
if spec.RollingUpdate != nil {
allErrs = append(allErrs, validateRollingUpdate(spec.RollingUpdate, fieldPath.Child("rollingUpdate"))...)
allErrs = append(allErrs, validateRollingUpdate(spec.RollingUpdate, fieldPath.Child("rollingUpdate"), false)...)
}
return allErrs
@ -597,7 +597,7 @@ func validateContainerRuntime(runtime *string, fldPath *field.Path) field.ErrorL
return allErrs
}
func validateRollingUpdate(rollingUpdate *kops.RollingUpdate, fldpath *field.Path) field.ErrorList {
func validateRollingUpdate(rollingUpdate *kops.RollingUpdate, fldpath *field.Path, onMasterInstanceGroup bool) field.ErrorList {
allErrs := field.ErrorList{}
if rollingUpdate.MaxUnavailable != nil {
unavailable, err := intstr.GetValueFromIntOrPercent(rollingUpdate.MaxUnavailable, 1, false)
@ -609,6 +609,18 @@ func validateRollingUpdate(rollingUpdate *kops.RollingUpdate, fldpath *field.Pat
allErrs = append(allErrs, field.Invalid(fldpath.Child("maxUnavailable"), rollingUpdate.MaxUnavailable, "Cannot be negative"))
}
}
if rollingUpdate.MaxSurge != nil {
surge, err := intstr.GetValueFromIntOrPercent(rollingUpdate.MaxSurge, 1000, true)
if err != nil {
allErrs = append(allErrs, field.Invalid(fldpath.Child("maxSurge"), rollingUpdate.MaxSurge,
fmt.Sprintf("Unable to parse: %v", err)))
}
if onMasterInstanceGroup && surge != 0 {
allErrs = append(allErrs, field.Forbidden(fldpath.Child("maxSurge"), "Cannot surge instance groups with role \"Master\""))
} else if surge < 0 {
allErrs = append(allErrs, field.Invalid(fldpath.Child("maxSurge"), rollingUpdate.MaxSurge, "Cannot be negative"))
}
}
return allErrs
}

View File

@ -405,6 +405,7 @@ func Test_Validate_Calico(t *testing.T) {
func Test_Validate_RollingUpdate(t *testing.T) {
grid := []struct {
Input kops.RollingUpdate
OnMasterIG bool
ExpectedErrors []string
}{
{
@ -438,9 +439,94 @@ func Test_Validate_RollingUpdate(t *testing.T) {
},
ExpectedErrors: []string{"Invalid value::testField.maxUnavailable"},
},
{
Input: kops.RollingUpdate{
MaxSurge: intStr(intstr.FromInt(0)),
},
},
{
Input: kops.RollingUpdate{
MaxSurge: intStr(intstr.FromString("0%")),
},
},
{
Input: kops.RollingUpdate{
MaxSurge: intStr(intstr.FromInt(1)),
},
},
{
Input: kops.RollingUpdate{
MaxSurge: intStr(intstr.FromString("1%")),
},
},
{
Input: kops.RollingUpdate{
MaxSurge: intStr(intstr.FromString("nope")),
},
ExpectedErrors: []string{"Invalid value::testField.maxSurge"},
},
{
Input: kops.RollingUpdate{
MaxSurge: intStr(intstr.FromInt(-1)),
},
ExpectedErrors: []string{"Invalid value::testField.maxSurge"},
},
{
Input: kops.RollingUpdate{
MaxSurge: intStr(intstr.FromString("-1%")),
},
ExpectedErrors: []string{"Invalid value::testField.maxSurge"},
},
{
Input: kops.RollingUpdate{
MaxSurge: intStr(intstr.FromInt(0)),
},
OnMasterIG: true,
},
{
Input: kops.RollingUpdate{
MaxSurge: intStr(intstr.FromString("0%")),
},
OnMasterIG: true,
},
{
Input: kops.RollingUpdate{
MaxSurge: intStr(intstr.FromInt(1)),
},
OnMasterIG: true,
ExpectedErrors: []string{"Forbidden::testField.maxSurge"},
},
{
Input: kops.RollingUpdate{
MaxSurge: intStr(intstr.FromString("1%")),
},
OnMasterIG: true,
ExpectedErrors: []string{"Forbidden::testField.maxSurge"},
},
{
Input: kops.RollingUpdate{
MaxSurge: intStr(intstr.FromString("nope")),
},
OnMasterIG: true,
ExpectedErrors: []string{"Invalid value::testField.maxSurge"},
},
{
Input: kops.RollingUpdate{
MaxSurge: intStr(intstr.FromInt(-1)),
},
OnMasterIG: true,
ExpectedErrors: []string{"Forbidden::testField.maxSurge"},
},
{
Input: kops.RollingUpdate{
MaxSurge: intStr(intstr.FromString("-1%")),
},
OnMasterIG: true,
ExpectedErrors: []string{"Forbidden::testField.maxSurge"},
},
}
for _, g := range grid {
errs := validateRollingUpdate(&g.Input, field.NewPath("testField"))
errs := validateRollingUpdate(&g.Input, field.NewPath("testField"), g.OnMasterIG)
testErrors(t, g.Input, errs, g.ExpectedErrors)
}
}

View File

@ -3588,6 +3588,11 @@ func (in *RollingUpdate) DeepCopyInto(out *RollingUpdate) {
*out = new(intstr.IntOrString)
**out = **in
}
if in.MaxSurge != nil {
in, out := &in.MaxSurge, &out.MaxSurge
*out = new(intstr.IntOrString)
**out = **in
}
return
}

View File

@ -47,6 +47,8 @@ type CloudInstanceGroupMember struct {
Node *v1.Node
// CloudInstanceGroup is the managing CloudInstanceGroup
CloudInstanceGroup *CloudInstanceGroup
// Detached is whether fi.Cloud.DetachInstance has been successfully called on the instance.
Detached bool
}
// NewCloudInstanceGroupMember creates a new CloudInstanceGroupMember
@ -74,6 +76,28 @@ func (c *CloudInstanceGroup) NewCloudInstanceGroupMember(instanceId string, newG
return nil
}
// NewDetachedCloudInstanceGroupMember creates a new CloudInstanceGroupMember for a detached instance
func (c *CloudInstanceGroup) NewDetachedCloudInstanceGroupMember(instanceId string, nodeMap map[string]*v1.Node) error {
if instanceId == "" {
return fmt.Errorf("instance id for cloud instance member cannot be empty")
}
cm := &CloudInstanceGroupMember{
ID: instanceId,
CloudInstanceGroup: c,
Detached: true,
}
node := nodeMap[instanceId]
if node != nil {
cm.Node = node
} else {
klog.V(8).Infof("unable to find node for instance: %s", instanceId)
}
c.NeedUpdate = append(c.NeedUpdate, cm)
return nil
}
// Status returns a human-readable Status indicating whether an update is needed
func (c *CloudInstanceGroup) Status() string {
if len(c.NeedUpdate) == 0 {

View File

@ -45,6 +45,8 @@ go_test(
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/autoscaling:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/ec2/ec2iface:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -101,10 +101,7 @@ func promptInteractive(upgradedHostId, upgradedHostName string) (stopPrompting b
return stopPrompting, err
}
// TODO: Temporarily increase size of ASG?
// TODO: Remove from ASG first so status is immediately updated?
// RollingUpdate performs a rolling update on a list of ec2 instances.
// RollingUpdate performs a rolling update on a list of instances.
func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpdateCluster, cluster *api.Cluster, isBastion bool, sleepAfterTerminate time.Duration, validationTimeout time.Duration) (err error) {
// we should not get here, but hey I am going to check.
@ -152,17 +149,61 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
settings := resolveSettings(cluster, r.CloudGroup.InstanceGroup, numInstances)
runningDrains := 0
maxConcurrency := settings.MaxUnavailable.IntValue()
maxSurge := settings.MaxSurge.IntValue()
if maxSurge > len(update) {
maxSurge = len(update)
}
maxConcurrency := maxSurge + settings.MaxUnavailable.IntValue()
if maxConcurrency == 0 {
klog.Infof("Rolling updates for InstanceGroup %s are disabled", r.CloudGroup.InstanceGroup.Name)
return nil
}
if r.CloudGroup.InstanceGroup.Spec.Role == api.InstanceGroupRoleMaster && maxSurge != 0 {
// Masters are incapable of surging because they rely on registering themselves through
// the local apiserver. That apiserver depends on the local etcd, which relies on being
// joined to the etcd cluster.
maxSurge = 0
maxConcurrency = settings.MaxUnavailable.IntValue()
if maxConcurrency == 0 {
maxConcurrency = 1
}
}
if rollingUpdateData.Interactive {
if maxSurge > 1 {
maxSurge = 1
}
maxConcurrency = 1
}
update = prioritizeUpdate(update)
if maxSurge > 0 && !rollingUpdateData.CloudOnly {
for numSurge := 1; numSurge <= maxSurge; numSurge++ {
u := update[len(update)-numSurge]
if !u.Detached {
if err := r.detachInstance(u); err != nil {
return err
}
// If noneReady, wait until after one node is detached and its replacement validates
// before detaching more in case the current spec does not result in usable nodes.
if numSurge == maxSurge || noneReady {
// Wait for the minimum interval
klog.Infof("waiting for %v after detaching instance", sleepAfterTerminate)
time.Sleep(sleepAfterTerminate)
if err := r.maybeValidate(rollingUpdateData, validationTimeout, "detaching"); err != nil {
return err
}
noneReady = false
}
}
}
}
terminateChan := make(chan error, maxConcurrency)
for uIdx, u := range update {
@ -183,7 +224,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
}
err = r.maybeValidate(rollingUpdateData, validationTimeout)
err = r.maybeValidate(rollingUpdateData, validationTimeout, "removing")
if err != nil {
return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
}
@ -229,7 +270,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
}
}
err = r.maybeValidate(rollingUpdateData, validationTimeout)
err = r.maybeValidate(rollingUpdateData, validationTimeout, "removing")
if err != nil {
return err
}
@ -238,6 +279,25 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
return nil
}
func prioritizeUpdate(update []*cloudinstances.CloudInstanceGroupMember) []*cloudinstances.CloudInstanceGroupMember {
// The priorities are, in order:
// attached before detached
// TODO unhealthy before healthy
// NeedUpdate before Ready (preserve original order)
result := make([]*cloudinstances.CloudInstanceGroupMember, 0, len(update))
var detached []*cloudinstances.CloudInstanceGroupMember
for _, u := range update {
if u.Detached {
detached = append(detached, u)
} else {
result = append(result, u)
}
}
result = append(result, detached...)
return result
}
func waitForPendingBeforeReturningError(runningDrains int, terminateChan chan error, err error) error {
for runningDrains > 0 {
<-terminateChan
@ -359,7 +419,7 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo
return nil
}
func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpdateCluster, validationTimeout time.Duration) error {
func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpdateCluster, validationTimeout time.Duration, operation string) error {
if rollingUpdateData.CloudOnly {
klog.Warningf("Not validating cluster as cloudonly flag is set.")
@ -370,10 +430,10 @@ func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpd
if rollingUpdateData.FailOnValidate {
klog.Errorf("Cluster did not validate within %s", validationTimeout)
return fmt.Errorf("error validating cluster after removing a node: %v", err)
return fmt.Errorf("error validating cluster after %s a node: %v", operation, err)
}
klog.Warningf("Cluster validation failed after removing instance, proceeding since fail-on-validate is set to false: %v", err)
klog.Warningf("Cluster validation failed after %s instance, proceeding since fail-on-validate is set to false: %v", operation, err)
}
}
return nil
@ -450,6 +510,30 @@ func (r *RollingUpdateInstanceGroup) validateCluster(rollingUpdateData *RollingU
}
// detachInstance detaches a Cloud Instance
func (r *RollingUpdateInstanceGroup) detachInstance(u *cloudinstances.CloudInstanceGroupMember) error {
id := u.ID
nodeName := ""
if u.Node != nil {
nodeName = u.Node.Name
}
if nodeName != "" {
klog.Infof("Detaching instance %q, node %q, in group %q.", id, nodeName, r.CloudGroup.HumanName)
} else {
klog.Infof("Detaching instance %q, in group %q.", id, r.CloudGroup.HumanName)
}
if err := r.Cloud.DetachInstance(u); err != nil {
if nodeName != "" {
return fmt.Errorf("error detaching instance %q, node %q: %v", id, nodeName, err)
} else {
return fmt.Errorf("error detaching instance %q: %v", id, err)
}
}
return nil
}
// DeleteInstance deletes a Cloud Instance.
func (r *RollingUpdateInstanceGroup) DeleteInstance(u *cloudinstances.CloudInstanceGroupMember) error {
id := u.ID

View File

@ -26,6 +26,8 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -49,7 +51,9 @@ func getTestSetup() (*RollingUpdateCluster, *awsup.MockAWSCloud, *kopsapi.Cluste
k8sClient := fake.NewSimpleClientset()
mockcloud := awsup.BuildMockAWSCloud("us-east-1", "abc")
mockcloud.MockAutoscaling = &mockautoscaling.MockAutoscaling{}
mockAutoscaling := &mockautoscaling.MockAutoscaling{}
mockcloud.MockAutoscaling = mockAutoscaling
mockcloud.MockEC2 = mockAutoscaling.GetEC2Shim(mockcloud.MockEC2)
cluster := &kopsapi.Cluster{}
cluster.Name = "test.k8s.local"
@ -109,6 +113,7 @@ func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k
fakeClient := k8sClient.(*fake.Clientset)
groups[name] = &cloudinstances.CloudInstanceGroup{
HumanName: name,
InstanceGroup: &kopsapi.InstanceGroup{
ObjectMeta: v1meta.ObjectMeta{
Name: name,
@ -117,6 +122,7 @@ func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k
Role: role,
},
},
Raw: &autoscaling.Group{AutoScalingGroupName: aws.String("asg-" + name)},
}
cloud.Autoscaling().CreateAutoScalingGroup(&autoscaling.CreateAutoScalingGroupInput{
AutoScalingGroupName: aws.String(name),
@ -136,8 +142,9 @@ func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k
_ = fakeClient.Tracker().Add(node)
}
member := cloudinstances.CloudInstanceGroupMember{
ID: id,
Node: node,
ID: id,
Node: node,
CloudInstanceGroup: groups[name],
}
if i < needUpdate {
groups[name].NeedUpdate = append(groups[name].NeedUpdate, &member)
@ -605,6 +612,52 @@ func TestRollingUpdateTaintAllButOneNeedUpdate(t *testing.T) {
assertGroupInstanceCount(t, cloud, "node-1", 1)
}
func TestRollingUpdateMaxSurgeIgnoredForMaster(t *testing.T) {
c, cloud, cluster := getTestSetup()
two := intstr.FromInt(2)
cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
MaxSurge: &two,
}
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
makeGroup(groups, c.K8sClient, cloud, "master-1", kopsapi.InstanceGroupRoleMaster, 3, 2)
err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")
cordoned := ""
tainted := map[string]bool{}
deleted := map[string]bool{}
for _, action := range c.K8sClient.(*fake.Clientset).Actions() {
switch a := action.(type) {
case testingclient.PatchAction:
if string(a.GetPatch()) == cordonPatch {
assertCordon(t, a)
assert.Equal(t, "", cordoned, "at most one node cordoned at a time")
assert.True(t, tainted[a.GetName()], "node", a.GetName(), "tainted")
cordoned = a.GetName()
} else {
assertTaint(t, a)
assert.Equal(t, "", cordoned, "not tainting while node cordoned")
assert.False(t, tainted[a.GetName()], "node", a.GetName(), "already tainted")
tainted[a.GetName()] = true
}
case testingclient.DeleteAction:
assert.Equal(t, "nodes", a.GetResource().Resource)
assert.Equal(t, cordoned, a.GetName(), "node was cordoned before delete")
assert.False(t, deleted[a.GetName()], "node", a.GetName(), "already deleted")
deleted[a.GetName()] = true
cordoned = ""
case testingclient.ListAction:
// Don't care
default:
t.Errorf("unexpected action %v", a)
}
}
assertGroupInstanceCount(t, cloud, "master-1", 1)
}
func TestRollingUpdateDisabled(t *testing.T) {
c, cloud, cluster := getTestSetup()
@ -644,12 +697,26 @@ func TestRollingUpdateDisabledCloudonly(t *testing.T) {
// The concurrent update tests attempt to induce the following expected update sequence:
//
// (Only for "all need update" tests, to verify the toe-dipping behavior)
// (Only for surging "all need update" test, to verify the toe-dipping behavior)
// Request validate (8) -->
// <-- validated
// Detach instance -->
// Request validate (7) -->
// <-- validated
// Detach instance -->
// (end only for surging "all need update" tests)
// (Only for surging "all but one need update" test)
// Request validate (7) -->
// <-- validated
// Detach instance -->
// Detach instance -->
// (end only for surging "all but one need update" test)
// (Only for non-surging "all need update" tests, to verify the toe-dipping behavior)
// Request validate (7) -->
// <-- validated
// Request terminate 1 node (7) -->
// <-- 1 node terminated, 6 left
// (end only for "all need update" tests)
// (end only for non-surging "all need update" tests)
// Request validate (6) -->
// <-- validated
// Request terminate 2 nodes (6,5) -->
@ -672,19 +739,27 @@ func TestRollingUpdateDisabledCloudonly(t *testing.T) {
// <-- validated
type concurrentTest struct {
autoscalingiface.AutoScalingAPI
ec2iface.EC2API
t *testing.T
mutex sync.Mutex
surge int
terminationRequestsLeft int
previousValidation int
validationChan chan bool
terminationChan chan bool
detached map[string]bool
}
func (c *concurrentTest) Validate() (*validation.ValidationCluster, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
if len(c.detached) < c.surge {
assert.Greater(c.t, c.previousValidation, 7, "previous validation")
c.previousValidation--
return &validation.ValidationCluster{}, nil
}
terminationRequestsLeft := c.terminationRequestsLeft
switch terminationRequestsLeft {
case 7, 6, 0:
@ -727,29 +802,40 @@ func (c *concurrentTest) Validate() (*validation.ValidationCluster, error) {
return &validation.ValidationCluster{}, nil
}
func (c *concurrentTest) TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) {
func (c *concurrentTest) TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error) {
if input.DryRun != nil && *input.DryRun {
return &ec2.TerminateInstancesOutput{}, nil
}
c.mutex.Lock()
defer c.mutex.Unlock()
terminationRequestsLeft := c.terminationRequestsLeft
c.terminationRequestsLeft--
switch terminationRequestsLeft {
case 7, 2, 1:
assert.Equal(c.t, terminationRequestsLeft, c.previousValidation, "previous validation")
case 6, 4:
assert.Equal(c.t, terminationRequestsLeft, c.previousValidation, "previous validation")
c.mutex.Unlock()
select {
case <-c.terminationChan:
case <-time.After(1 * time.Second):
c.t.Error("timed out reading from terminationChan")
for _, id := range input.InstanceIds {
assert.Equal(c.t, c.surge, len(c.detached), "Number of detached instances")
if c.detached[*id] {
assert.LessOrEqual(c.t, c.terminationRequestsLeft, c.surge, "Deleting detached instances last")
}
terminationRequestsLeft := c.terminationRequestsLeft
c.terminationRequestsLeft--
switch terminationRequestsLeft {
case 7, 2, 1:
assert.Equal(c.t, terminationRequestsLeft, c.previousValidation, "previous validation")
case 6, 4:
assert.Equal(c.t, terminationRequestsLeft, c.previousValidation, "previous validation")
c.mutex.Unlock()
select {
case <-c.terminationChan:
case <-time.After(1 * time.Second):
c.t.Error("timed out reading from terminationChan")
}
c.mutex.Lock()
go c.delayThenWakeValidation()
case 5, 3:
assert.Equal(c.t, terminationRequestsLeft+1, c.previousValidation, "previous validation")
}
c.mutex.Lock()
go c.delayThenWakeValidation()
case 5, 3:
assert.Equal(c.t, terminationRequestsLeft+1, c.previousValidation, "previous validation")
}
return c.AutoScalingAPI.TerminateInstanceInAutoScalingGroup(input)
return c.EC2API.TerminateInstances(input)
}
func (c *concurrentTest) delayThenWakeValidation() {
@ -767,28 +853,36 @@ func (c *concurrentTest) AssertComplete() {
assert.Equal(c.t, 0, c.previousValidation, "last validation")
}
func newConcurrentTest(t *testing.T, cloud *awsup.MockAWSCloud, allNeedUpdate bool) *concurrentTest {
func newConcurrentTest(t *testing.T, cloud *awsup.MockAWSCloud, numSurge int, allNeedUpdate bool) *concurrentTest {
test := concurrentTest{
AutoScalingAPI: cloud.MockAutoscaling,
EC2API: cloud.MockEC2,
t: t,
surge: numSurge,
terminationRequestsLeft: 6,
validationChan: make(chan bool),
terminationChan: make(chan bool),
detached: map[string]bool{},
}
if allNeedUpdate {
if numSurge == 0 && allNeedUpdate {
test.terminationRequestsLeft = 7
}
test.previousValidation = test.terminationRequestsLeft + 1
if numSurge == 0 {
test.previousValidation = test.terminationRequestsLeft + 1
} else if allNeedUpdate {
test.previousValidation = 9
} else {
test.previousValidation = 8
}
return &test
}
func TestRollingUpdateMaxUnavailableAllNeedUpdate(t *testing.T) {
c, cloud, cluster := getTestSetup()
concurrentTest := newConcurrentTest(t, cloud, true)
concurrentTest := newConcurrentTest(t, cloud, 0, true)
c.ValidateSuccessDuration = 0
c.ClusterValidator = concurrentTest
cloud.MockAutoscaling = concurrentTest
cloud.MockEC2 = concurrentTest
two := intstr.FromInt(2)
cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
@ -808,10 +902,10 @@ func TestRollingUpdateMaxUnavailableAllNeedUpdate(t *testing.T) {
func TestRollingUpdateMaxUnavailableAllButOneNeedUpdate(t *testing.T) {
c, cloud, cluster := getTestSetup()
concurrentTest := newConcurrentTest(t, cloud, false)
concurrentTest := newConcurrentTest(t, cloud, 0, false)
c.ValidateSuccessDuration = 0
c.ClusterValidator = concurrentTest
cloud.MockAutoscaling = concurrentTest
cloud.MockEC2 = concurrentTest
two := intstr.FromInt(2)
cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
@ -830,10 +924,10 @@ func TestRollingUpdateMaxUnavailableAllButOneNeedUpdate(t *testing.T) {
func TestRollingUpdateMaxUnavailableAllNeedUpdateMaster(t *testing.T) {
c, cloud, cluster := getTestSetup()
concurrentTest := newConcurrentTest(t, cloud, true)
concurrentTest := newConcurrentTest(t, cloud, 0, true)
c.ValidateSuccessDuration = 0
c.ClusterValidator = concurrentTest
cloud.MockAutoscaling = concurrentTest
cloud.MockEC2 = concurrentTest
two := intstr.FromInt(2)
cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
@ -850,6 +944,264 @@ func TestRollingUpdateMaxUnavailableAllNeedUpdateMaster(t *testing.T) {
concurrentTest.AssertComplete()
}
type concurrentTestAutoscaling struct {
autoscalingiface.AutoScalingAPI
ConcurrentTest *concurrentTest
}
func (m *concurrentTestAutoscaling) DetachInstances(input *autoscaling.DetachInstancesInput) (*autoscaling.DetachInstancesOutput, error) {
m.ConcurrentTest.mutex.Lock()
defer m.ConcurrentTest.mutex.Unlock()
assert.Equal(m.ConcurrentTest.t, "node-1", *input.AutoScalingGroupName)
assert.False(m.ConcurrentTest.t, *input.ShouldDecrementDesiredCapacity)
for _, id := range input.InstanceIds {
assert.Less(m.ConcurrentTest.t, len(m.ConcurrentTest.detached), m.ConcurrentTest.surge, "Number of detached instances")
assert.False(m.ConcurrentTest.t, m.ConcurrentTest.detached[*id], *id+" already detached")
m.ConcurrentTest.detached[*id] = true
}
return &autoscaling.DetachInstancesOutput{}, nil
}
type ec2IgnoreTags struct {
ec2iface.EC2API
}
// CreateTags ignores tagging of instances done by the AWS fi.Cloud implementation of DetachInstance()
func (e *ec2IgnoreTags) CreateTags(*ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) {
return &ec2.CreateTagsOutput{}, nil
}
func TestRollingUpdateMaxSurgeAllNeedUpdate(t *testing.T) {
c, cloud, cluster := getTestSetup()
concurrentTest := newConcurrentTest(t, cloud, 2, true)
c.ValidateSuccessDuration = 0
c.ClusterValidator = concurrentTest
cloud.MockAutoscaling = &concurrentTestAutoscaling{
AutoScalingAPI: cloud.MockAutoscaling,
ConcurrentTest: concurrentTest,
}
cloud.MockEC2 = &ec2IgnoreTags{EC2API: concurrentTest}
two := intstr.FromInt(2)
cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
MaxSurge: &two,
}
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 6, 6)
err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")
assertGroupInstanceCount(t, cloud, "node-1", 0)
concurrentTest.AssertComplete()
}
func TestRollingUpdateMaxSurgeAllButOneNeedUpdate(t *testing.T) {
c, cloud, cluster := getTestSetup()
concurrentTest := newConcurrentTest(t, cloud, 2, false)
c.ValidateSuccessDuration = 0
c.ClusterValidator = concurrentTest
cloud.MockAutoscaling = &concurrentTestAutoscaling{
AutoScalingAPI: cloud.MockAutoscaling,
ConcurrentTest: concurrentTest,
}
cloud.MockEC2 = &ec2IgnoreTags{EC2API: concurrentTest}
two := intstr.FromInt(2)
cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
MaxSurge: &two,
}
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 7, 6)
err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")
assertGroupInstanceCount(t, cloud, "node-1", 1)
concurrentTest.AssertComplete()
}
type countDetach struct {
autoscalingiface.AutoScalingAPI
Count int
}
func (c *countDetach) DetachInstances(input *autoscaling.DetachInstancesInput) (*autoscaling.DetachInstancesOutput, error) {
c.Count += len(input.InstanceIds)
return &autoscaling.DetachInstancesOutput{}, nil
}
func TestRollingUpdateMaxSurgeGreaterThanNeedUpdate(t *testing.T) {
c, cloud, cluster := getTestSetup()
countDetach := &countDetach{AutoScalingAPI: cloud.MockAutoscaling}
cloud.MockAutoscaling = countDetach
cloud.MockEC2 = &ec2IgnoreTags{EC2API: cloud.MockEC2}
ten := intstr.FromInt(10)
cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
MaxSurge: &ten,
}
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 3, 2)
err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")
assertGroupInstanceCount(t, cloud, "node-1", 1)
assert.Equal(t, 2, countDetach.Count)
}
// Request validate (1) -->
// <-- validated
// Detach instance -->
// Request validate (2) -->
// <-- validated
// Detach instance -->
// Request validate (3) -->
// <-- validated
// Request terminate 3 nodes -->
// <-- 3 nodes terminated, 1 left
// Request validate (4) -->
// <-- validated
// Request terminate 1 node -->
// <-- 1 node terminated, 0 left
// Request validate (5) -->
// <-- validated
type alreadyDetachedTest struct {
ec2iface.EC2API
t *testing.T
mutex sync.Mutex
terminationRequestsLeft int
numValidations int
detached map[string]bool
}
func (t *alreadyDetachedTest) Validate() (*validation.ValidationCluster, error) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.numValidations++
switch t.numValidations {
case 1, 2, 3:
assert.Equal(t.t, t.numValidations, len(t.detached), "number of detached instances")
case 4:
t.mutex.Unlock()
time.Sleep(20 * time.Millisecond) // NodeInterval plus some
t.mutex.Lock()
assert.Equal(t.t, 1, t.terminationRequestsLeft, "terminations left")
case 5:
assert.Equal(t.t, 0, t.terminationRequestsLeft, "terminations left")
case 6:
t.t.Error("unexpected sixth call to Validate")
}
return &validation.ValidationCluster{}, nil
}
func (t *alreadyDetachedTest) TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error) {
if input.DryRun != nil && *input.DryRun {
return &ec2.TerminateInstancesOutput{}, nil
}
t.mutex.Lock()
defer t.mutex.Unlock()
for _, id := range input.InstanceIds {
assert.Equal(t.t, 3, len(t.detached), "Number of detached instances")
assert.GreaterOrEqual(t.t, t.numValidations, 3, "Number of previous validations")
if t.terminationRequestsLeft == 1 {
assert.True(t.t, t.detached[*id], "Last deleted instance %q was detached", *id)
}
t.terminationRequestsLeft--
}
return t.EC2API.TerminateInstances(input)
}
type alreadyDetachedTestAutoscaling struct {
autoscalingiface.AutoScalingAPI
AlreadyDetachedTest *alreadyDetachedTest
}
func (m *alreadyDetachedTestAutoscaling) DetachInstances(input *autoscaling.DetachInstancesInput) (*autoscaling.DetachInstancesOutput, error) {
m.AlreadyDetachedTest.mutex.Lock()
defer m.AlreadyDetachedTest.mutex.Unlock()
for _, id := range input.InstanceIds {
assert.Less(m.AlreadyDetachedTest.t, len(m.AlreadyDetachedTest.detached), 3, "Number of detached instances")
assert.False(m.AlreadyDetachedTest.t, m.AlreadyDetachedTest.detached[*id], *id+" already detached")
m.AlreadyDetachedTest.detached[*id] = true
}
return &autoscaling.DetachInstancesOutput{}, nil
}
func TestRollingUpdateMaxSurgeAllNeedUpdateOneAlreadyDetached(t *testing.T) {
c, cloud, cluster := getTestSetup()
alreadyDetachedTest := &alreadyDetachedTest{
EC2API: cloud.MockEC2,
t: t,
terminationRequestsLeft: 4,
detached: map[string]bool{},
}
c.ValidateSuccessDuration = 0
c.ClusterValidator = alreadyDetachedTest
cloud.MockAutoscaling = &alreadyDetachedTestAutoscaling{
AutoScalingAPI: cloud.MockAutoscaling,
AlreadyDetachedTest: alreadyDetachedTest,
}
cloud.MockEC2 = &ec2IgnoreTags{EC2API: alreadyDetachedTest}
three := intstr.FromInt(3)
cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
MaxSurge: &three,
}
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 4, 4)
alreadyDetachedTest.detached[groups["node-1"].NeedUpdate[3].ID] = true
groups["node-1"].NeedUpdate[3].Detached = true
err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")
assertGroupInstanceCount(t, cloud, "node-1", 0)
assert.Equal(t, 5, alreadyDetachedTest.numValidations, "Number of validations")
}
func TestRollingUpdateMaxSurgeAllNeedUpdateMaxAlreadyDetached(t *testing.T) {
c, cloud, cluster := getTestSetup()
// Should behave the same as TestRollingUpdateMaxUnavailableAllNeedUpdate
concurrentTest := newConcurrentTest(t, cloud, 0, true)
c.ValidateSuccessDuration = 0
c.ClusterValidator = concurrentTest
cloud.MockEC2 = concurrentTest
two := intstr.FromInt(2)
cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
MaxSurge: &two,
}
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 7, 7)
groups["node-1"].NeedUpdate[1].Detached = true
groups["node-1"].NeedUpdate[3].Detached = true
// TODO verify those are the last two instances terminated
err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")
assertGroupInstanceCount(t, cloud, "node-1", 0)
concurrentTest.AssertComplete()
}
func assertCordon(t *testing.T, action testingclient.PatchAction) {
assert.Equal(t, "nodes", action.GetResource().Resource)
assert.Equal(t, cordonPatch, string(action.GetPatch()))

View File

@ -31,19 +31,32 @@ func resolveSettings(cluster *kops.Cluster, group *kops.InstanceGroup, numInstan
if rollingUpdate.MaxUnavailable == nil {
rollingUpdate.MaxUnavailable = def.MaxUnavailable
}
if rollingUpdate.MaxSurge == nil {
rollingUpdate.MaxSurge = def.MaxSurge
}
}
if rollingUpdate.MaxUnavailable == nil || rollingUpdate.MaxUnavailable.IntVal < 0 {
one := intstr.FromInt(1)
rollingUpdate.MaxUnavailable = &one
if rollingUpdate.MaxSurge == nil {
zero := intstr.FromInt(0)
rollingUpdate.MaxSurge = &zero
}
if rollingUpdate.MaxSurge.Type == intstr.String {
surge, _ := intstr.GetValueFromIntOrPercent(rollingUpdate.MaxSurge, numInstances, true)
surgeInt := intstr.FromInt(surge)
rollingUpdate.MaxSurge = &surgeInt
}
maxUnavailableDefault := intstr.FromInt(0)
if rollingUpdate.MaxSurge.Type == intstr.Int && rollingUpdate.MaxSurge.IntVal == 0 {
maxUnavailableDefault = intstr.FromInt(1)
}
if rollingUpdate.MaxUnavailable == nil {
rollingUpdate.MaxUnavailable = &maxUnavailableDefault
}
if rollingUpdate.MaxUnavailable.Type == intstr.String {
unavailable, err := intstr.GetValueFromIntOrPercent(rollingUpdate.MaxUnavailable, numInstances, false)
if err != nil {
// If unparseable use the default value
unavailable = 1
}
unavailable, _ := intstr.GetValueFromIntOrPercent(rollingUpdate.MaxUnavailable, numInstances, false)
if unavailable <= 0 {
// While we round down, percentages should resolve to a minimum of 1
unavailable = 1
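
The test file that follows exercises these rules; as a compact summary, here is a sketch of the defaulting the hunk above implements (the function name and shape are illustrative, not the actual resolveSettings):

package example

import "k8s.io/apimachinery/pkg/util/intstr"

// resolveDefaults sketches the new rules: an unset maxSurge becomes 0,
// maxUnavailable defaults to 1 only when the resolved maxSurge is 0
// (otherwise 0), surge percentages round up, and unavailable percentages
// round down with a floor of 1. Negative values are left to validation.
func resolveDefaults(maxSurge, maxUnavailable *intstr.IntOrString, numInstances int) (int, int) {
	surge := 0
	if maxSurge != nil {
		surge, _ = intstr.GetValueFromIntOrPercent(maxSurge, numInstances, true)
	}
	if maxUnavailable == nil {
		if surge == 0 {
			return surge, 1
		}
		return surge, 0
	}
	unavailable, _ := intstr.GetValueFromIntOrPercent(maxUnavailable, numInstances, false)
	if unavailable <= 0 && maxUnavailable.Type == intstr.String {
		unavailable = 1 // percentages resolve to a minimum of 1
	}
	return surge, unavailable
}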

View File

@ -37,6 +37,11 @@ func TestSettings(t *testing.T) {
defaultValue: intstr.FromInt(1),
nonDefaultValue: intstr.FromInt(2),
},
{
name: "MaxSurge",
defaultValue: intstr.FromInt(0),
nonDefaultValue: intstr.FromInt(2),
},
} {
t.Run(tc.name, func(t *testing.T) {
defaultCluster := &kops.RollingUpdate{}
@ -133,21 +138,6 @@ func TestMaxUnavailable(t *testing.T) {
value: "100%",
expected: 10,
},
{
numInstances: 5,
value: "fnord",
expected: 1,
},
{
numInstances: 5,
value: "-3",
expected: 1,
},
{
numInstances: 5,
value: "-3%",
expected: 1,
},
} {
t.Run(fmt.Sprintf("%s %d", tc.value, tc.numInstances), func(t *testing.T) {
value := intstr.Parse(tc.value)
@ -165,3 +155,52 @@ func TestMaxUnavailable(t *testing.T) {
})
}
}
func TestMaxSurge(t *testing.T) {
for _, tc := range []struct {
numInstances int
value string
expected int32
}{
{
numInstances: 1,
value: "0",
expected: 0,
},
{
numInstances: 1,
value: "0%",
expected: 0,
},
{
numInstances: 10,
value: "31%",
expected: 4,
},
{
numInstances: 10,
value: "100%",
expected: 10,
},
} {
t.Run(fmt.Sprintf("%s %d", tc.value, tc.numInstances), func(t *testing.T) {
value := intstr.Parse(tc.value)
rollingUpdate := kops.RollingUpdate{
MaxSurge: &value,
}
instanceGroup := kops.InstanceGroup{
Spec: kops.InstanceGroupSpec{
RollingUpdate: &rollingUpdate,
},
}
resolved := resolveSettings(&kops.Cluster{}, &instanceGroup, tc.numInstances)
assert.Equal(t, intstr.Int, resolved.MaxSurge.Type)
assert.Equal(t, tc.expected, resolved.MaxSurge.IntVal)
if tc.expected == 0 {
assert.Equal(t, int32(1), resolved.MaxUnavailable.IntVal, "MaxUnavailable default")
} else {
assert.Equal(t, int32(0), resolved.MaxUnavailable.IntVal, "MaxUnavailable default")
}
})
}
}

View File

@ -100,6 +100,12 @@ func (c *Cloud) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember) error
return fmt.Errorf("digital ocean cloud provider does not support deleting cloud instances at this time")
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *Cloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
klog.V(8).Info("digitalocean cloud provider DetachInstance not implemented yet")
return fmt.Errorf("digital ocean cloud provider does not support surging")
}
// ProviderID returns the kops api identifier for DigitalOcean cloud provider
func (c *Cloud) ProviderID() kops.CloudProviderID {
return kops.CloudProviderDO

View File

@ -193,6 +193,11 @@ func DeleteInstance(cloud Cloud, instance *cloudinstances.CloudInstanceGroupMemb
return fmt.Errorf("spotinst: unexpected instance group type, got: %T", group.Raw)
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func DetachInstance(cloud Cloud, instance *cloudinstances.CloudInstanceGroupMember) error {
return fmt.Errorf("spotinst does not support surging")
}
// GetCloudGroups returns a list of InstanceGroups as CloudInstanceGroup objects.
func GetCloudGroups(cloud Cloud, cluster *kops.Cluster, instanceGroups []*kops.InstanceGroup,
warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {

View File

@ -261,13 +261,20 @@ func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances
var allMembers []*cloudinstances.CloudInstanceGroupMember
allMembers = append(allMembers, cloudGroup.Ready...)
allMembers = append(allMembers, cloudGroup.NeedUpdate...)
if len(allMembers) < cloudGroup.MinSize {
numNodes := 0
for _, m := range allMembers {
if !m.Detached {
numNodes++
}
}
if numNodes < cloudGroup.MinSize {
v.addError(&ValidationError{
Kind: "InstanceGroup",
Name: cloudGroup.InstanceGroup.Name,
Message: fmt.Sprintf("InstanceGroup %q did not have enough nodes %d vs %d",
cloudGroup.InstanceGroup.Name,
len(allMembers),
numNodes,
cloudGroup.MinSize),
})
}

View File

@ -164,6 +164,59 @@ func Test_ValidateNodesNotEnough(t *testing.T) {
}
}
func Test_ValidateDetachedNodesDontCount(t *testing.T) {
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
groups["node-1"] = &cloudinstances.CloudInstanceGroup{
InstanceGroup: &kopsapi.InstanceGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Spec: kopsapi.InstanceGroupSpec{
Role: kopsapi.InstanceGroupRoleNode,
},
},
MinSize: 2,
Ready: []*cloudinstances.CloudInstanceGroupMember{
{
ID: "i-00001",
Node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "node-1a"},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{Type: "Ready", Status: v1.ConditionTrue},
},
},
},
},
},
NeedUpdate: []*cloudinstances.CloudInstanceGroupMember{
{
ID: "i-00002",
Node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "node-1b"},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{Type: "Ready", Status: v1.ConditionTrue},
},
},
},
Detached: true,
},
},
}
v, err := testValidate(t, groups, nil)
require.NoError(t, err)
if !assert.Len(t, v.Failures, 1) ||
!assert.Equal(t, &ValidationError{
Kind: "InstanceGroup",
Name: "node-1",
Message: "InstanceGroup \"node-1\" did not have enough nodes 1 vs 2",
}, v.Failures[0]) {
printDebug(t, v)
}
}
func Test_ValidateNodeNotReady(t *testing.T) {
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
groups["node-1"] = &cloudinstances.CloudInstanceGroup{

View File

@ -28,20 +28,24 @@ type Cloud interface {
DNS() (dnsprovider.Interface, error)
// FindVPCInfo looks up the specified VPC by id, returning info if found, otherwise (nil, nil)
// FindVPCInfo looks up the specified VPC by id, returning info if found, otherwise (nil, nil).
FindVPCInfo(id string) (*VPCInfo, error)
// DeleteInstance deletes a cloud instance
// DeleteInstance deletes a cloud instance.
DeleteInstance(instance *cloudinstances.CloudInstanceGroupMember) error
// DeleteGroup deletes the cloud resources that make up a CloudInstanceGroup, including the instances
// DeleteGroup deletes the cloud resources that make up a CloudInstanceGroup, including the instances.
DeleteGroup(group *cloudinstances.CloudInstanceGroup) error
// GetCloudGroups returns a map of cloud instances that back a kops cluster
// DetachInstance causes a cloud instance to no longer be counted against the group's size limits.
DetachInstance(instance *cloudinstances.CloudInstanceGroupMember) error
// GetCloudGroups returns a map of cloud instances that back a kops cluster.
// Detached instances must be returned in the NeedUpdate slice.
GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error)
// Region returns the cloud region bound to the cloud instance
// If the region concept does not apply, returns ""
// Region returns the cloud region bound to the cloud instance.
// If the region concept does not apply, returns "".
Region() string
}
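
The provider files below all follow the same pattern: clouds without detach support return an error from DetachInstance, which surfaces as a rolling-update failure if maxSurge is set on such a provider. A minimal sketch of that stub (the type name is hypothetical):

package example

import (
	"fmt"

	"k8s.io/kops/pkg/cloudinstances"
)

type unsupportedCloud struct{}

// DetachInstance refuses, so a surging rolling update on this provider
// aborts with this error; only maxSurge of 0 works here.
func (c *unsupportedCloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
	return fmt.Errorf("this cloud provider does not support surging")
}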

View File

@ -180,6 +180,10 @@ func (c *aliCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstanceG
return nil
}
func (c *aliCloudImplementation) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
return errors.New("aliCloud cloud provider does not support surging")
}
func (c *aliCloudImplementation) FindVPCInfo(id string) (*fi.VPCInfo, error) {
request := &ecs.DescribeVpcsArgs{
RegionId: common.Region(c.Region()),

View File

@ -85,6 +85,8 @@ const TagNameKopsRole = "kubernetes.io/kops/role"
// TagNameClusterOwnershipPrefix is the AWS tag used for ownership
const TagNameClusterOwnershipPrefix = "kubernetes.io/cluster/"
const tagNameDetachedInstance = "kops.k8s.io/detached-from-asg"
const (
WellKnownAccountAmazonLinux2 = "137112412989"
WellKnownAccountCentOS = "679593333241"
@ -358,6 +360,23 @@ func deleteGroup(c AWSCloud, g *cloudinstances.CloudInstanceGroup) error {
launchTemplate = aws.StringValue(asg.LaunchTemplate.LaunchTemplateName)
}
// Delete detached instances
{
detached, err := findDetachedInstances(c, asg)
if err != nil {
return fmt.Errorf("error searching for detached instances for autoscaling group %q: %v", name, err)
}
if len(detached) > 0 {
klog.V(2).Infof("Deleting detached instances for autoscaling group %q", name)
req := &ec2.TerminateInstancesInput{
InstanceIds: detached,
}
if _, err := c.EC2().TerminateInstances(req); err != nil {
return fmt.Errorf("error deleting detached instances for autoscaling group %q: %v", name, err)
}
}
}
// Delete ASG
{
klog.V(2).Infof("Deleting autoscaling group %q", name)
@ -418,12 +437,11 @@ func deleteInstance(c AWSCloud, i *cloudinstances.CloudInstanceGroupMember) erro
return fmt.Errorf("id was not set on CloudInstanceGroupMember: %v", i)
}
request := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(id),
ShouldDecrementDesiredCapacity: aws.Bool(false),
request := &ec2.TerminateInstancesInput{
InstanceIds: []*string{aws.String(id)},
}
if _, err := c.Autoscaling().TerminateInstanceInAutoScalingGroup(request); err != nil {
if _, err := c.EC2().TerminateInstances(request); err != nil {
return fmt.Errorf("error deleting instance %q: %v", id, err)
}
@ -432,7 +450,42 @@ func deleteInstance(c AWSCloud, i *cloudinstances.CloudInstanceGroupMember) erro
return nil
}
// TODO not used yet, as this requires a major refactor of rolling-update code, slowly but surely
// DetachInstance causes an aws instance to no longer be counted against the ASG's size limits.
func (c *awsCloudImplementation) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
if c.spotinst != nil {
return spotinst.DetachInstance(c.spotinst, i)
}
return detachInstance(c, i)
}
func detachInstance(c AWSCloud, i *cloudinstances.CloudInstanceGroupMember) error {
id := i.ID
if id == "" {
return fmt.Errorf("id was not set on CloudInstanceGroupMember: %v", i)
}
asg := i.CloudInstanceGroup.Raw.(*autoscaling.Group)
if err := c.CreateTags(id, map[string]string{tagNameDetachedInstance: *asg.AutoScalingGroupName}); err != nil {
return fmt.Errorf("error tagging instance %q: %v", id, err)
}
// TODO this also deregisters the instance from any ELB attached to the ASG. Do we care?
input := &autoscaling.DetachInstancesInput{
AutoScalingGroupName: aws.String(i.CloudInstanceGroup.HumanName),
InstanceIds: []*string{aws.String(id)},
ShouldDecrementDesiredCapacity: aws.Bool(false),
}
if _, err := c.Autoscaling().DetachInstances(input); err != nil {
return fmt.Errorf("error detaching instance %q: %v", id, err)
}
klog.V(8).Infof("detached aws ec2 instance %q", id)
return nil
}
// GetCloudGroups returns a groups of instances that back a kops instance groups
func (c *awsCloudImplementation) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {
@ -467,7 +520,7 @@ func getCloudGroups(c AWSCloud, cluster *kops.Cluster, instancegroups []*kops.In
continue
}
groups[instancegroup.ObjectMeta.Name], err = awsBuildCloudInstanceGroup(c, instancegroup, asg, nodeMap)
groups[instancegroup.ObjectMeta.Name], err = awsBuildCloudInstanceGroup(c, cluster, instancegroup, asg, nodeMap)
if err != nil {
return nil, fmt.Errorf("error getting cloud instance group %q: %v", instancegroup.ObjectMeta.Name, err)
}
@ -648,12 +701,14 @@ func findInstanceLaunchConfiguration(i *autoscaling.Instance) string {
return ""
}
func awsBuildCloudInstanceGroup(c AWSCloud, ig *kops.InstanceGroup, g *autoscaling.Group, nodeMap map[string]*v1.Node) (*cloudinstances.CloudInstanceGroup, error) {
func awsBuildCloudInstanceGroup(c AWSCloud, cluster *kops.Cluster, ig *kops.InstanceGroup, g *autoscaling.Group, nodeMap map[string]*v1.Node) (*cloudinstances.CloudInstanceGroup, error) {
newConfigName, err := findAutoscalingGroupLaunchConfiguration(c, g)
if err != nil {
return nil, err
}
instanceSeen := map[string]bool{}
cg := &cloudinstances.CloudInstanceGroup{
HumanName: aws.StringValue(g.AutoScalingGroupName),
InstanceGroup: ig,
@ -668,6 +723,7 @@ func awsBuildCloudInstanceGroup(c AWSCloud, ig *kops.InstanceGroup, g *autoscali
klog.Warningf("ignoring instance with no instance id: %s in autoscaling group: %s", id, cg.HumanName)
continue
}
instanceSeen[id] = true
// @step: check if the instance is terminating
if aws.StringValue(i.LifecycleState) == autoscaling.LifecycleStateTerminating {
klog.Warningf("ignoring instance as it is terminating: %s in autoscaling group: %s", id, cg.HumanName)
@ -680,9 +736,44 @@ func awsBuildCloudInstanceGroup(c AWSCloud, ig *kops.InstanceGroup, g *autoscali
}
}
detached, err := findDetachedInstances(c, g)
if err != nil {
return nil, fmt.Errorf("error searching for detached instances: %v", err)
}
for _, id := range detached {
if id != nil && *id != "" && !instanceSeen[*id] {
if err := cg.NewDetachedCloudInstanceGroupMember(*id, nodeMap); err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
}
instanceSeen[*id] = true
}
}
return cg, nil
}
func findDetachedInstances(c AWSCloud, g *autoscaling.Group) ([]*string, error) {
req := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
NewEC2Filter("tag:"+tagNameDetachedInstance, aws.StringValue(g.AutoScalingGroupName)),
NewEC2Filter("instance-state-name", "pending", "running", "stopping", "stopped"),
},
}
result, err := c.EC2().DescribeInstances(req)
if err != nil {
return nil, err
}
var detached []*string
for _, r := range result.Reservations {
for _, i := range r.Instances {
detached = append(detached, i.InstanceId)
}
}
return detached, nil
}
func (c *awsCloudImplementation) Tags() map[string]string {
// Defensive copy
tags := make(map[string]string)

View File

@ -90,6 +90,10 @@ func (c *MockAWSCloud) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember
return deleteInstance(c, i)
}
func (c *MockAWSCloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
return detachInstance(c, i)
}
func (c *MockAWSCloud) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) {
return getCloudGroups(c, cluster, instancegroups, warnUnmatched, nodes)
}

View File

@ -68,6 +68,13 @@ func (c *Cloud) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error {
return fmt.Errorf("baremetal cloud provider does not support deleting cloud groups at this time")
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
// Baremetal may not support this.
func (c *Cloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
klog.V(8).Infof("baremetal cloud provider DetachInstance not implemented")
return fmt.Errorf("baremetal cloud provider does not support surging")
}
// DeleteInstance is not implemented yet; this func needs to delete a baremetal instance.
// Baremetal may not support this.
func (c *Cloud) DeleteInstance(instance *cloudinstances.CloudInstanceGroupMember) error {

View File

@ -61,6 +61,18 @@ func (c *mockGCECloud) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember
return recreateCloudInstanceGroupMember(c, i)
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *gceCloudImplementation) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
klog.V(8).Info("gce cloud provider DetachInstance not implemented yet")
return fmt.Errorf("gce cloud provider does not support surging")
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *mockGCECloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
klog.V(8).Info("gce cloud provider DetachInstance not implemented yet")
return fmt.Errorf("gce cloud provider does not support surging")
}
// recreateCloudInstanceGroupMember recreates the specified instances, managed by an InstanceGroupManager
func recreateCloudInstanceGroupMember(c GCECloud, i *cloudinstances.CloudInstanceGroupMember) error {
mig := i.CloudInstanceGroup.Raw.(*compute.InstanceGroupManager)

View File

@ -110,6 +110,12 @@ func (c *openstackCloud) DeleteInstanceWithID(instanceID string) error {
return servers.Delete(c.novaClient, instanceID).ExtractErr()
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *openstackCloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
klog.V(8).Info("openstack cloud provider DetachInstance not implemented yet")
return fmt.Errorf("openstack cloud provider does not support surging")
}
func (c *openstackCloud) GetInstance(id string) (*servers.Server, error) {
var server *servers.Server

View File

@ -131,6 +131,12 @@ func (c *VSphereCloud) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember
return fmt.Errorf("vSphere cloud provider does not support deleting cloud instances at this time.")
}
// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *VSphereCloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
klog.V(8).Info("vSphere cloud provider DetachInstance not implemented yet")
return fmt.Errorf("vSphere cloud provider does not support surging")
}
// DNS returns dnsprovider interface for this vSphere cloud.
func (c *VSphereCloud) DNS() (dnsprovider.Interface, error) {
var provider dnsprovider.Interface