From 640f5f5b74f11f12094d25cc7b07f0947ca72c39 Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Tue, 7 Jan 2020 23:13:25 -0800 Subject: [PATCH 01/13] Terminate AWS instances through EC2 instead of Autoscaling --- cloudmock/aws/mockautoscaling/BUILD.bazel | 3 ++ cloudmock/aws/mockautoscaling/ec2shim.go | 52 ++++++++++++++++++++ pkg/instancegroups/BUILD.bazel | 3 +- pkg/instancegroups/rollingupdate_test.go | 59 +++++++++++++---------- upup/pkg/fi/cloudup/awsup/aws_cloud.go | 7 ++- 5 files changed, 94 insertions(+), 30 deletions(-) create mode 100644 cloudmock/aws/mockautoscaling/ec2shim.go diff --git a/cloudmock/aws/mockautoscaling/BUILD.bazel b/cloudmock/aws/mockautoscaling/BUILD.bazel index 8d04c0dbf5..4623cd9d2d 100644 --- a/cloudmock/aws/mockautoscaling/BUILD.bazel +++ b/cloudmock/aws/mockautoscaling/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "api.go", "attach.go", + "ec2shim.go", "group.go", "launchconfigurations.go", "tags.go", @@ -16,6 +17,8 @@ go_library( "//vendor/github.com/aws/aws-sdk-go/aws/request:go_default_library", "//vendor/github.com/aws/aws-sdk-go/service/autoscaling:go_default_library", "//vendor/github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/service/ec2/ec2iface:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/cloudmock/aws/mockautoscaling/ec2shim.go b/cloudmock/aws/mockautoscaling/ec2shim.go new file mode 100644 index 0000000000..97bdd09e95 --- /dev/null +++ b/cloudmock/aws/mockautoscaling/ec2shim.go @@ -0,0 +1,52 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package mockautoscaling + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/autoscaling" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" +) + +type ec2Shim struct { + ec2iface.EC2API + mockAutoscaling *MockAutoscaling +} + +func (m *MockAutoscaling) GetEC2Shim(e ec2iface.EC2API) ec2iface.EC2API { + return &ec2Shim{ + EC2API: e, + mockAutoscaling: m, + } +} + +func (e *ec2Shim) TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error) { + if input.DryRun != nil && *input.DryRun { + return &ec2.TerminateInstancesOutput{}, nil + } + for _, id := range input.InstanceIds { + request := &autoscaling.TerminateInstanceInAutoScalingGroupInput{ + InstanceId: id, + ShouldDecrementDesiredCapacity: aws.Bool(false), + } + if _, err := e.mockAutoscaling.TerminateInstanceInAutoScalingGroup(request); err != nil { + return nil, err + } + } + return &ec2.TerminateInstancesOutput{}, nil +} diff --git a/pkg/instancegroups/BUILD.bazel b/pkg/instancegroups/BUILD.bazel index 520c15586c..51d2bd6009 100644 --- a/pkg/instancegroups/BUILD.bazel +++ b/pkg/instancegroups/BUILD.bazel @@ -44,7 +44,8 @@ go_test( "//upup/pkg/fi/cloudup/awsup:go_default_library", "//vendor/github.com/aws/aws-sdk-go/aws:go_default_library", "//vendor/github.com/aws/aws-sdk-go/service/autoscaling:go_default_library", - "//vendor/github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/service/ec2/ec2iface:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/instancegroups/rollingupdate_test.go b/pkg/instancegroups/rollingupdate_test.go index 6863b6b14d..14c51cb6ee 100644 --- a/pkg/instancegroups/rollingupdate_test.go +++ b/pkg/instancegroups/rollingupdate_test.go @@ -25,7 +25,8 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/autoscaling" - "github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" v1meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -49,7 +50,9 @@ func getTestSetup() (*RollingUpdateCluster, *awsup.MockAWSCloud, *kopsapi.Cluste k8sClient := fake.NewSimpleClientset() mockcloud := awsup.BuildMockAWSCloud("us-east-1", "abc") - mockcloud.MockAutoscaling = &mockautoscaling.MockAutoscaling{} + mockAutoscaling := &mockautoscaling.MockAutoscaling{} + mockcloud.MockAutoscaling = mockAutoscaling + mockcloud.MockEC2 = mockAutoscaling.GetEC2Shim(mockcloud.MockEC2) cluster := &kopsapi.Cluster{} cluster.Name = "test.k8s.local" @@ -672,7 +675,7 @@ func TestRollingUpdateDisabledCloudonly(t *testing.T) { // <-- validated type concurrentTest struct { - autoscalingiface.AutoScalingAPI + ec2iface.EC2API t *testing.T mutex sync.Mutex terminationRequestsLeft int @@ -727,29 +730,35 @@ func (c *concurrentTest) Validate() (*validation.ValidationCluster, error) { return &validation.ValidationCluster{}, nil } -func (c *concurrentTest) TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) { +func (c *concurrentTest) TerminateInstances(input 
*ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error) { + if input.DryRun != nil && *input.DryRun { + return &ec2.TerminateInstancesOutput{}, nil + } + c.mutex.Lock() defer c.mutex.Unlock() - terminationRequestsLeft := c.terminationRequestsLeft - c.terminationRequestsLeft-- - switch terminationRequestsLeft { - case 7, 2, 1: - assert.Equal(c.t, terminationRequestsLeft, c.previousValidation, "previous validation") - case 6, 4: - assert.Equal(c.t, terminationRequestsLeft, c.previousValidation, "previous validation") - c.mutex.Unlock() - select { - case <-c.terminationChan: - case <-time.After(1 * time.Second): - c.t.Error("timed out reading from terminationChan") + for range input.InstanceIds { + terminationRequestsLeft := c.terminationRequestsLeft + c.terminationRequestsLeft-- + switch terminationRequestsLeft { + case 7, 2, 1: + assert.Equal(c.t, terminationRequestsLeft, c.previousValidation, "previous validation") + case 6, 4: + assert.Equal(c.t, terminationRequestsLeft, c.previousValidation, "previous validation") + c.mutex.Unlock() + select { + case <-c.terminationChan: + case <-time.After(1 * time.Second): + c.t.Error("timed out reading from terminationChan") + } + c.mutex.Lock() + go c.delayThenWakeValidation() + case 5, 3: + assert.Equal(c.t, terminationRequestsLeft+1, c.previousValidation, "previous validation") } - c.mutex.Lock() - go c.delayThenWakeValidation() - case 5, 3: - assert.Equal(c.t, terminationRequestsLeft+1, c.previousValidation, "previous validation") } - return c.AutoScalingAPI.TerminateInstanceInAutoScalingGroup(input) + return c.EC2API.TerminateInstances(input) } func (c *concurrentTest) delayThenWakeValidation() { @@ -769,7 +778,7 @@ func (c *concurrentTest) AssertComplete() { func newConcurrentTest(t *testing.T, cloud *awsup.MockAWSCloud, allNeedUpdate bool) *concurrentTest { test := concurrentTest{ - AutoScalingAPI: cloud.MockAutoscaling, + EC2API: cloud.MockEC2, t: t, terminationRequestsLeft: 6, validationChan: make(chan bool), @@ -788,7 +797,7 @@ func TestRollingUpdateMaxUnavailableAllNeedUpdate(t *testing.T) { concurrentTest := newConcurrentTest(t, cloud, true) c.ValidateSuccessDuration = 0 c.ClusterValidator = concurrentTest - cloud.MockAutoscaling = concurrentTest + cloud.MockEC2 = concurrentTest two := intstr.FromInt(2) cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{ @@ -811,7 +820,7 @@ func TestRollingUpdateMaxUnavailableAllButOneNeedUpdate(t *testing.T) { concurrentTest := newConcurrentTest(t, cloud, false) c.ValidateSuccessDuration = 0 c.ClusterValidator = concurrentTest - cloud.MockAutoscaling = concurrentTest + cloud.MockEC2 = concurrentTest two := intstr.FromInt(2) cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{ @@ -833,7 +842,7 @@ func TestRollingUpdateMaxUnavailableAllNeedUpdateMaster(t *testing.T) { concurrentTest := newConcurrentTest(t, cloud, true) c.ValidateSuccessDuration = 0 c.ClusterValidator = concurrentTest - cloud.MockAutoscaling = concurrentTest + cloud.MockEC2 = concurrentTest two := intstr.FromInt(2) cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{ diff --git a/upup/pkg/fi/cloudup/awsup/aws_cloud.go b/upup/pkg/fi/cloudup/awsup/aws_cloud.go index 7aa22516e2..59b14458b4 100644 --- a/upup/pkg/fi/cloudup/awsup/aws_cloud.go +++ b/upup/pkg/fi/cloudup/awsup/aws_cloud.go @@ -414,12 +414,11 @@ func deleteInstance(c AWSCloud, i *cloudinstances.CloudInstanceGroupMember) erro return fmt.Errorf("id was not set on CloudInstanceGroupMember: %v", i) } - request := &autoscaling.TerminateInstanceInAutoScalingGroupInput{ - 
InstanceId: aws.String(id), - ShouldDecrementDesiredCapacity: aws.Bool(false), + request := &ec2.TerminateInstancesInput{ + InstanceIds: []*string{aws.String(id)}, } - if _, err := c.Autoscaling().TerminateInstanceInAutoScalingGroup(request); err != nil { + if _, err := c.EC2().TerminateInstances(request); err != nil { return fmt.Errorf("error deleting instance %q: %v", id, err) } From cc5b6f4b8f1418938ba85352a46f53e58aef7ae9 Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Sun, 5 Jan 2020 16:02:44 -0800 Subject: [PATCH 02/13] Add fi.Cloud.DetachInstance() --- pkg/cloudinstances/cloud_instance_group.go | 24 +++++ pkg/resources/digitalocean/cloud.go | 6 ++ pkg/resources/spotinst/resources.go | 5 + upup/pkg/fi/cloud.go | 16 ++-- upup/pkg/fi/cloudup/aliup/ali_cloud.go | 4 + upup/pkg/fi/cloudup/awsup/aws_cloud.go | 98 +++++++++++++++++++- upup/pkg/fi/cloudup/awsup/mock_aws_cloud.go | 4 + upup/pkg/fi/cloudup/baremetal/cloud.go | 7 ++ upup/pkg/fi/cloudup/gce/instancegroups.go | 12 +++ upup/pkg/fi/cloudup/openstack/instance.go | 6 ++ upup/pkg/fi/cloudup/vsphere/vsphere_cloud.go | 6 ++ 11 files changed, 179 insertions(+), 9 deletions(-) diff --git a/pkg/cloudinstances/cloud_instance_group.go b/pkg/cloudinstances/cloud_instance_group.go index 6099556099..6e3b14cd86 100644 --- a/pkg/cloudinstances/cloud_instance_group.go +++ b/pkg/cloudinstances/cloud_instance_group.go @@ -47,6 +47,8 @@ type CloudInstanceGroupMember struct { Node *v1.Node // CloudInstanceGroup is the managing CloudInstanceGroup CloudInstanceGroup *CloudInstanceGroup + // Detached is whether fi.Cloud.DetachInstance has been successfully called on the instance. + Detached bool } // NewCloudInstanceGroupMember creates a new CloudInstanceGroupMember @@ -74,6 +76,28 @@ func (c *CloudInstanceGroup) NewCloudInstanceGroupMember(instanceId string, newG return nil } +// NewDetachedCloudInstanceGroupMember creates a new CloudInstanceGroupMember for a detached instance +func (c *CloudInstanceGroup) NewDetachedCloudInstanceGroupMember(instanceId string, nodeMap map[string]*v1.Node) error { + if instanceId == "" { + return fmt.Errorf("instance id for cloud instance member cannot be empty") + } + cm := &CloudInstanceGroupMember{ + ID: instanceId, + CloudInstanceGroup: c, + Detached: true, + } + node := nodeMap[instanceId] + if node != nil { + cm.Node = node + } else { + klog.V(8).Infof("unable to find node for instance: %s", instanceId) + } + + c.NeedUpdate = append(c.NeedUpdate, cm) + + return nil +} + // Status returns a human-readable Status indicating whether an update is needed func (c *CloudInstanceGroup) Status() string { if len(c.NeedUpdate) == 0 { diff --git a/pkg/resources/digitalocean/cloud.go b/pkg/resources/digitalocean/cloud.go index 88e927f786..1de93c2b39 100644 --- a/pkg/resources/digitalocean/cloud.go +++ b/pkg/resources/digitalocean/cloud.go @@ -99,6 +99,12 @@ func (c *Cloud) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember) error return fmt.Errorf("digital ocean cloud provider does not support deleting cloud instances at this time") } +// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits. 
+func (c *Cloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error { + klog.V(8).Info("digitalocean cloud provider DetachInstance not implemented yet") + return fmt.Errorf("digital ocean cloud provider does not support surging") +} + // ProviderID returns the kops api identifier for DigitalOcean cloud provider func (c *Cloud) ProviderID() kops.CloudProviderID { return kops.CloudProviderDO diff --git a/pkg/resources/spotinst/resources.go b/pkg/resources/spotinst/resources.go index 72edf4c12e..f28da51b00 100644 --- a/pkg/resources/spotinst/resources.go +++ b/pkg/resources/spotinst/resources.go @@ -193,6 +193,11 @@ func DeleteInstance(cloud Cloud, instance *cloudinstances.CloudInstanceGroupMemb return fmt.Errorf("spotinst: unexpected instance group type, got: %T", group.Raw) } +// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits. +func DetachInstance(cloud Cloud, instance *cloudinstances.CloudInstanceGroupMember) error { + return fmt.Errorf("spotinst does not support surging") +} + // GetCloudGroups returns a list of InstanceGroups as CloudInstanceGroup objects. func GetCloudGroups(cloud Cloud, cluster *kops.Cluster, instanceGroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) { diff --git a/upup/pkg/fi/cloud.go b/upup/pkg/fi/cloud.go index 5f3adeb68c..38278e6ddc 100644 --- a/upup/pkg/fi/cloud.go +++ b/upup/pkg/fi/cloud.go @@ -28,20 +28,24 @@ type Cloud interface { DNS() (dnsprovider.Interface, error) - // FindVPCInfo looks up the specified VPC by id, returning info if found, otherwise (nil, nil) + // FindVPCInfo looks up the specified VPC by id, returning info if found, otherwise (nil, nil). FindVPCInfo(id string) (*VPCInfo, error) - // DeleteInstance deletes a cloud instance + // DeleteInstance deletes a cloud instance. DeleteInstance(instance *cloudinstances.CloudInstanceGroupMember) error - // DeleteGroup deletes the cloud resources that make up a CloudInstanceGroup, including the instances + // DeleteGroup deletes the cloud resources that make up a CloudInstanceGroup, including the instances. DeleteGroup(group *cloudinstances.CloudInstanceGroup) error - // GetCloudGroups returns a map of cloud instances that back a kops cluster + // DetachInstance causes a cloud instance to no longer be counted against the group's size limits. + DetachInstance(instance *cloudinstances.CloudInstanceGroupMember) error + + // GetCloudGroups returns a map of cloud instances that back a kops cluster. + // Detached instances must be returned in the NeedUpdate slice. GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) - // Region returns the cloud region bound to the cloud instance - // If the region concept does not apply, returns "" + // Region returns the cloud region bound to the cloud instance. + // If the region concept does not apply, returns "". 
Region() string } diff --git a/upup/pkg/fi/cloudup/aliup/ali_cloud.go b/upup/pkg/fi/cloudup/aliup/ali_cloud.go index 5706038b43..7f847f9340 100644 --- a/upup/pkg/fi/cloudup/aliup/ali_cloud.go +++ b/upup/pkg/fi/cloudup/aliup/ali_cloud.go @@ -145,6 +145,10 @@ func (c *aliCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstanceG return errors.New("DeleteInstance not implemented on aliCloud") } +func (c *aliCloudImplementation) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error { + return errors.New("aliCloud cloud provider does not support surging") +} + func (c *aliCloudImplementation) FindVPCInfo(id string) (*fi.VPCInfo, error) { request := &ecs.DescribeVpcsArgs{ RegionId: common.Region(c.Region()), diff --git a/upup/pkg/fi/cloudup/awsup/aws_cloud.go b/upup/pkg/fi/cloudup/awsup/aws_cloud.go index 59b14458b4..011a9ca500 100644 --- a/upup/pkg/fi/cloudup/awsup/aws_cloud.go +++ b/upup/pkg/fi/cloudup/awsup/aws_cloud.go @@ -85,6 +85,8 @@ const TagNameKopsRole = "kubernetes.io/kops/role" // TagNameClusterOwnershipPrefix is the AWS tag used for ownership const TagNameClusterOwnershipPrefix = "kubernetes.io/cluster/" +const tagNameDetachedInstance = "kops.k8s.io/detached-from-asg" + const ( WellKnownAccountKopeio = "383156758163" WellKnownAccountRedhat = "309956199498" @@ -354,6 +356,23 @@ func deleteGroup(c AWSCloud, g *cloudinstances.CloudInstanceGroup) error { launchTemplate = aws.StringValue(asg.LaunchTemplate.LaunchTemplateName) } + // Delete detached instances + { + detached, err := findDetachedInstances(c, asg) + if err != nil { + return fmt.Errorf("error searching for detached instances for autoscaling group %q: %v", name, err) + } + if len(detached) > 0 { + klog.V(2).Infof("Deleting detached instances for autoscaling group %q", name) + req := &ec2.TerminateInstancesInput{ + InstanceIds: detached, + } + if _, err := c.EC2().TerminateInstances(req); err != nil { + return fmt.Errorf("error deleting detached instances for autoscaling group %q: %v", name, err) + } + } + } + // Delete ASG { klog.V(2).Infof("Deleting autoscaling group %q", name) @@ -427,7 +446,42 @@ func deleteInstance(c AWSCloud, i *cloudinstances.CloudInstanceGroupMember) erro return nil } -// TODO not used yet, as this requires a major refactor of rolling-update code, slowly but surely +// DetachInstance causes an aws instance to no longer be counted against the ASG's size limits. +func (c *awsCloudImplementation) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error { + if c.spotinst != nil { + return spotinst.DetachInstance(c.spotinst, i) + } + + return detachInstance(c, i) +} + +func detachInstance(c AWSCloud, i *cloudinstances.CloudInstanceGroupMember) error { + id := i.ID + if id == "" { + return fmt.Errorf("id was not set on CloudInstanceGroupMember: %v", i) + } + + asg := i.CloudInstanceGroup.Raw.(*autoscaling.Group) + if err := c.CreateTags(id, map[string]string{tagNameDetachedInstance: *asg.AutoScalingGroupName}); err != nil { + return fmt.Errorf("error tagging instance %q: %v", id, err) + } + + // TODO this also deregisters the instance from any ELB attached to the ASG. Do we care? 
+ + input := &autoscaling.DetachInstancesInput{ + AutoScalingGroupName: aws.String(i.CloudInstanceGroup.HumanName), + InstanceIds: []*string{aws.String(id)}, + ShouldDecrementDesiredCapacity: aws.Bool(false), + } + + if _, err := c.Autoscaling().DetachInstances(input); err != nil { + return fmt.Errorf("error detaching instance %q: %v", id, err) + } + + klog.V(8).Infof("detached aws ec2 instance %q", id) + + return nil +} // GetCloudGroups returns a groups of instances that back a kops instance groups func (c *awsCloudImplementation) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) { @@ -462,7 +516,7 @@ func getCloudGroups(c AWSCloud, cluster *kops.Cluster, instancegroups []*kops.In continue } - groups[instancegroup.ObjectMeta.Name], err = awsBuildCloudInstanceGroup(c, instancegroup, asg, nodeMap) + groups[instancegroup.ObjectMeta.Name], err = awsBuildCloudInstanceGroup(c, cluster, instancegroup, asg, nodeMap) if err != nil { return nil, fmt.Errorf("error getting cloud instance group %q: %v", instancegroup.ObjectMeta.Name, err) } @@ -643,12 +697,14 @@ func findInstanceLaunchConfiguration(i *autoscaling.Instance) string { return "" } -func awsBuildCloudInstanceGroup(c AWSCloud, ig *kops.InstanceGroup, g *autoscaling.Group, nodeMap map[string]*v1.Node) (*cloudinstances.CloudInstanceGroup, error) { +func awsBuildCloudInstanceGroup(c AWSCloud, cluster *kops.Cluster, ig *kops.InstanceGroup, g *autoscaling.Group, nodeMap map[string]*v1.Node) (*cloudinstances.CloudInstanceGroup, error) { newConfigName, err := findAutoscalingGroupLaunchConfiguration(c, g) if err != nil { return nil, err } + instanceSeen := map[string]bool{} + cg := &cloudinstances.CloudInstanceGroup{ HumanName: aws.StringValue(g.AutoScalingGroupName), InstanceGroup: ig, @@ -663,6 +719,7 @@ func awsBuildCloudInstanceGroup(c AWSCloud, ig *kops.InstanceGroup, g *autoscali klog.Warningf("ignoring instance with no instance id: %s in autoscaling group: %s", id, cg.HumanName) continue } + instanceSeen[id] = true // @step: check if the instance is terminating if aws.StringValue(i.LifecycleState) == autoscaling.LifecycleStateTerminating { klog.Warningf("ignoring instance as it is terminating: %s in autoscaling group: %s", id, cg.HumanName) @@ -675,9 +732,44 @@ func awsBuildCloudInstanceGroup(c AWSCloud, ig *kops.InstanceGroup, g *autoscali } } + detached, err := findDetachedInstances(c, g) + if err != nil { + return nil, fmt.Errorf("error searching for detached instances: %v", err) + } + for _, id := range detached { + if id != nil && *id != "" && !instanceSeen[*id] { + if err := cg.NewDetachedCloudInstanceGroupMember(*id, nodeMap); err != nil { + return nil, fmt.Errorf("error creating cloud instance group member: %v", err) + } + instanceSeen[*id] = true + } + } + return cg, nil } +func findDetachedInstances(c AWSCloud, g *autoscaling.Group) ([]*string, error) { + req := &ec2.DescribeInstancesInput{ + Filters: []*ec2.Filter{ + NewEC2Filter("tag:"+tagNameDetachedInstance, aws.StringValue(g.AutoScalingGroupName)), + NewEC2Filter("instance-state-name", "pending", "running", "stopping", "stopped"), + }, + } + + result, err := c.EC2().DescribeInstances(req) + if err != nil { + return nil, err + } + + var detached []*string + for _, r := range result.Reservations { + for _, i := range r.Instances { + detached = append(detached, i.InstanceId) + } + } + return detached, nil +} + func (c *awsCloudImplementation) Tags() 
map[string]string { // Defensive copy tags := make(map[string]string) diff --git a/upup/pkg/fi/cloudup/awsup/mock_aws_cloud.go b/upup/pkg/fi/cloudup/awsup/mock_aws_cloud.go index 535eb3b2f3..eee779b84b 100644 --- a/upup/pkg/fi/cloudup/awsup/mock_aws_cloud.go +++ b/upup/pkg/fi/cloudup/awsup/mock_aws_cloud.go @@ -90,6 +90,10 @@ func (c *MockAWSCloud) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember return deleteInstance(c, i) } +func (c *MockAWSCloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error { + return detachInstance(c, i) +} + func (c *MockAWSCloud) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) { return getCloudGroups(c, cluster, instancegroups, warnUnmatched, nodes) } diff --git a/upup/pkg/fi/cloudup/baremetal/cloud.go b/upup/pkg/fi/cloudup/baremetal/cloud.go index 16e9bd4bd4..820816c63e 100644 --- a/upup/pkg/fi/cloudup/baremetal/cloud.go +++ b/upup/pkg/fi/cloudup/baremetal/cloud.go @@ -68,6 +68,13 @@ func (c *Cloud) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error { return fmt.Errorf("baremetal cloud provider does not support deleting cloud groups at this time") } +// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits. +// Baremetal may not support this. +func (c *Cloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error { + klog.V(8).Infof("baremetal cloud provider DetachInstance not implemented") + return fmt.Errorf("baremetal cloud provider does not support surging") +} + //DeleteInstance is not implemented yet, is func needs to delete a DO instance. //Baremetal may not support this. func (c *Cloud) DeleteInstance(instance *cloudinstances.CloudInstanceGroupMember) error { diff --git a/upup/pkg/fi/cloudup/gce/instancegroups.go b/upup/pkg/fi/cloudup/gce/instancegroups.go index 0b7348d584..17daadcc7e 100644 --- a/upup/pkg/fi/cloudup/gce/instancegroups.go +++ b/upup/pkg/fi/cloudup/gce/instancegroups.go @@ -61,6 +61,18 @@ func (c *mockGCECloud) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember return recreateCloudInstanceGroupMember(c, i) } +// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits. +func (c *gceCloudImplementation) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error { + klog.V(8).Info("gce cloud provider DetachInstance not implemented yet") + return fmt.Errorf("gce cloud provider does not support surging") +} + +// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits. 
+func (c *mockGCECloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error { + klog.V(8).Info("gce cloud provider DetachInstance not implemented yet") + return fmt.Errorf("gce cloud provider does not support surging") +} + // recreateCloudInstanceGroupMember recreates the specified instances, managed by an InstanceGroupManager func recreateCloudInstanceGroupMember(c GCECloud, i *cloudinstances.CloudInstanceGroupMember) error { mig := i.CloudInstanceGroup.Raw.(*compute.InstanceGroupManager) diff --git a/upup/pkg/fi/cloudup/openstack/instance.go b/upup/pkg/fi/cloudup/openstack/instance.go index 8171c76faf..d849909609 100644 --- a/upup/pkg/fi/cloudup/openstack/instance.go +++ b/upup/pkg/fi/cloudup/openstack/instance.go @@ -110,6 +110,12 @@ func (c *openstackCloud) DeleteInstanceWithID(instanceID string) error { return servers.Delete(c.novaClient, instanceID).ExtractErr() } +// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits. +func (c *openstackCloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error { + klog.V(8).Info("openstack cloud provider DetachInstance not implemented yet") + return fmt.Errorf("openstack cloud provider does not support surging") +} + func (c *openstackCloud) GetInstance(id string) (*servers.Server, error) { var server *servers.Server diff --git a/upup/pkg/fi/cloudup/vsphere/vsphere_cloud.go b/upup/pkg/fi/cloudup/vsphere/vsphere_cloud.go index 45c551d298..f22b10e7e5 100644 --- a/upup/pkg/fi/cloudup/vsphere/vsphere_cloud.go +++ b/upup/pkg/fi/cloudup/vsphere/vsphere_cloud.go @@ -131,6 +131,12 @@ func (c *VSphereCloud) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember return fmt.Errorf("vSphere cloud provider does not support deleting cloud instances at this time.") } +// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits. +func (c *VSphereCloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error { + klog.V(8).Info("vSphere cloud provider DetachInstance not implemented yet") + return fmt.Errorf("vSphere cloud provider does not support surging") +} + // DNS returns dnsprovider interface for this vSphere cloud. func (c *VSphereCloud) DNS() (dnsprovider.Interface, error) { var provider dnsprovider.Interface From be12d88cc37a6a27f0edefde2a9f13c42e5b67df Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Sun, 5 Jan 2020 17:39:46 -0800 Subject: [PATCH 03/13] Detached instances don't count against instancegroup minimums --- pkg/validation/validate_cluster.go | 11 ++++- pkg/validation/validate_cluster_test.go | 53 +++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/pkg/validation/validate_cluster.go b/pkg/validation/validate_cluster.go index 0f2c957dbb..4becb3730c 100644 --- a/pkg/validation/validate_cluster.go +++ b/pkg/validation/validate_cluster.go @@ -235,13 +235,20 @@ func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances var allMembers []*cloudinstances.CloudInstanceGroupMember allMembers = append(allMembers, cloudGroup.Ready...) allMembers = append(allMembers, cloudGroup.NeedUpdate...) 
- if len(allMembers) < cloudGroup.MinSize { + + numNodes := 0 + for _, m := range allMembers { + if !m.Detached { + numNodes++ + } + } + if numNodes < cloudGroup.MinSize { v.addError(&ValidationError{ Kind: "InstanceGroup", Name: cloudGroup.InstanceGroup.Name, Message: fmt.Sprintf("InstanceGroup %q did not have enough nodes %d vs %d", cloudGroup.InstanceGroup.Name, - len(allMembers), + numNodes, cloudGroup.MinSize), }) } diff --git a/pkg/validation/validate_cluster_test.go b/pkg/validation/validate_cluster_test.go index a61fe8a725..92f0e1e234 100644 --- a/pkg/validation/validate_cluster_test.go +++ b/pkg/validation/validate_cluster_test.go @@ -164,6 +164,59 @@ func Test_ValidateNodesNotEnough(t *testing.T) { } } +func Test_ValidateDetachedNodesDontCount(t *testing.T) { + groups := make(map[string]*cloudinstances.CloudInstanceGroup) + groups["node-1"] = &cloudinstances.CloudInstanceGroup{ + InstanceGroup: &kopsapi.InstanceGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Spec: kopsapi.InstanceGroupSpec{ + Role: kopsapi.InstanceGroupRoleNode, + }, + }, + MinSize: 2, + Ready: []*cloudinstances.CloudInstanceGroupMember{ + { + ID: "i-00001", + Node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-1a"}, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + {Type: "Ready", Status: v1.ConditionTrue}, + }, + }, + }, + }, + }, + NeedUpdate: []*cloudinstances.CloudInstanceGroupMember{ + { + ID: "i-00002", + Node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-1b"}, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + {Type: "Ready", Status: v1.ConditionTrue}, + }, + }, + }, + Detached: true, + }, + }, + } + + v, err := testValidate(t, groups, nil) + require.NoError(t, err) + if !assert.Len(t, v.Failures, 1) || + !assert.Equal(t, &ValidationError{ + Kind: "InstanceGroup", + Name: "node-1", + Message: "InstanceGroup \"node-1\" did not have enough nodes 1 vs 2", + }, v.Failures[0]) { + printDebug(t, v) + } +} + func Test_ValidateNodeNotReady(t *testing.T) { groups := make(map[string]*cloudinstances.CloudInstanceGroup) groups["node-1"] = &cloudinstances.CloudInstanceGroup{ From 10f06c5c5130af6b0bdf55d74b63c387bc3ee21e Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Sun, 5 Jan 2020 17:51:01 -0800 Subject: [PATCH 04/13] Add MaxSurge setting to cluster and instancegroup --- pkg/apis/kops/cluster.go | 17 +++++++++++-- pkg/apis/kops/v1alpha1/cluster.go | 17 +++++++++++-- pkg/apis/kops/v1alpha2/cluster.go | 17 +++++++++++-- pkg/apis/kops/validation/validation.go | 10 ++++++++ pkg/apis/kops/validation/validation_test.go | 28 +++++++++++++++++++++ 5 files changed, 83 insertions(+), 6 deletions(-) diff --git a/pkg/apis/kops/cluster.go b/pkg/apis/kops/cluster.go index 5d621f7aab..72ff8f413d 100644 --- a/pkg/apis/kops/cluster.go +++ b/pkg/apis/kops/cluster.go @@ -663,8 +663,8 @@ type RollingUpdate struct { // The value can be an absolute number (for example 5) or a percentage of desired // nodes (for example 10%). // The absolute number is calculated from a percentage by rounding down. - // A value of 0 disables rolling updates. - // Defaults to 1. + // A value of 0 for both this and MaxSurge disables rolling updates. + // Defaults to 1 if MaxSurge is 0, otherwise defaults to 0. // Example: when this is set to 30%, the InstanceGroup can be scaled // down to 70% of desired nodes immediately when the rolling update // starts. 
Once new nodes are ready, more old nodes can be drained, @@ -672,4 +672,17 @@ type RollingUpdate struct { // during the update is at least 70% of desired nodes. // +optional MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` + // MaxSurge is the maximum number of extra nodes that can be created + // during the update. + // The value can be an absolute number (for example 5) or a percentage of + // desired machines (for example 10%). + // The absolute number is calculated from a percentage by rounding up. + // A value of 0 for both this and MaxUnavailable disables rolling updates. + // Defaults to 0. + // Example: when this is set to 30%, the InstanceGroup can be scaled + // up immediately when the rolling update starts, such that the total + // number of old and new nodes do not exceed 130% of desired + // nodes. + // +optional + MaxSurge *intstr.IntOrString `json:"maxSurge,omitempty"` } diff --git a/pkg/apis/kops/v1alpha1/cluster.go b/pkg/apis/kops/v1alpha1/cluster.go index 7eaa91860a..f8e8cc8780 100644 --- a/pkg/apis/kops/v1alpha1/cluster.go +++ b/pkg/apis/kops/v1alpha1/cluster.go @@ -547,8 +547,8 @@ type RollingUpdate struct { // The value can be an absolute number (for example 5) or a percentage of desired // nodes (for example 10%). // The absolute number is calculated from a percentage by rounding down. - // A value of 0 disables rolling updates. - // Defaults to 1. + // A value of 0 for both this and MaxSurge disables rolling updates. + // Defaults to 1 if MaxSurge is 0, otherwise defaults to 0. // Example: when this is set to 30%, the InstanceGroup can be scaled // down to 70% of desired nodes immediately when the rolling update // starts. Once new nodes are ready, more old nodes can be drained, @@ -556,4 +556,17 @@ type RollingUpdate struct { // during the update is at least 70% of desired nodes. // +optional MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` + // MaxSurge is the maximum number of extra nodes that can be created + // during the update. + // The value can be an absolute number (for example 5) or a percentage of + // desired machines (for example 10%). + // The absolute number is calculated from a percentage by rounding up. + // A value of 0 for both this and MaxUnavailable disables rolling updates. + // Defaults to 0. + // Example: when this is set to 30%, the InstanceGroup can be scaled + // up immediately when the rolling update starts, such that the total + // number of old and new nodes do not exceed 130% of desired + // nodes. + // +optional + MaxSurge *intstr.IntOrString `json:"maxSurge,omitempty"` } diff --git a/pkg/apis/kops/v1alpha2/cluster.go b/pkg/apis/kops/v1alpha2/cluster.go index 241a89e88a..608f88702f 100644 --- a/pkg/apis/kops/v1alpha2/cluster.go +++ b/pkg/apis/kops/v1alpha2/cluster.go @@ -560,8 +560,8 @@ type RollingUpdate struct { // The value can be an absolute number (for example 5) or a percentage of desired // nodes (for example 10%). // The absolute number is calculated from a percentage by rounding down. - // A value of 0 disables rolling updates. - // Defaults to 1. + // A value of 0 for both this and MaxSurge disables rolling updates. + // Defaults to 1 if MaxSurge is 0, otherwise defaults to 0. // Example: when this is set to 30%, the InstanceGroup can be scaled // down to 70% of desired nodes immediately when the rolling update // starts. Once new nodes are ready, more old nodes can be drained, @@ -569,4 +569,17 @@ type RollingUpdate struct { // during the update is at least 70% of desired nodes. 
// +optional MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` + // MaxSurge is the maximum number of extra nodes that can be created + // during the update. + // The value can be an absolute number (for example 5) or a percentage of + // desired machines (for example 10%). + // The absolute number is calculated from a percentage by rounding up. + // A value of 0 for both this and MaxUnavailable disables rolling updates. + // Defaults to 0. + // Example: when this is set to 30%, the InstanceGroup can be scaled + // up immediately when the rolling update starts, such that the total + // number of old and new nodes do not exceed 130% of desired + // nodes. + // +optional + MaxSurge *intstr.IntOrString `json:"maxSurge,omitempty"` } diff --git a/pkg/apis/kops/validation/validation.go b/pkg/apis/kops/validation/validation.go index 353b06f9f3..c9b8b68318 100644 --- a/pkg/apis/kops/validation/validation.go +++ b/pkg/apis/kops/validation/validation.go @@ -458,6 +458,16 @@ func validateRollingUpdate(rollingUpdate *kops.RollingUpdate, fldpath *field.Pat allErrs = append(allErrs, field.Invalid(fldpath.Child("MaxUnavailable"), rollingUpdate.MaxUnavailable, "Cannot be negative")) } } + if rollingUpdate.MaxSurge != nil { + surge, err := intstr.GetValueFromIntOrPercent(rollingUpdate.MaxSurge, 1, false) + if err != nil { + allErrs = append(allErrs, field.Invalid(fldpath.Child("MaxSurge"), rollingUpdate.MaxSurge, + fmt.Sprintf("Unable to parse: %v", err))) + } + if surge < 0 { + allErrs = append(allErrs, field.Invalid(fldpath.Child("MaxSurge"), rollingUpdate.MaxSurge, "Cannot be negative")) + } + } return allErrs } diff --git a/pkg/apis/kops/validation/validation_test.go b/pkg/apis/kops/validation/validation_test.go index 3585f2a57b..bd99e54d17 100644 --- a/pkg/apis/kops/validation/validation_test.go +++ b/pkg/apis/kops/validation/validation_test.go @@ -438,6 +438,34 @@ func Test_Validate_RollingUpdate(t *testing.T) { }, ExpectedErrors: []string{"Invalid value::TestField.MaxUnavailable"}, }, + { + Input: kops.RollingUpdate{ + MaxSurge: intStr(intstr.FromInt(0)), + }, + }, + { + Input: kops.RollingUpdate{ + MaxSurge: intStr(intstr.FromString("0%")), + }, + }, + { + Input: kops.RollingUpdate{ + MaxSurge: intStr(intstr.FromString("nope")), + }, + ExpectedErrors: []string{"Invalid value::TestField.MaxSurge"}, + }, + { + Input: kops.RollingUpdate{ + MaxSurge: intStr(intstr.FromInt(-1)), + }, + ExpectedErrors: []string{"Invalid value::TestField.MaxSurge"}, + }, + { + Input: kops.RollingUpdate{ + MaxSurge: intStr(intstr.FromString("-1%")), + }, + ExpectedErrors: []string{"Invalid value::TestField.MaxSurge"}, + }, } for _, g := range grid { errs := validateRollingUpdate(&g.Input, field.NewPath("TestField")) From c95a43c02638eb341a29707152d3300046f6341a Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Sun, 5 Jan 2020 17:54:24 -0800 Subject: [PATCH 05/13] make apimachinery --- pkg/apis/kops/v1alpha1/zz_generated.conversion.go | 2 ++ pkg/apis/kops/v1alpha1/zz_generated.deepcopy.go | 5 +++++ pkg/apis/kops/v1alpha2/zz_generated.conversion.go | 2 ++ pkg/apis/kops/v1alpha2/zz_generated.deepcopy.go | 5 +++++ pkg/apis/kops/zz_generated.deepcopy.go | 5 +++++ 5 files changed, 19 insertions(+) diff --git a/pkg/apis/kops/v1alpha1/zz_generated.conversion.go b/pkg/apis/kops/v1alpha1/zz_generated.conversion.go index 84443e0fa4..e0d7d1daa0 100644 --- a/pkg/apis/kops/v1alpha1/zz_generated.conversion.go +++ b/pkg/apis/kops/v1alpha1/zz_generated.conversion.go @@ -4654,6 +4654,7 @@ func 
Convert_kops_RBACAuthorizationSpec_To_v1alpha1_RBACAuthorizationSpec(in *ko func autoConvert_v1alpha1_RollingUpdate_To_kops_RollingUpdate(in *RollingUpdate, out *kops.RollingUpdate, s conversion.Scope) error { out.MaxUnavailable = in.MaxUnavailable + out.MaxSurge = in.MaxSurge return nil } @@ -4664,6 +4665,7 @@ func Convert_v1alpha1_RollingUpdate_To_kops_RollingUpdate(in *RollingUpdate, out func autoConvert_kops_RollingUpdate_To_v1alpha1_RollingUpdate(in *kops.RollingUpdate, out *RollingUpdate, s conversion.Scope) error { out.MaxUnavailable = in.MaxUnavailable + out.MaxSurge = in.MaxSurge return nil } diff --git a/pkg/apis/kops/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/kops/v1alpha1/zz_generated.deepcopy.go index 81b0fec567..25e94c36f5 100644 --- a/pkg/apis/kops/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/kops/v1alpha1/zz_generated.deepcopy.go @@ -3258,6 +3258,11 @@ func (in *RollingUpdate) DeepCopyInto(out *RollingUpdate) { *out = new(intstr.IntOrString) **out = **in } + if in.MaxSurge != nil { + in, out := &in.MaxSurge, &out.MaxSurge + *out = new(intstr.IntOrString) + **out = **in + } return } diff --git a/pkg/apis/kops/v1alpha2/zz_generated.conversion.go b/pkg/apis/kops/v1alpha2/zz_generated.conversion.go index c73e7b1593..f8e6d5e5ce 100644 --- a/pkg/apis/kops/v1alpha2/zz_generated.conversion.go +++ b/pkg/apis/kops/v1alpha2/zz_generated.conversion.go @@ -4924,6 +4924,7 @@ func Convert_kops_RBACAuthorizationSpec_To_v1alpha2_RBACAuthorizationSpec(in *ko func autoConvert_v1alpha2_RollingUpdate_To_kops_RollingUpdate(in *RollingUpdate, out *kops.RollingUpdate, s conversion.Scope) error { out.MaxUnavailable = in.MaxUnavailable + out.MaxSurge = in.MaxSurge return nil } @@ -4934,6 +4935,7 @@ func Convert_v1alpha2_RollingUpdate_To_kops_RollingUpdate(in *RollingUpdate, out func autoConvert_kops_RollingUpdate_To_v1alpha2_RollingUpdate(in *kops.RollingUpdate, out *RollingUpdate, s conversion.Scope) error { out.MaxUnavailable = in.MaxUnavailable + out.MaxSurge = in.MaxSurge return nil } diff --git a/pkg/apis/kops/v1alpha2/zz_generated.deepcopy.go b/pkg/apis/kops/v1alpha2/zz_generated.deepcopy.go index e3d8a53012..885709ba61 100644 --- a/pkg/apis/kops/v1alpha2/zz_generated.deepcopy.go +++ b/pkg/apis/kops/v1alpha2/zz_generated.deepcopy.go @@ -3329,6 +3329,11 @@ func (in *RollingUpdate) DeepCopyInto(out *RollingUpdate) { *out = new(intstr.IntOrString) **out = **in } + if in.MaxSurge != nil { + in, out := &in.MaxSurge, &out.MaxSurge + *out = new(intstr.IntOrString) + **out = **in + } return } diff --git a/pkg/apis/kops/zz_generated.deepcopy.go b/pkg/apis/kops/zz_generated.deepcopy.go index f137545805..2d3428b551 100644 --- a/pkg/apis/kops/zz_generated.deepcopy.go +++ b/pkg/apis/kops/zz_generated.deepcopy.go @@ -3543,6 +3543,11 @@ func (in *RollingUpdate) DeepCopyInto(out *RollingUpdate) { *out = new(intstr.IntOrString) **out = **in } + if in.MaxSurge != nil { + in, out := &in.MaxSurge, &out.MaxSurge + *out = new(intstr.IntOrString) + **out = **in + } return } From b8e665018c3a1c4b46209500d363c56f3c8b407e Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Sun, 5 Jan 2020 17:54:45 -0800 Subject: [PATCH 06/13] make crds --- k8s/crds/kops.k8s.io_clusters.yaml | 26 ++++++++++++++++++------ k8s/crds/kops.k8s.io_instancegroups.yaml | 26 ++++++++++++++++++------ 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/k8s/crds/kops.k8s.io_clusters.yaml b/k8s/crds/kops.k8s.io_clusters.yaml index c466f8336e..74ba98e358 100644 --- a/k8s/crds/kops.k8s.io_clusters.yaml +++ 
b/k8s/crds/kops.k8s.io_clusters.yaml @@ -2889,6 +2889,19 @@ spec: description: RollingUpdate defines the default rolling-update settings for instance groups properties: + maxSurge: + anyOf: + - type: string + - type: integer + description: 'MaxSurge is the maximum number of extra nodes that + can be created during the update. The value can be an absolute + number (for example 5) or a percentage of desired machines (for + example 10%). The absolute number is calculated from a percentage + by rounding up. A value of 0 for both this and MaxUnavailable + disables rolling updates. Defaults to 0. Example: when this is + set to 30%, the InstanceGroup can be scaled up immediately when + the rolling update starts, such that the total number of old and + new nodes do not exceed 130% of desired nodes.' maxUnavailable: anyOf: - type: string @@ -2897,12 +2910,13 @@ spec: can be unavailable during the update. The value can be an absolute number (for example 5) or a percentage of desired nodes (for example 10%). The absolute number is calculated from a percentage by rounding - down. A value of 0 disables rolling updates. Defaults to 1. Example: - when this is set to 30%, the InstanceGroup can be scaled down - to 70% of desired nodes immediately when the rolling update starts. - Once new nodes are ready, more old nodes can be drained, ensuring - that the total number of nodes available at all times during the - update is at least 70% of desired nodes.' + down. A value of 0 for both this and MaxSurge disables rolling + updates. Defaults to 1 if MaxSurge is 0, otherwise defaults to + 0. Example: when this is set to 30%, the InstanceGroup can be + scaled down to 70% of desired nodes immediately when the rolling + update starts. Once new nodes are ready, more old nodes can be + drained, ensuring that the total number of nodes available at + all times during the update is at least 70% of desired nodes.' type: object secretStore: description: SecretStore is the VFS path to where secrets are stored diff --git a/k8s/crds/kops.k8s.io_instancegroups.yaml b/k8s/crds/kops.k8s.io_instancegroups.yaml index f3c35a309f..d3c9fdb399 100644 --- a/k8s/crds/kops.k8s.io_instancegroups.yaml +++ b/k8s/crds/kops.k8s.io_instancegroups.yaml @@ -627,6 +627,19 @@ spec: rollingUpdate: description: RollingUpdate defines the rolling-update behavior properties: + maxSurge: + anyOf: + - type: string + - type: integer + description: 'MaxSurge is the maximum number of extra nodes that + can be created during the update. The value can be an absolute + number (for example 5) or a percentage of desired machines (for + example 10%). The absolute number is calculated from a percentage + by rounding up. A value of 0 for both this and MaxUnavailable + disables rolling updates. Defaults to 0. Example: when this is + set to 30%, the InstanceGroup can be scaled up immediately when + the rolling update starts, such that the total number of old and + new nodes do not exceed 130% of desired nodes.' maxUnavailable: anyOf: - type: string @@ -635,12 +648,13 @@ spec: can be unavailable during the update. The value can be an absolute number (for example 5) or a percentage of desired nodes (for example 10%). The absolute number is calculated from a percentage by rounding - down. A value of 0 disables rolling updates. Defaults to 1. Example: - when this is set to 30%, the InstanceGroup can be scaled down - to 70% of desired nodes immediately when the rolling update starts. 
- Once new nodes are ready, more old nodes can be drained, ensuring - that the total number of nodes available at all times during the - update is at least 70% of desired nodes.' + down. A value of 0 for both this and MaxSurge disables rolling + updates. Defaults to 1 if MaxSurge is 0, otherwise defaults to + 0. Example: when this is set to 30%, the InstanceGroup can be + scaled down to 70% of desired nodes immediately when the rolling + update starts. Once new nodes are ready, more old nodes can be + drained, ensuring that the total number of nodes available at + all times during the update is at least 70% of desired nodes.' type: object rootVolumeDeleteOnTermination: description: 'RootVolumeDeleteOnTermination configures root volume retention From 4ddc58ca5ea0cfb8e7fa7f5f3bea8491721cdd5a Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Sun, 5 Jan 2020 20:26:47 -0800 Subject: [PATCH 07/13] Add MaxSurge to resolveSettings --- pkg/instancegroups/settings.go | 23 ++++++++++-- pkg/instancegroups/settings_test.go | 54 +++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 3 deletions(-) diff --git a/pkg/instancegroups/settings.go b/pkg/instancegroups/settings.go index 3eb5a6371c..f23d2e69b6 100644 --- a/pkg/instancegroups/settings.go +++ b/pkg/instancegroups/settings.go @@ -31,18 +31,35 @@ func resolveSettings(cluster *kops.Cluster, group *kops.InstanceGroup, numInstan if rollingUpdate.MaxUnavailable == nil { rollingUpdate.MaxUnavailable = def.MaxUnavailable } + if rollingUpdate.MaxSurge == nil { + rollingUpdate.MaxSurge = def.MaxSurge + } } + if rollingUpdate.MaxSurge == nil { + zero := intstr.FromInt(0) + rollingUpdate.MaxSurge = &zero + } + + if rollingUpdate.MaxSurge.Type == intstr.String { + surge, _ := intstr.GetValueFromIntOrPercent(rollingUpdate.MaxSurge, numInstances, true) + surgeInt := intstr.FromInt(surge) + rollingUpdate.MaxSurge = &surgeInt + } + + maxUnavailableDefault := intstr.FromInt(0) + if rollingUpdate.MaxSurge.Type == intstr.Int && rollingUpdate.MaxSurge.IntVal == 0 { + maxUnavailableDefault = intstr.FromInt(1) + } if rollingUpdate.MaxUnavailable == nil || rollingUpdate.MaxUnavailable.IntVal < 0 { - one := intstr.FromInt(1) - rollingUpdate.MaxUnavailable = &one + rollingUpdate.MaxUnavailable = &maxUnavailableDefault } if rollingUpdate.MaxUnavailable.Type == intstr.String { unavailable, err := intstr.GetValueFromIntOrPercent(rollingUpdate.MaxUnavailable, numInstances, false) if err != nil { // If unparseable use the default value - unavailable = 1 + unavailable = maxUnavailableDefault.IntValue() } if unavailable <= 0 { // While we round down, percentages should resolve to a minimum of 1 diff --git a/pkg/instancegroups/settings_test.go b/pkg/instancegroups/settings_test.go index dd49221f9c..1dc2c08539 100644 --- a/pkg/instancegroups/settings_test.go +++ b/pkg/instancegroups/settings_test.go @@ -37,6 +37,11 @@ func TestSettings(t *testing.T) { defaultValue: intstr.FromInt(1), nonDefaultValue: intstr.FromInt(2), }, + { + name: "MaxSurge", + defaultValue: intstr.FromInt(0), + nonDefaultValue: intstr.FromInt(2), + }, } { t.Run(tc.name, func(t *testing.T) { defaultCluster := &kops.RollingUpdate{} @@ -165,3 +170,52 @@ func TestMaxUnavailable(t *testing.T) { }) } } + +func TestMaxSurge(t *testing.T) { + for _, tc := range []struct { + numInstances int + value string + expected int32 + }{ + { + numInstances: 1, + value: "0", + expected: 0, + }, + { + numInstances: 1, + value: "0%", + expected: 0, + }, + { + numInstances: 10, + value: "31%", + expected: 4, + }, + { 
+ numInstances: 10, + value: "100%", + expected: 10, + }, + } { + t.Run(fmt.Sprintf("%s %d", tc.value, tc.numInstances), func(t *testing.T) { + value := intstr.Parse(tc.value) + rollingUpdate := kops.RollingUpdate{ + MaxSurge: &value, + } + instanceGroup := kops.InstanceGroup{ + Spec: kops.InstanceGroupSpec{ + RollingUpdate: &rollingUpdate, + }, + } + resolved := resolveSettings(&kops.Cluster{}, &instanceGroup, tc.numInstances) + assert.Equal(t, intstr.Int, resolved.MaxSurge.Type) + assert.Equal(t, tc.expected, resolved.MaxSurge.IntVal) + if tc.expected == 0 { + assert.Equal(t, int32(1), resolved.MaxUnavailable.IntVal, "MaxUnavailable default") + } else { + assert.Equal(t, int32(0), resolved.MaxUnavailable.IntVal, "MaxUnavailable default") + } + }) + } +} From cee662d521ba527f94b50fad5aee90eb51b1c268 Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Sun, 5 Jan 2020 22:26:14 -0800 Subject: [PATCH 08/13] Implement MaxSurge happy path --- pkg/instancegroups/BUILD.bazel | 1 + pkg/instancegroups/instancegroups.go | 83 ++++++++- pkg/instancegroups/rollingupdate_test.go | 222 +++++++++++++++++++++-- 3 files changed, 285 insertions(+), 21 deletions(-) diff --git a/pkg/instancegroups/BUILD.bazel b/pkg/instancegroups/BUILD.bazel index 51d2bd6009..47ec4bfe3a 100644 --- a/pkg/instancegroups/BUILD.bazel +++ b/pkg/instancegroups/BUILD.bazel @@ -44,6 +44,7 @@ go_test( "//upup/pkg/fi/cloudup/awsup:go_default_library", "//vendor/github.com/aws/aws-sdk-go/aws:go_default_library", "//vendor/github.com/aws/aws-sdk-go/service/autoscaling:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface:go_default_library", "//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library", "//vendor/github.com/aws/aws-sdk-go/service/ec2/ec2iface:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", diff --git a/pkg/instancegroups/instancegroups.go b/pkg/instancegroups/instancegroups.go index 26b695f8fc..562cb93729 100644 --- a/pkg/instancegroups/instancegroups.go +++ b/pkg/instancegroups/instancegroups.go @@ -101,10 +101,7 @@ func promptInteractive(upgradedHostId, upgradedHostName string) (stopPrompting b return stopPrompting, err } -// TODO: Temporarily increase size of ASG? -// TODO: Remove from ASG first so status is immediately updated? - -// RollingUpdate performs a rolling update on a list of ec2 instances. +// RollingUpdate performs a rolling update on a list of instances. func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpdateCluster, cluster *api.Cluster, isBastion bool, sleepAfterTerminate time.Duration, validationTimeout time.Duration) (err error) { // we should not get here, but hey I am going to check. @@ -152,17 +149,59 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd settings := resolveSettings(cluster, r.CloudGroup.InstanceGroup, numInstances) runningDrains := 0 - maxConcurrency := settings.MaxUnavailable.IntValue() + maxSurge := settings.MaxSurge.IntValue() + if maxSurge > len(update) { + maxSurge = len(update) + } + maxConcurrency := maxSurge + settings.MaxUnavailable.IntValue() if maxConcurrency == 0 { klog.Infof("Rolling updates for InstanceGroup %s are disabled", r.CloudGroup.InstanceGroup.Name) return nil } + if r.CloudGroup.InstanceGroup.Spec.Role == api.InstanceGroupRoleMaster && maxSurge != 0 { + // Masters are incapable of surging because they rely on registering themselves through + // the local apiserver. 
That apiserver depends on the local etcd, which relies on being + // joined to the etcd cluster. + maxSurge = 0 + maxConcurrency = settings.MaxUnavailable.IntValue() + if maxConcurrency == 0 { + maxConcurrency = 1 + } + } + if rollingUpdateData.Interactive { + if maxSurge > 1 { + maxSurge = 1 + } maxConcurrency = 1 } + // TODO sort 'update' to put already detached instances last. + + if maxSurge > 0 && !rollingUpdateData.CloudOnly { + // TODO don't detach instances that are already detached. Handle effects of that on noneReady behavior. + for numSurge := 1; numSurge <= maxSurge; numSurge++ { + if err := r.detachInstance(update[len(update)-numSurge]); err != nil { + return err + } + + // If noneReady, wait until after one node is detached and its replacement validates + // before the detaching more in case the current spec does not result in usable nodes. + if numSurge == maxSurge || (noneReady && numSurge == 1) { + // Wait for the minimum interval + klog.Infof("waiting for %v after detaching instance", sleepAfterTerminate) + time.Sleep(sleepAfterTerminate) + + if err := r.maybeValidate(rollingUpdateData, validationTimeout, "detaching"); err != nil { + return err + } + } + } + noneReady = false + } + terminateChan := make(chan error, maxConcurrency) for uIdx, u := range update { @@ -183,7 +222,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd return waitForPendingBeforeReturningError(runningDrains, terminateChan, err) } - err = r.maybeValidate(rollingUpdateData, validationTimeout) + err = r.maybeValidate(rollingUpdateData, validationTimeout, "removing") if err != nil { return waitForPendingBeforeReturningError(runningDrains, terminateChan, err) } @@ -229,7 +268,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd } } - err = r.maybeValidate(rollingUpdateData, validationTimeout) + err = r.maybeValidate(rollingUpdateData, validationTimeout, "removing") if err != nil { return err } @@ -359,7 +398,7 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo return nil } -func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpdateCluster, validationTimeout time.Duration) error { +func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpdateCluster, validationTimeout time.Duration, operation string) error { if rollingUpdateData.CloudOnly { klog.Warningf("Not validating cluster as cloudonly flag is set.") @@ -370,10 +409,10 @@ func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpd if rollingUpdateData.FailOnValidate { klog.Errorf("Cluster did not validate within %s", validationTimeout) - return fmt.Errorf("error validating cluster after removing a node: %v", err) + return fmt.Errorf("error validating cluster after %s a node: %v", operation, err) } - klog.Warningf("Cluster validation failed after removing instance, proceeding since fail-on-validate is set to false: %v", err) + klog.Warningf("Cluster validation failed after %s instance, proceeding since fail-on-validate is set to false: %v", operation, err) } } return nil @@ -450,6 +489,30 @@ func (r *RollingUpdateInstanceGroup) validateCluster(rollingUpdateData *RollingU } +// detachInstance detaches a Cloud Instance +func (r *RollingUpdateInstanceGroup) detachInstance(u *cloudinstances.CloudInstanceGroupMember) error { + id := u.ID + nodeName := "" + if u.Node != nil { + nodeName = u.Node.Name + } + if nodeName != "" { + klog.Infof("Detaching instance %q, node %q, in group %q.", id, 
nodeName, r.CloudGroup.HumanName) + } else { + klog.Infof("Detaching instance %q, in group %q.", id, r.CloudGroup.HumanName) + } + + if err := r.Cloud.DetachInstance(u); err != nil { + if nodeName != "" { + return fmt.Errorf("error detaching instance %q, node %q: %v", id, nodeName, err) + } else { + return fmt.Errorf("error detaching instance %q: %v", id, err) + } + } + + return nil +} + // DeleteInstance deletes an Cloud Instance. func (r *RollingUpdateInstanceGroup) DeleteInstance(u *cloudinstances.CloudInstanceGroupMember) error { id := u.ID diff --git a/pkg/instancegroups/rollingupdate_test.go b/pkg/instancegroups/rollingupdate_test.go index 14c51cb6ee..fd645a37d5 100644 --- a/pkg/instancegroups/rollingupdate_test.go +++ b/pkg/instancegroups/rollingupdate_test.go @@ -25,6 +25,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/autoscaling" + "github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/stretchr/testify/assert" @@ -112,6 +113,7 @@ func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k fakeClient := k8sClient.(*fake.Clientset) groups[name] = &cloudinstances.CloudInstanceGroup{ + HumanName: name, InstanceGroup: &kopsapi.InstanceGroup{ ObjectMeta: v1meta.ObjectMeta{ Name: name, @@ -120,6 +122,7 @@ func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k Role: role, }, }, + Raw: &autoscaling.Group{AutoScalingGroupName: aws.String("asg-" + name)}, } cloud.Autoscaling().CreateAutoScalingGroup(&autoscaling.CreateAutoScalingGroupInput{ AutoScalingGroupName: aws.String(name), @@ -139,8 +142,9 @@ func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k _ = fakeClient.Tracker().Add(node) } member := cloudinstances.CloudInstanceGroupMember{ - ID: id, - Node: node, + ID: id, + Node: node, + CloudInstanceGroup: groups[name], } if i < needUpdate { groups[name].NeedUpdate = append(groups[name].NeedUpdate, &member) @@ -608,6 +612,52 @@ func TestRollingUpdateTaintAllButOneNeedUpdate(t *testing.T) { assertGroupInstanceCount(t, cloud, "node-1", 1) } +func TestRollingUpdateMaxSurgeIgnoredForMaster(t *testing.T) { + c, cloud, cluster := getTestSetup() + + two := intstr.FromInt(2) + cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{ + MaxSurge: &two, + } + + groups := make(map[string]*cloudinstances.CloudInstanceGroup) + makeGroup(groups, c.K8sClient, cloud, "master-1", kopsapi.InstanceGroupRoleMaster, 3, 2) + err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{}) + assert.NoError(t, err, "rolling update") + + cordoned := "" + tainted := map[string]bool{} + deleted := map[string]bool{} + for _, action := range c.K8sClient.(*fake.Clientset).Actions() { + switch a := action.(type) { + case testingclient.PatchAction: + if string(a.GetPatch()) == cordonPatch { + assertCordon(t, a) + assert.Equal(t, "", cordoned, "at most one node cordoned at a time") + assert.True(t, tainted[a.GetName()], "node", a.GetName(), "tainted") + cordoned = a.GetName() + } else { + assertTaint(t, a) + assert.Equal(t, "", cordoned, "not tainting while node cordoned") + assert.False(t, tainted[a.GetName()], "node", a.GetName(), "already tainted") + tainted[a.GetName()] = true + } + case testingclient.DeleteAction: + assert.Equal(t, "nodes", a.GetResource().Resource) + assert.Equal(t, cordoned, a.GetName(), "node was cordoned before delete") + assert.False(t, deleted[a.GetName()], "node", a.GetName(), 
"already deleted") + deleted[a.GetName()] = true + cordoned = "" + case testingclient.ListAction: + // Don't care + default: + t.Errorf("unexpected action %v", a) + } + } + + assertGroupInstanceCount(t, cloud, "master-1", 1) +} + func TestRollingUpdateDisabled(t *testing.T) { c, cloud, cluster := getTestSetup() @@ -647,12 +697,26 @@ func TestRollingUpdateDisabledCloudonly(t *testing.T) { // The concurrent update tests attempt to induce the following expected update sequence: // -// (Only for "all need update" tests, to verify the toe-dipping behavior) +// (Only for surging "all need update" test, to verify the toe-dipping behavior) +// Request validate (8) --> +// <-- validated +// Detach instance --> +// Request validate (7) --> +// <-- validated +// Detach instance --> +// (end only for surging "all need update" tests) +// (Only for surging "all but one need update" test) +// Request validate (7) --> +// <-- validated +// Detach instance --> +// Detach instance --> +// (end only for surging "all but one need update" test) +// (Only for non-surging "all need update" tests, to verify the toe-dipping behavior) // Request validate (7) --> // <-- validated // Request terminate 1 node (7) --> // <-- 1 node terminated, 6 left -// (end only for "all need update" tests) +// (end only for non-surging "all need update" tests) // Request validate (6) --> // <-- validated // Request terminate 2 nodes (6,5) --> @@ -678,16 +742,24 @@ type concurrentTest struct { ec2iface.EC2API t *testing.T mutex sync.Mutex + surge int terminationRequestsLeft int previousValidation int validationChan chan bool terminationChan chan bool + detached map[string]bool } func (c *concurrentTest) Validate() (*validation.ValidationCluster, error) { c.mutex.Lock() defer c.mutex.Unlock() + if len(c.detached) < c.surge { + assert.Greater(c.t, c.previousValidation, 7, "previous validation") + c.previousValidation-- + return &validation.ValidationCluster{}, nil + } + terminationRequestsLeft := c.terminationRequestsLeft switch terminationRequestsLeft { case 7, 6, 0: @@ -738,7 +810,12 @@ func (c *concurrentTest) TerminateInstances(input *ec2.TerminateInstancesInput) c.mutex.Lock() defer c.mutex.Unlock() - for range input.InstanceIds { + for _, id := range input.InstanceIds { + assert.Equal(c.t, c.surge, len(c.detached), "Number of detached instances") + if c.detached[*id] { + assert.LessOrEqual(c.t, c.terminationRequestsLeft, c.surge, "Deleting detached instances last") + } + terminationRequestsLeft := c.terminationRequestsLeft c.terminationRequestsLeft-- switch terminationRequestsLeft { @@ -776,25 +853,33 @@ func (c *concurrentTest) AssertComplete() { assert.Equal(c.t, 0, c.previousValidation, "last validation") } -func newConcurrentTest(t *testing.T, cloud *awsup.MockAWSCloud, allNeedUpdate bool) *concurrentTest { +func newConcurrentTest(t *testing.T, cloud *awsup.MockAWSCloud, numSurge int, allNeedUpdate bool) *concurrentTest { test := concurrentTest{ EC2API: cloud.MockEC2, t: t, + surge: numSurge, terminationRequestsLeft: 6, validationChan: make(chan bool), terminationChan: make(chan bool), + detached: map[string]bool{}, } - if allNeedUpdate { + if numSurge == 0 && allNeedUpdate { test.terminationRequestsLeft = 7 } - test.previousValidation = test.terminationRequestsLeft + 1 + if numSurge == 0 { + test.previousValidation = test.terminationRequestsLeft + 1 + } else if allNeedUpdate { + test.previousValidation = 9 + } else { + test.previousValidation = 8 + } return &test } func TestRollingUpdateMaxUnavailableAllNeedUpdate(t 
*testing.T) { c, cloud, cluster := getTestSetup() - concurrentTest := newConcurrentTest(t, cloud, true) + concurrentTest := newConcurrentTest(t, cloud, 0, true) c.ValidateSuccessDuration = 0 c.ClusterValidator = concurrentTest cloud.MockEC2 = concurrentTest @@ -817,7 +902,7 @@ func TestRollingUpdateMaxUnavailableAllNeedUpdate(t *testing.T) { func TestRollingUpdateMaxUnavailableAllButOneNeedUpdate(t *testing.T) { c, cloud, cluster := getTestSetup() - concurrentTest := newConcurrentTest(t, cloud, false) + concurrentTest := newConcurrentTest(t, cloud, 0, false) c.ValidateSuccessDuration = 0 c.ClusterValidator = concurrentTest cloud.MockEC2 = concurrentTest @@ -839,7 +924,7 @@ func TestRollingUpdateMaxUnavailableAllButOneNeedUpdate(t *testing.T) { func TestRollingUpdateMaxUnavailableAllNeedUpdateMaster(t *testing.T) { c, cloud, cluster := getTestSetup() - concurrentTest := newConcurrentTest(t, cloud, true) + concurrentTest := newConcurrentTest(t, cloud, 0, true) c.ValidateSuccessDuration = 0 c.ClusterValidator = concurrentTest cloud.MockEC2 = concurrentTest @@ -859,6 +944,121 @@ func TestRollingUpdateMaxUnavailableAllNeedUpdateMaster(t *testing.T) { concurrentTest.AssertComplete() } +type concurrentTestAutoscaling struct { + autoscalingiface.AutoScalingAPI + ConcurrentTest *concurrentTest +} + +func (m *concurrentTestAutoscaling) DetachInstances(input *autoscaling.DetachInstancesInput) (*autoscaling.DetachInstancesOutput, error) { + m.ConcurrentTest.mutex.Lock() + defer m.ConcurrentTest.mutex.Unlock() + + assert.Equal(m.ConcurrentTest.t, "node-1", *input.AutoScalingGroupName) + assert.False(m.ConcurrentTest.t, *input.ShouldDecrementDesiredCapacity) + + for _, id := range input.InstanceIds { + assert.Less(m.ConcurrentTest.t, len(m.ConcurrentTest.detached), m.ConcurrentTest.surge, "Number of detached instances") + assert.False(m.ConcurrentTest.t, m.ConcurrentTest.detached[*id], *id+" already detached") + m.ConcurrentTest.detached[*id] = true + } + return &autoscaling.DetachInstancesOutput{}, nil +} + +type ec2IgnoreTags struct { + ec2iface.EC2API +} + +// CreateTags ignores tagging of instances done by the AWS fi.Cloud implementation of DetachInstance() +func (e *ec2IgnoreTags) CreateTags(*ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) { + return &ec2.CreateTagsOutput{}, nil +} + +func TestRollingUpdateMaxSurgeAllNeedUpdate(t *testing.T) { + c, cloud, cluster := getTestSetup() + + concurrentTest := newConcurrentTest(t, cloud, 2, true) + c.ValidateSuccessDuration = 0 + c.ClusterValidator = concurrentTest + cloud.MockAutoscaling = &concurrentTestAutoscaling{ + AutoScalingAPI: cloud.MockAutoscaling, + ConcurrentTest: concurrentTest, + } + cloud.MockEC2 = &ec2IgnoreTags{EC2API: concurrentTest} + + two := intstr.FromInt(2) + cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{ + MaxSurge: &two, + } + + groups := make(map[string]*cloudinstances.CloudInstanceGroup) + makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 6, 6) + + err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{}) + assert.NoError(t, err, "rolling update") + + assertGroupInstanceCount(t, cloud, "node-1", 0) + concurrentTest.AssertComplete() +} + +func TestRollingUpdateMaxSurgeAllButOneNeedUpdate(t *testing.T) { + c, cloud, cluster := getTestSetup() + + concurrentTest := newConcurrentTest(t, cloud, 2, false) + c.ValidateSuccessDuration = 0 + c.ClusterValidator = concurrentTest + cloud.MockAutoscaling = &concurrentTestAutoscaling{ + AutoScalingAPI: cloud.MockAutoscaling, + ConcurrentTest: 
concurrentTest, + } + cloud.MockEC2 = &ec2IgnoreTags{EC2API: concurrentTest} + + two := intstr.FromInt(2) + cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{ + MaxSurge: &two, + } + + groups := make(map[string]*cloudinstances.CloudInstanceGroup) + makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 7, 6) + err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{}) + assert.NoError(t, err, "rolling update") + + assertGroupInstanceCount(t, cloud, "node-1", 1) + concurrentTest.AssertComplete() +} + +type countDetach struct { + autoscalingiface.AutoScalingAPI + Count int +} + +func (c *countDetach) DetachInstances(input *autoscaling.DetachInstancesInput) (*autoscaling.DetachInstancesOutput, error) { + c.Count += len(input.InstanceIds) + return &autoscaling.DetachInstancesOutput{}, nil +} + +func TestRollingUpdateMaxSurgeGreaterThanNeedUpdate(t *testing.T) { + c, cloud, cluster := getTestSetup() + + countDetach := &countDetach{AutoScalingAPI: cloud.MockAutoscaling} + cloud.MockAutoscaling = countDetach + cloud.MockEC2 = &ec2IgnoreTags{EC2API: cloud.MockEC2} + + ten := intstr.FromInt(10) + cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{ + MaxSurge: &ten, + } + + groups := make(map[string]*cloudinstances.CloudInstanceGroup) + makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 3, 2) + err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{}) + assert.NoError(t, err, "rolling update") + + assertGroupInstanceCount(t, cloud, "node-1", 1) + assert.Equal(t, 2, countDetach.Count) +} + +// TODO tests for surging when instances start off already detached + func assertCordon(t *testing.T, action testingclient.PatchAction) { assert.Equal(t, "nodes", action.GetResource().Resource) assert.Equal(t, cordonPatch, string(action.GetPatch())) From ebfcf5d909b92600f091384dc0bfc98f810fc564 Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Fri, 10 Jan 2020 22:10:16 -0800 Subject: [PATCH 09/13] Implement recovery from previous failed surge rolling updates --- pkg/instancegroups/instancegroups.go | 51 +++++--- pkg/instancegroups/rollingupdate_test.go | 145 ++++++++++++++++++++++- 2 files changed, 180 insertions(+), 16 deletions(-) diff --git a/pkg/instancegroups/instancegroups.go b/pkg/instancegroups/instancegroups.go index 562cb93729..e04af77ad3 100644 --- a/pkg/instancegroups/instancegroups.go +++ b/pkg/instancegroups/instancegroups.go @@ -178,28 +178,30 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd maxConcurrency = 1 } - // TODO sort 'update' to put already detached instances last. + update = prioritizeUpdate(update) if maxSurge > 0 && !rollingUpdateData.CloudOnly { - // TODO don't detach instances that are already detached. Handle effects of that on noneReady behavior. for numSurge := 1; numSurge <= maxSurge; numSurge++ { - if err := r.detachInstance(update[len(update)-numSurge]); err != nil { - return err - } - - // If noneReady, wait until after one node is detached and its replacement validates - // before the detaching more in case the current spec does not result in usable nodes. 
- if numSurge == maxSurge || (noneReady && numSurge == 1) { - // Wait for the minimum interval - klog.Infof("waiting for %v after detaching instance", sleepAfterTerminate) - time.Sleep(sleepAfterTerminate) - - if err := r.maybeValidate(rollingUpdateData, validationTimeout, "detaching"); err != nil { + u := update[len(update)-numSurge] + if !u.Detached { + if err := r.detachInstance(u); err != nil { return err } + + // If noneReady, wait until after one node is detached and its replacement validates + // before the detaching more in case the current spec does not result in usable nodes. + if numSurge == maxSurge || noneReady { + // Wait for the minimum interval + klog.Infof("waiting for %v after detaching instance", sleepAfterTerminate) + time.Sleep(sleepAfterTerminate) + + if err := r.maybeValidate(rollingUpdateData, validationTimeout, "detaching"); err != nil { + return err + } + noneReady = false + } } } - noneReady = false } terminateChan := make(chan error, maxConcurrency) @@ -277,6 +279,25 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd return nil } +func prioritizeUpdate(update []*cloudinstances.CloudInstanceGroupMember) []*cloudinstances.CloudInstanceGroupMember { + // The priorities are, in order: + // attached before detached + // TODO unhealthy before healthy + // NeedUpdate before Ready (preserve original order) + result := make([]*cloudinstances.CloudInstanceGroupMember, 0, len(update)) + var detached []*cloudinstances.CloudInstanceGroupMember + for _, u := range update { + if u.Detached { + detached = append(detached, u) + } else { + result = append(result, u) + } + } + + result = append(result, detached...) + return result +} + func waitForPendingBeforeReturningError(runningDrains int, terminateChan chan error, err error) error { for runningDrains > 0 { <-terminateChan diff --git a/pkg/instancegroups/rollingupdate_test.go b/pkg/instancegroups/rollingupdate_test.go index fd645a37d5..d483877d80 100644 --- a/pkg/instancegroups/rollingupdate_test.go +++ b/pkg/instancegroups/rollingupdate_test.go @@ -1057,7 +1057,150 @@ func TestRollingUpdateMaxSurgeGreaterThanNeedUpdate(t *testing.T) { assert.Equal(t, 2, countDetach.Count) } -// TODO tests for surging when instances start off already detached +// Request validate (1) --> +// <-- validated +// Detach instance --> +// Request validate (2) --> +// <-- validated +// Detach instance --> +// Request validate (3) --> +// <-- validated +// Request terminate 3 nodes --> +// <-- 3 nodes terminated, 1 left +// Request validate (4) --> +// <-- validated +// Request terminate 1 node --> +// <-- 1 node terminated, 0 left +// Request validate (5) --> +// <-- validated +type alreadyDetachedTest struct { + ec2iface.EC2API + t *testing.T + mutex sync.Mutex + terminationRequestsLeft int + numValidations int + detached map[string]bool +} + +func (t *alreadyDetachedTest) Validate() (*validation.ValidationCluster, error) { + t.mutex.Lock() + defer t.mutex.Unlock() + + t.numValidations++ + switch t.numValidations { + case 1, 2, 3: + assert.Equal(t.t, t.numValidations, len(t.detached), "numnber of detached instances") + case 4: + assert.Equal(t.t, 1, t.terminationRequestsLeft, "terminations left") + t.mutex.Unlock() + time.Sleep(2 * time.Millisecond) // NodeInterval plus some + t.mutex.Lock() + case 5: + assert.Equal(t.t, 0, t.terminationRequestsLeft, "terminations left") + case 6: + t.t.Error("unexpected sixth call to Validate") + } + + return &validation.ValidationCluster{}, nil +} + +func (t *alreadyDetachedTest) 
TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error) { + if input.DryRun != nil && *input.DryRun { + return &ec2.TerminateInstancesOutput{}, nil + } + + t.mutex.Lock() + defer t.mutex.Unlock() + + for _, id := range input.InstanceIds { + assert.Equal(t.t, 3, len(t.detached), "Number of detached instances") + assert.GreaterOrEqual(t.t, t.numValidations, 3, "Number of previous validations") + if t.terminationRequestsLeft == 1 { + assert.True(t.t, t.detached[*id], "Last deleted instance %q was detached", *id) + } + + t.terminationRequestsLeft-- + } + return t.EC2API.TerminateInstances(input) +} + +type alreadyDetachedTestAutoscaling struct { + autoscalingiface.AutoScalingAPI + AlreadyDetachedTest *alreadyDetachedTest +} + +func (m *alreadyDetachedTestAutoscaling) DetachInstances(input *autoscaling.DetachInstancesInput) (*autoscaling.DetachInstancesOutput, error) { + m.AlreadyDetachedTest.mutex.Lock() + defer m.AlreadyDetachedTest.mutex.Unlock() + + for _, id := range input.InstanceIds { + assert.Less(m.AlreadyDetachedTest.t, len(m.AlreadyDetachedTest.detached), 3, "Number of detached instances") + assert.False(m.AlreadyDetachedTest.t, m.AlreadyDetachedTest.detached[*id], *id+" already detached") + m.AlreadyDetachedTest.detached[*id] = true + } + return &autoscaling.DetachInstancesOutput{}, nil +} + +func TestRollingUpdateMaxSurgeAllNeedUpdateOneAlreadyDetached(t *testing.T) { + c, cloud, cluster := getTestSetup() + + alreadyDetachedTest := &alreadyDetachedTest{ + EC2API: cloud.MockEC2, + t: t, + terminationRequestsLeft: 4, + detached: map[string]bool{}, + } + + c.ValidateSuccessDuration = 0 + c.ClusterValidator = alreadyDetachedTest + cloud.MockAutoscaling = &alreadyDetachedTestAutoscaling{ + AutoScalingAPI: cloud.MockAutoscaling, + AlreadyDetachedTest: alreadyDetachedTest, + } + cloud.MockEC2 = &ec2IgnoreTags{EC2API: alreadyDetachedTest} + + three := intstr.FromInt(3) + cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{ + MaxSurge: &three, + } + + groups := make(map[string]*cloudinstances.CloudInstanceGroup) + makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 4, 4) + alreadyDetachedTest.detached[groups["node-1"].NeedUpdate[3].ID] = true + groups["node-1"].NeedUpdate[3].Detached = true + err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{}) + assert.NoError(t, err, "rolling update") + + assertGroupInstanceCount(t, cloud, "node-1", 0) + assert.Equal(t, 5, alreadyDetachedTest.numValidations, "Number of validations") +} + +func TestRollingUpdateMaxSurgeAllNeedUpdateMaxAlreadyDetached(t *testing.T) { + c, cloud, cluster := getTestSetup() + + // Should behave the same as TestRollingUpdateMaxUnavailableAllNeedUpdate + concurrentTest := newConcurrentTest(t, cloud, 0, true) + c.ValidateSuccessDuration = 0 + c.ClusterValidator = concurrentTest + cloud.MockEC2 = concurrentTest + + two := intstr.FromInt(2) + cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{ + MaxSurge: &two, + } + + groups := make(map[string]*cloudinstances.CloudInstanceGroup) + makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 7, 7) + groups["node-1"].NeedUpdate[1].Detached = true + groups["node-1"].NeedUpdate[3].Detached = true + // TODO verify those are the last two instances terminated + + err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{}) + assert.NoError(t, err, "rolling update") + + assertGroupInstanceCount(t, cloud, "node-1", 0) + concurrentTest.AssertComplete() +} func assertCordon(t 
*testing.T, action testingclient.PatchAction) { assert.Equal(t, "nodes", action.GetResource().Resource) From 38b7219b1478a55308af1d393766b65ff90dfa63 Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Mon, 27 Jan 2020 20:35:29 -0800 Subject: [PATCH 10/13] Remove code made unnecessary by apimachinery validation --- pkg/instancegroups/settings.go | 8 ++------ pkg/instancegroups/settings_test.go | 15 --------------- 2 files changed, 2 insertions(+), 21 deletions(-) diff --git a/pkg/instancegroups/settings.go b/pkg/instancegroups/settings.go index f23d2e69b6..d4daaae3d3 100644 --- a/pkg/instancegroups/settings.go +++ b/pkg/instancegroups/settings.go @@ -51,16 +51,12 @@ func resolveSettings(cluster *kops.Cluster, group *kops.InstanceGroup, numInstan if rollingUpdate.MaxSurge.Type == intstr.Int && rollingUpdate.MaxSurge.IntVal == 0 { maxUnavailableDefault = intstr.FromInt(1) } - if rollingUpdate.MaxUnavailable == nil || rollingUpdate.MaxUnavailable.IntVal < 0 { + if rollingUpdate.MaxUnavailable == nil { rollingUpdate.MaxUnavailable = &maxUnavailableDefault } if rollingUpdate.MaxUnavailable.Type == intstr.String { - unavailable, err := intstr.GetValueFromIntOrPercent(rollingUpdate.MaxUnavailable, numInstances, false) - if err != nil { - // If unparseable use the default value - unavailable = maxUnavailableDefault.IntValue() - } + unavailable, _ := intstr.GetValueFromIntOrPercent(rollingUpdate.MaxUnavailable, numInstances, false) if unavailable <= 0 { // While we round down, percentages should resolve to a minimum of 1 unavailable = 1 diff --git a/pkg/instancegroups/settings_test.go b/pkg/instancegroups/settings_test.go index 1dc2c08539..fec020152b 100644 --- a/pkg/instancegroups/settings_test.go +++ b/pkg/instancegroups/settings_test.go @@ -138,21 +138,6 @@ func TestMaxUnavailable(t *testing.T) { value: "100%", expected: 10, }, - { - numInstances: 5, - value: "fnord", - expected: 1, - }, - { - numInstances: 5, - value: "-3", - expected: 1, - }, - { - numInstances: 5, - value: "-3%", - expected: 1, - }, } { t.Run(fmt.Sprintf("%s %d", tc.value, tc.numInstances), func(t *testing.T) { value := intstr.Parse(tc.value) From 53c362da251b5d7defa0117c34a89626e149e21e Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Mon, 17 Feb 2020 08:40:29 -0800 Subject: [PATCH 11/13] Fix field name for api validation --- pkg/apis/kops/validation/validation.go | 4 ++-- pkg/apis/kops/validation/validation_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/apis/kops/validation/validation.go b/pkg/apis/kops/validation/validation.go index c9b8b68318..059c1b6fa6 100644 --- a/pkg/apis/kops/validation/validation.go +++ b/pkg/apis/kops/validation/validation.go @@ -461,11 +461,11 @@ func validateRollingUpdate(rollingUpdate *kops.RollingUpdate, fldpath *field.Pat if rollingUpdate.MaxSurge != nil { surge, err := intstr.GetValueFromIntOrPercent(rollingUpdate.MaxSurge, 1, false) if err != nil { - allErrs = append(allErrs, field.Invalid(fldpath.Child("MaxSurge"), rollingUpdate.MaxSurge, + allErrs = append(allErrs, field.Invalid(fldpath.Child("maxSurge"), rollingUpdate.MaxSurge, fmt.Sprintf("Unable to parse: %v", err))) } if surge < 0 { - allErrs = append(allErrs, field.Invalid(fldpath.Child("MaxSurge"), rollingUpdate.MaxSurge, "Cannot be negative")) + allErrs = append(allErrs, field.Invalid(fldpath.Child("maxSurge"), rollingUpdate.MaxSurge, "Cannot be negative")) } } diff --git a/pkg/apis/kops/validation/validation_test.go b/pkg/apis/kops/validation/validation_test.go index 
bd99e54d17..ccc7c50006 100644 --- a/pkg/apis/kops/validation/validation_test.go +++ b/pkg/apis/kops/validation/validation_test.go @@ -452,19 +452,19 @@ func Test_Validate_RollingUpdate(t *testing.T) { Input: kops.RollingUpdate{ MaxSurge: intStr(intstr.FromString("nope")), }, - ExpectedErrors: []string{"Invalid value::TestField.MaxSurge"}, + ExpectedErrors: []string{"Invalid value::testField.maxSurge"}, }, { Input: kops.RollingUpdate{ MaxSurge: intStr(intstr.FromInt(-1)), }, - ExpectedErrors: []string{"Invalid value::TestField.MaxSurge"}, + ExpectedErrors: []string{"Invalid value::testField.maxSurge"}, }, { Input: kops.RollingUpdate{ MaxSurge: intStr(intstr.FromString("-1%")), }, - ExpectedErrors: []string{"Invalid value::TestField.MaxSurge"}, + ExpectedErrors: []string{"Invalid value::testField.maxSurge"}, }, } for _, g := range grid { From ed737261958238a371ff0d64a1348bde02ddb598 Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Fri, 28 Feb 2020 21:05:43 -0800 Subject: [PATCH 12/13] Address review comments --- k8s/crds/kops.k8s.io_clusters.yaml | 9 ++-- k8s/crds/kops.k8s.io_instancegroups.yaml | 9 ++-- pkg/apis/kops/cluster.go | 1 + pkg/apis/kops/v1alpha1/cluster.go | 1 + pkg/apis/kops/v1alpha2/cluster.go | 1 + pkg/apis/kops/validation/instancegroup.go | 2 +- pkg/apis/kops/validation/validation.go | 10 ++-- pkg/apis/kops/validation/validation_test.go | 60 ++++++++++++++++++++- pkg/instancegroups/instancegroups.go | 2 +- 9 files changed, 80 insertions(+), 15 deletions(-) diff --git a/k8s/crds/kops.k8s.io_clusters.yaml b/k8s/crds/kops.k8s.io_clusters.yaml index ebb26217d5..06ac196914 100644 --- a/k8s/crds/kops.k8s.io_clusters.yaml +++ b/k8s/crds/kops.k8s.io_clusters.yaml @@ -2911,10 +2911,11 @@ spec: number (for example 5) or a percentage of desired machines (for example 10%). The absolute number is calculated from a percentage by rounding up. A value of 0 for both this and MaxUnavailable - disables rolling updates. Defaults to 0. Example: when this is - set to 30%, the InstanceGroup can be scaled up immediately when - the rolling update starts, such that the total number of old and - new nodes do not exceed 130% of desired nodes.' + disables rolling updates. Has no effect on instance groups with + role "Master". Defaults to 0. Example: when this is set to 30%, + the InstanceGroup can be scaled up immediately when the rolling + update starts, such that the total number of old and new nodes + do not exceed 130% of desired nodes.' maxUnavailable: anyOf: - type: string diff --git a/k8s/crds/kops.k8s.io_instancegroups.yaml b/k8s/crds/kops.k8s.io_instancegroups.yaml index d3c9fdb399..1b9526f2ae 100644 --- a/k8s/crds/kops.k8s.io_instancegroups.yaml +++ b/k8s/crds/kops.k8s.io_instancegroups.yaml @@ -636,10 +636,11 @@ spec: number (for example 5) or a percentage of desired machines (for example 10%). The absolute number is calculated from a percentage by rounding up. A value of 0 for both this and MaxUnavailable - disables rolling updates. Defaults to 0. Example: when this is - set to 30%, the InstanceGroup can be scaled up immediately when - the rolling update starts, such that the total number of old and - new nodes do not exceed 130% of desired nodes.' + disables rolling updates. Has no effect on instance groups with + role "Master". Defaults to 0. Example: when this is set to 30%, + the InstanceGroup can be scaled up immediately when the rolling + update starts, such that the total number of old and new nodes + do not exceed 130% of desired nodes.' 
maxUnavailable: anyOf: - type: string diff --git a/pkg/apis/kops/cluster.go b/pkg/apis/kops/cluster.go index a16372b12a..c10cf73cc1 100644 --- a/pkg/apis/kops/cluster.go +++ b/pkg/apis/kops/cluster.go @@ -702,6 +702,7 @@ type RollingUpdate struct { // desired machines (for example 10%). // The absolute number is calculated from a percentage by rounding up. // A value of 0 for both this and MaxUnavailable disables rolling updates. + // Has no effect on instance groups with role "Master". // Defaults to 0. // Example: when this is set to 30%, the InstanceGroup can be scaled // up immediately when the rolling update starts, such that the total diff --git a/pkg/apis/kops/v1alpha1/cluster.go b/pkg/apis/kops/v1alpha1/cluster.go index 237c573e7b..54c4b2bc70 100644 --- a/pkg/apis/kops/v1alpha1/cluster.go +++ b/pkg/apis/kops/v1alpha1/cluster.go @@ -581,6 +581,7 @@ type RollingUpdate struct { // desired machines (for example 10%). // The absolute number is calculated from a percentage by rounding up. // A value of 0 for both this and MaxUnavailable disables rolling updates. + // Has no effect on instance groups with role "Master". // Defaults to 0. // Example: when this is set to 30%, the InstanceGroup can be scaled // up immediately when the rolling update starts, such that the total diff --git a/pkg/apis/kops/v1alpha2/cluster.go b/pkg/apis/kops/v1alpha2/cluster.go index 6c0a72abfd..09d8b5918b 100644 --- a/pkg/apis/kops/v1alpha2/cluster.go +++ b/pkg/apis/kops/v1alpha2/cluster.go @@ -594,6 +594,7 @@ type RollingUpdate struct { // desired machines (for example 10%). // The absolute number is calculated from a percentage by rounding up. // A value of 0 for both this and MaxUnavailable disables rolling updates. + // Has no effect on instance groups with role "Master". // Defaults to 0. // Example: when this is set to 30%, the InstanceGroup can be scaled // up immediately when the rolling update starts, such that the total diff --git a/pkg/apis/kops/validation/instancegroup.go b/pkg/apis/kops/validation/instancegroup.go index a39ff40ebf..a7f8001505 100644 --- a/pkg/apis/kops/validation/instancegroup.go +++ b/pkg/apis/kops/validation/instancegroup.go @@ -118,7 +118,7 @@ func ValidateInstanceGroup(g *kops.InstanceGroup) field.ErrorList { allErrs = append(allErrs, validateInstanceProfile(g.Spec.IAM, field.NewPath("spec", "iam"))...) if g.Spec.RollingUpdate != nil { - allErrs = append(allErrs, validateRollingUpdate(g.Spec.RollingUpdate, field.NewPath("spec", "rollingUpdate"))...) + allErrs = append(allErrs, validateRollingUpdate(g.Spec.RollingUpdate, field.NewPath("spec", "rollingUpdate"), g.Spec.Role == kops.InstanceGroupRoleMaster)...) } return allErrs diff --git a/pkg/apis/kops/validation/validation.go b/pkg/apis/kops/validation/validation.go index 64d9397380..6d2027529d 100644 --- a/pkg/apis/kops/validation/validation.go +++ b/pkg/apis/kops/validation/validation.go @@ -123,7 +123,7 @@ func validateClusterSpec(spec *kops.ClusterSpec, fieldPath *field.Path) field.Er } if spec.RollingUpdate != nil { - allErrs = append(allErrs, validateRollingUpdate(spec.RollingUpdate, fieldPath.Child("rollingUpdate"))...) + allErrs = append(allErrs, validateRollingUpdate(spec.RollingUpdate, fieldPath.Child("rollingUpdate"), false)...) 
} return allErrs @@ -438,7 +438,7 @@ func validateContainerRuntime(runtime *string, fldPath *field.Path) field.ErrorL return allErrs } -func validateRollingUpdate(rollingUpdate *kops.RollingUpdate, fldpath *field.Path) field.ErrorList { +func validateRollingUpdate(rollingUpdate *kops.RollingUpdate, fldpath *field.Path, onMasterInstanceGroup bool) field.ErrorList { allErrs := field.ErrorList{} if rollingUpdate.MaxUnavailable != nil { unavailable, err := intstr.GetValueFromIntOrPercent(rollingUpdate.MaxUnavailable, 1, false) @@ -451,12 +451,14 @@ func validateRollingUpdate(rollingUpdate *kops.RollingUpdate, fldpath *field.Pat } } if rollingUpdate.MaxSurge != nil { - surge, err := intstr.GetValueFromIntOrPercent(rollingUpdate.MaxSurge, 1, false) + surge, err := intstr.GetValueFromIntOrPercent(rollingUpdate.MaxSurge, 1000, true) if err != nil { allErrs = append(allErrs, field.Invalid(fldpath.Child("maxSurge"), rollingUpdate.MaxSurge, fmt.Sprintf("Unable to parse: %v", err))) } - if surge < 0 { + if onMasterInstanceGroup && surge != 0 { + allErrs = append(allErrs, field.Forbidden(fldpath.Child("maxSurge"), "Cannot surge instance groups with role \"Master\"")) + } else if surge < 0 { allErrs = append(allErrs, field.Invalid(fldpath.Child("maxSurge"), rollingUpdate.MaxSurge, "Cannot be negative")) } } diff --git a/pkg/apis/kops/validation/validation_test.go b/pkg/apis/kops/validation/validation_test.go index fad36bfbb1..7b525a8fd7 100644 --- a/pkg/apis/kops/validation/validation_test.go +++ b/pkg/apis/kops/validation/validation_test.go @@ -405,6 +405,7 @@ func Test_Validate_Calico(t *testing.T) { func Test_Validate_RollingUpdate(t *testing.T) { grid := []struct { Input kops.RollingUpdate + OnMasterIG bool ExpectedErrors []string }{ { @@ -448,6 +449,16 @@ func Test_Validate_RollingUpdate(t *testing.T) { MaxSurge: intStr(intstr.FromString("0%")), }, }, + { + Input: kops.RollingUpdate{ + MaxSurge: intStr(intstr.FromInt(1)), + }, + }, + { + Input: kops.RollingUpdate{ + MaxSurge: intStr(intstr.FromString("1%")), + }, + }, { Input: kops.RollingUpdate{ MaxSurge: intStr(intstr.FromString("nope")), @@ -466,9 +477,56 @@ func Test_Validate_RollingUpdate(t *testing.T) { }, ExpectedErrors: []string{"Invalid value::testField.maxSurge"}, }, + { + Input: kops.RollingUpdate{ + MaxSurge: intStr(intstr.FromInt(0)), + }, + OnMasterIG: true, + }, + { + Input: kops.RollingUpdate{ + MaxSurge: intStr(intstr.FromString("0%")), + }, + OnMasterIG: true, + }, + { + Input: kops.RollingUpdate{ + MaxSurge: intStr(intstr.FromInt(1)), + }, + OnMasterIG: true, + ExpectedErrors: []string{"Forbidden::testField.maxSurge"}, + }, + { + Input: kops.RollingUpdate{ + MaxSurge: intStr(intstr.FromString("1%")), + }, + OnMasterIG: true, + ExpectedErrors: []string{"Forbidden::testField.maxSurge"}, + }, + { + Input: kops.RollingUpdate{ + MaxSurge: intStr(intstr.FromString("nope")), + }, + OnMasterIG: true, + ExpectedErrors: []string{"Invalid value::testField.maxSurge"}, + }, + { + Input: kops.RollingUpdate{ + MaxSurge: intStr(intstr.FromInt(-1)), + }, + OnMasterIG: true, + ExpectedErrors: []string{"Forbidden::testField.maxSurge"}, + }, + { + Input: kops.RollingUpdate{ + MaxSurge: intStr(intstr.FromString("-1%")), + }, + OnMasterIG: true, + ExpectedErrors: []string{"Forbidden::testField.maxSurge"}, + }, } for _, g := range grid { - errs := validateRollingUpdate(&g.Input, field.NewPath("testField")) + errs := validateRollingUpdate(&g.Input, field.NewPath("testField"), g.OnMasterIG) testErrors(t, g.Input, errs, g.ExpectedErrors) } } diff --git 
a/pkg/instancegroups/instancegroups.go b/pkg/instancegroups/instancegroups.go index e04af77ad3..9194e73a38 100644 --- a/pkg/instancegroups/instancegroups.go +++ b/pkg/instancegroups/instancegroups.go @@ -189,7 +189,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd } // If noneReady, wait until after one node is detached and its replacement validates - // before the detaching more in case the current spec does not result in usable nodes. + // before detaching more in case the current spec does not result in usable nodes. if numSurge == maxSurge || noneReady { // Wait for the minimum interval klog.Infof("waiting for %v after detaching instance", sleepAfterTerminate) From 99100dc4a050f5fddd1e5c7c14091bc6da3ebf6a Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Tue, 3 Mar 2020 20:54:22 -0800 Subject: [PATCH 13/13] Fix flaky test --- pkg/instancegroups/rollingupdate_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/instancegroups/rollingupdate_test.go b/pkg/instancegroups/rollingupdate_test.go index d483877d80..12117e4f5f 100644 --- a/pkg/instancegroups/rollingupdate_test.go +++ b/pkg/instancegroups/rollingupdate_test.go @@ -1091,10 +1091,10 @@ func (t *alreadyDetachedTest) Validate() (*validation.ValidationCluster, error) case 1, 2, 3: assert.Equal(t.t, t.numValidations, len(t.detached), "numnber of detached instances") case 4: - assert.Equal(t.t, 1, t.terminationRequestsLeft, "terminations left") t.mutex.Unlock() - time.Sleep(2 * time.Millisecond) // NodeInterval plus some + time.Sleep(20 * time.Millisecond) // NodeInterval plus some t.mutex.Lock() + assert.Equal(t.t, 1, t.terminationRequestsLeft, "terminations left") case 5: assert.Equal(t.t, 0, t.terminationRequestsLeft, "terminations left") case 6:
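
As a usage sketch for the feature this series adds: the new rollingUpdate fields are set in the InstanceGroup (or Cluster) spec. The manifest below is illustrative only; the group name, cluster label value, sizes, and machine type are assumptions, while the field names and semantics (maxSurge as an absolute number or a percentage that rounds up, and not permitted for role "Master") follow the CRD and validation changes above.

apiVersion: kops.k8s.io/v1alpha2
kind: InstanceGroup
metadata:
  name: nodes                        # hypothetical group name
  labels:
    kops.k8s.io/cluster: example.k8s.local
spec:
  role: Node                         # surging is not allowed for role "Master"
  minSize: 6
  maxSize: 6
  machineType: t3.medium             # illustrative
  rollingUpdate:
    maxSurge: "30%"                  # integer or percentage; percentages round up
    maxUnavailable: 0                # optional; 0 for both fields disables rolling updates

With a setting like this, the rolling update detaches up to the surge count of instances from the autoscaling group so that replacements are created, validates the cluster, and terminates the detached, outdated instances last, as exercised by the tests above.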