diff --git a/pkg/instancegroups/BUILD.bazel b/pkg/instancegroups/BUILD.bazel
index 51d2bd6009..47ec4bfe3a 100644
--- a/pkg/instancegroups/BUILD.bazel
+++ b/pkg/instancegroups/BUILD.bazel
@@ -44,6 +44,7 @@ go_test(
         "//upup/pkg/fi/cloudup/awsup:go_default_library",
         "//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
         "//vendor/github.com/aws/aws-sdk-go/service/autoscaling:go_default_library",
+        "//vendor/github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface:go_default_library",
         "//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library",
         "//vendor/github.com/aws/aws-sdk-go/service/ec2/ec2iface:go_default_library",
         "//vendor/github.com/stretchr/testify/assert:go_default_library",
diff --git a/pkg/instancegroups/instancegroups.go b/pkg/instancegroups/instancegroups.go
index 26b695f8fc..562cb93729 100644
--- a/pkg/instancegroups/instancegroups.go
+++ b/pkg/instancegroups/instancegroups.go
@@ -101,10 +101,7 @@ func promptInteractive(upgradedHostId, upgradedHostName string) (stopPrompting b
 	return stopPrompting, err
 }
 
-// TODO: Temporarily increase size of ASG?
-// TODO: Remove from ASG first so status is immediately updated?
-
-// RollingUpdate performs a rolling update on a list of ec2 instances.
+// RollingUpdate performs a rolling update on a list of instances.
 func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpdateCluster, cluster *api.Cluster, isBastion bool, sleepAfterTerminate time.Duration, validationTimeout time.Duration) (err error) {
 
 	// we should not get here, but hey I am going to check.
@@ -152,17 +149,59 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 	settings := resolveSettings(cluster, r.CloudGroup.InstanceGroup, numInstances)
 
 	runningDrains := 0
-	maxConcurrency := settings.MaxUnavailable.IntValue()
+	maxSurge := settings.MaxSurge.IntValue()
+	if maxSurge > len(update) {
+		maxSurge = len(update)
+	}
+	maxConcurrency := maxSurge + settings.MaxUnavailable.IntValue()
 
 	if maxConcurrency == 0 {
 		klog.Infof("Rolling updates for InstanceGroup %s are disabled", r.CloudGroup.InstanceGroup.Name)
 		return nil
 	}
 
+	if r.CloudGroup.InstanceGroup.Spec.Role == api.InstanceGroupRoleMaster && maxSurge != 0 {
+		// Masters are incapable of surging because they rely on registering themselves through
+		// the local apiserver. That apiserver depends on the local etcd, which relies on being
+		// joined to the etcd cluster.
+		maxSurge = 0
+		maxConcurrency = settings.MaxUnavailable.IntValue()
+		if maxConcurrency == 0 {
+			maxConcurrency = 1
+		}
+	}
+
 	if rollingUpdateData.Interactive {
+		if maxSurge > 1 {
+			maxSurge = 1
+		}
 		maxConcurrency = 1
 	}
 
+	// TODO sort 'update' to put already detached instances last.
+
+	if maxSurge > 0 && !rollingUpdateData.CloudOnly {
+		// TODO don't detach instances that are already detached. Handle effects of that on noneReady behavior.
+		for numSurge := 1; numSurge <= maxSurge; numSurge++ {
+			if err := r.detachInstance(update[len(update)-numSurge]); err != nil {
+				return err
+			}
+
+			// If noneReady, wait until after one node is detached and its replacement validates
+			// before detaching more, in case the current spec does not result in usable nodes.
+			if numSurge == maxSurge || (noneReady && numSurge == 1) {
+				// Wait for the minimum interval
+				klog.Infof("waiting for %v after detaching instance", sleepAfterTerminate)
+				time.Sleep(sleepAfterTerminate)
+
+				if err := r.maybeValidate(rollingUpdateData, validationTimeout, "detaching"); err != nil {
+					return err
+				}
+			}
+		}
+		noneReady = false
+	}
+
 	terminateChan := make(chan error, maxConcurrency)
 
 	for uIdx, u := range update {
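
Illustration (not part of the patch): the surge arithmetic above applies three adjustments in sequence: surge is capped at the number of instances needing update, masters never surge, and interactive mode falls back to one instance at a time. A minimal standalone sketch of the same logic, with plain ints standing in for the resolved intstr settings; effectiveConcurrency is a hypothetical name:

package main

import "fmt"

// effectiveConcurrency mirrors the computation in RollingUpdate above.
// A returned concurrency of 0 means rolling updates are disabled.
func effectiveConcurrency(maxSurge, maxUnavailable, numToUpdate int, isMaster, interactive bool) (surge, concurrency int) {
	// Never surge by more instances than actually need updating.
	if maxSurge > numToUpdate {
		maxSurge = numToUpdate
	}
	concurrency = maxSurge + maxUnavailable

	// Masters cannot surge: a new master can only register once its etcd
	// member has joined, so fall back to terminate-first behavior.
	if isMaster && maxSurge != 0 {
		maxSurge = 0
		concurrency = maxUnavailable
		if concurrency == 0 {
			concurrency = 1
		}
	}

	// Interactive mode prompts after each instance, so process one at a time.
	if interactive {
		if maxSurge > 1 {
			maxSurge = 1
		}
		concurrency = 1
	}
	return maxSurge, concurrency
}

func main() {
	surge, concurrency := effectiveConcurrency(2, 1, 6, false, false)
	fmt.Println(surge, concurrency) // 2 3: detach 2 replacements first, then up to 3 concurrent drains
}
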
@@ -183,7 +222,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 			return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
 		}
 
-		err = r.maybeValidate(rollingUpdateData, validationTimeout)
+		err = r.maybeValidate(rollingUpdateData, validationTimeout, "removing")
 		if err != nil {
 			return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
 		}
@@ -229,7 +268,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 		}
 	}
 
-	err = r.maybeValidate(rollingUpdateData, validationTimeout)
+	err = r.maybeValidate(rollingUpdateData, validationTimeout, "removing")
 	if err != nil {
 		return err
 	}
@@ -359,7 +398,7 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo
 	return nil
 }
 
-func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpdateCluster, validationTimeout time.Duration) error {
+func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpdateCluster, validationTimeout time.Duration, operation string) error {
 	if rollingUpdateData.CloudOnly {
 		klog.Warningf("Not validating cluster as cloudonly flag is set.")
 
@@ -370,10 +409,10 @@ func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpd
 
 			if rollingUpdateData.FailOnValidate {
 				klog.Errorf("Cluster did not validate within %s", validationTimeout)
-				return fmt.Errorf("error validating cluster after removing a node: %v", err)
+				return fmt.Errorf("error validating cluster after %s a node: %v", operation, err)
 			}
 
-			klog.Warningf("Cluster validation failed after removing instance, proceeding since fail-on-validate is set to false: %v", err)
+			klog.Warningf("Cluster validation failed after %s instance, proceeding since fail-on-validate is set to false: %v", operation, err)
 		}
 	}
 	return nil
@@ -450,6 +489,30 @@ func (r *RollingUpdateInstanceGroup) validateCluster(rollingUpdateData *RollingU
 
 }
 
+// detachInstance detaches a Cloud Instance
+func (r *RollingUpdateInstanceGroup) detachInstance(u *cloudinstances.CloudInstanceGroupMember) error {
+	id := u.ID
+	nodeName := ""
+	if u.Node != nil {
+		nodeName = u.Node.Name
+	}
+	if nodeName != "" {
+		klog.Infof("Detaching instance %q, node %q, in group %q.", id, nodeName, r.CloudGroup.HumanName)
+	} else {
+		klog.Infof("Detaching instance %q, in group %q.", id, r.CloudGroup.HumanName)
+	}
+
+	if err := r.Cloud.DetachInstance(u); err != nil {
+		if nodeName != "" {
+			return fmt.Errorf("error detaching instance %q, node %q: %v", id, nodeName, err)
+		} else {
+			return fmt.Errorf("error detaching instance %q: %v", id, err)
+		}
+	}
+
+	return nil
+}
+
 // DeleteInstance deletes an Cloud Instance.
 func (r *RollingUpdateInstanceGroup) DeleteInstance(u *cloudinstances.CloudInstanceGroupMember) error {
 	id := u.ID
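
Illustration (not part of the patch): r.Cloud.DetachInstance is the per-provider hook. Judging from the autoscaling mock in the tests below, the AWS implementation presumably issues an ASG DetachInstances call with ShouldDecrementDesiredCapacity set to false, so the group launches a replacement on the current launch configuration; that replacement provides the surge capacity. A hedged sketch of such a call (detachFromASG and its inputs are illustrative, not the actual awsup code):

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/autoscaling"
	"github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
)

// detachFromASG detaches one instance from its Auto Scaling group without
// decrementing desired capacity, prompting the ASG to launch a replacement.
func detachFromASG(svc autoscalingiface.AutoScalingAPI, asgName, instanceID string) error {
	_, err := svc.DetachInstances(&autoscaling.DetachInstancesInput{
		AutoScalingGroupName:           aws.String(asgName),
		InstanceIds:                    []*string{aws.String(instanceID)},
		ShouldDecrementDesiredCapacity: aws.Bool(false),
	})
	return err
}

func main() {
	svc := autoscaling.New(session.Must(session.NewSession()))
	// Hypothetical group and instance IDs, for illustration only.
	if err := detachFromASG(svc, "asg-node-1", "i-0123456789abcdef0"); err != nil {
		fmt.Println("detach failed:", err)
	}
}
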
diff --git a/pkg/instancegroups/rollingupdate_test.go b/pkg/instancegroups/rollingupdate_test.go
index 14c51cb6ee..fd645a37d5 100644
--- a/pkg/instancegroups/rollingupdate_test.go
+++ b/pkg/instancegroups/rollingupdate_test.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/service/autoscaling"
+	"github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
 	"github.com/aws/aws-sdk-go/service/ec2"
 	"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
 	"github.com/stretchr/testify/assert"
@@ -112,6 +113,7 @@ func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k
 	fakeClient := k8sClient.(*fake.Clientset)
 
 	groups[name] = &cloudinstances.CloudInstanceGroup{
+		HumanName: name,
 		InstanceGroup: &kopsapi.InstanceGroup{
 			ObjectMeta: v1meta.ObjectMeta{
 				Name: name,
@@ -120,6 +122,7 @@
 				Role: role,
 			},
 		},
+		Raw: &autoscaling.Group{AutoScalingGroupName: aws.String("asg-" + name)},
 	}
 	cloud.Autoscaling().CreateAutoScalingGroup(&autoscaling.CreateAutoScalingGroupInput{
 		AutoScalingGroupName: aws.String(name),
@@ -139,8 +142,9 @@
 		_ = fakeClient.Tracker().Add(node)
 	}
 	member := cloudinstances.CloudInstanceGroupMember{
-		ID:   id,
-		Node: node,
+		ID:                 id,
+		Node:               node,
+		CloudInstanceGroup: groups[name],
 	}
 	if i < needUpdate {
 		groups[name].NeedUpdate = append(groups[name].NeedUpdate, &member)
@@ -608,6 +612,52 @@ func TestRollingUpdateTaintAllButOneNeedUpdate(t *testing.T) {
 	assertGroupInstanceCount(t, cloud, "node-1", 1)
 }
 
+func TestRollingUpdateMaxSurgeIgnoredForMaster(t *testing.T) {
+	c, cloud, cluster := getTestSetup()
+
+	two := intstr.FromInt(2)
+	cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
+		MaxSurge: &two,
+	}
+
+	groups := make(map[string]*cloudinstances.CloudInstanceGroup)
+	makeGroup(groups, c.K8sClient, cloud, "master-1", kopsapi.InstanceGroupRoleMaster, 3, 2)
+	err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
+	assert.NoError(t, err, "rolling update")
+
+	cordoned := ""
+	tainted := map[string]bool{}
+	deleted := map[string]bool{}
+	for _, action := range c.K8sClient.(*fake.Clientset).Actions() {
+		switch a := action.(type) {
+		case testingclient.PatchAction:
+			if string(a.GetPatch()) == cordonPatch {
+				assertCordon(t, a)
+				assert.Equal(t, "", cordoned, "at most one node cordoned at a time")
+				assert.True(t, tainted[a.GetName()], "node", a.GetName(), "tainted")
+				cordoned = a.GetName()
+			} else {
+				assertTaint(t, a)
+				assert.Equal(t, "", cordoned, "not tainting while node cordoned")
+				assert.False(t, tainted[a.GetName()], "node", a.GetName(), "already tainted")
+				tainted[a.GetName()] = true
+			}
+		case testingclient.DeleteAction:
+			assert.Equal(t, "nodes", a.GetResource().Resource)
+			assert.Equal(t, cordoned, a.GetName(), "node was cordoned before delete")
+			assert.False(t, deleted[a.GetName()], "node", a.GetName(), "already deleted")
+			deleted[a.GetName()] = true
+			cordoned = ""
+		case testingclient.ListAction:
+			// Don't care
+		default:
+			t.Errorf("unexpected action %v", a)
+		}
+	}
+
+	assertGroupInstanceCount(t, cloud, "master-1", 1)
+}
+
 func TestRollingUpdateDisabled(t *testing.T) {
 	c, cloud, cluster := getTestSetup()
 
@@ -647,12 +697,26 @@ func TestRollingUpdateDisabledCloudonly(t *testing.T) {
 
 // The concurrent update tests attempt to induce the following expected update sequence:
 //
-// (Only for "all need update" tests, to verify the toe-dipping behavior)
+// (Only for surging "all need update" test, to verify the toe-dipping behavior)
+// Request validate (8)            -->
+//                                 <-- validated
+// Detach instance                 -->
+// Request validate (7)            -->
+//                                 <-- validated
+// Detach instance                 -->
+// (end only for surging "all need update" test)
+// (Only for surging "all but one need update" test)
+// Request validate (7)            -->
+//                                 <-- validated
+// Detach instance                 -->
+// Detach instance                 -->
+// (end only for surging "all but one need update" test)
+// (Only for non-surging "all need update" tests, to verify the toe-dipping behavior)
 // Request validate (7)            -->
 //                                 <-- validated
 // Request terminate 1 node (7)    -->
 //                                 <-- 1 node terminated, 6 left
-// (end only for "all need update" tests)
+// (end only for non-surging "all need update" tests)
 // Request validate (6)            -->
 //                                 <-- validated
 // Request terminate 2 nodes (6,5) -->
@@ -678,16 +742,24 @@ type concurrentTest struct {
 	ec2iface.EC2API
 	t                       *testing.T
 	mutex                   sync.Mutex
+	surge                   int
 	terminationRequestsLeft int
 	previousValidation      int
 	validationChan          chan bool
 	terminationChan         chan bool
+	detached                map[string]bool
 }
 
 func (c *concurrentTest) Validate() (*validation.ValidationCluster, error) {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
 
+	if len(c.detached) < c.surge {
+		assert.Greater(c.t, c.previousValidation, 7, "previous validation")
+		c.previousValidation--
+		return &validation.ValidationCluster{}, nil
+	}
+
 	terminationRequestsLeft := c.terminationRequestsLeft
 	switch terminationRequestsLeft {
 	case 7, 6, 0:
@@ -738,7 +810,12 @@ func (c *concurrentTest) TerminateInstances(input *ec2.TerminateInstancesInput)
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
 
-	for range input.InstanceIds {
+	for _, id := range input.InstanceIds {
+		assert.Equal(c.t, c.surge, len(c.detached), "Number of detached instances")
+		if c.detached[*id] {
+			assert.LessOrEqual(c.t, c.terminationRequestsLeft, c.surge, "Deleting detached instances last")
+		}
+
 		terminationRequestsLeft := c.terminationRequestsLeft
 		c.terminationRequestsLeft--
 		switch terminationRequestsLeft {
@@ -776,25 +853,33 @@ func (c *concurrentTest) AssertComplete() {
 	assert.Equal(c.t, 0, c.previousValidation, "last validation")
 }
 
-func newConcurrentTest(t *testing.T, cloud *awsup.MockAWSCloud, allNeedUpdate bool) *concurrentTest {
+func newConcurrentTest(t *testing.T, cloud *awsup.MockAWSCloud, numSurge int, allNeedUpdate bool) *concurrentTest {
 	test := concurrentTest{
 		EC2API:                  cloud.MockEC2,
 		t:                       t,
+		surge:                   numSurge,
 		terminationRequestsLeft: 6,
 		validationChan:          make(chan bool),
 		terminationChan:         make(chan bool),
+		detached:                map[string]bool{},
 	}
-	if allNeedUpdate {
+	if numSurge == 0 && allNeedUpdate {
 		test.terminationRequestsLeft = 7
 	}
-	test.previousValidation = test.terminationRequestsLeft + 1
+	if numSurge == 0 {
+		test.previousValidation = test.terminationRequestsLeft + 1
+	} else if allNeedUpdate {
+		test.previousValidation = 9
+	} else {
+		test.previousValidation = 8
+	}
 	return &test
 }
 
 func TestRollingUpdateMaxUnavailableAllNeedUpdate(t *testing.T) {
 	c, cloud, cluster := getTestSetup()
 
-	concurrentTest := newConcurrentTest(t, cloud, true)
+	concurrentTest := newConcurrentTest(t, cloud, 0, true)
 	c.ValidateSuccessDuration = 0
 	c.ClusterValidator = concurrentTest
 	cloud.MockEC2 = concurrentTest
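
Aside (illustration, not part of the patch): concurrentTest, like the concurrentTestAutoscaling, ec2IgnoreTags, and countDetach doubles added further down, uses a common Go testing idiom: embed the wide SDK interface, override only the methods under test, and let every other call fall through to the embedded value. A minimal self-contained sketch of the idiom, with hypothetical names:

package main

import "fmt"

// Storage stands in for a wide interface such as ec2iface.EC2API.
type Storage interface {
	Get(key string) (string, error)
	Put(key, value string) error
}

type realStorage struct{}

func (realStorage) Get(key string) (string, error) { return "real:" + key, nil }
func (realStorage) Put(key, value string) error    { return nil }

// countingStorage satisfies all of Storage by embedding it, but overrides
// Put to count calls; Get falls through to the embedded implementation.
type countingStorage struct {
	Storage
	puts int
}

func (c *countingStorage) Put(key, value string) error {
	c.puts++
	return c.Storage.Put(key, value)
}

func main() {
	s := &countingStorage{Storage: realStorage{}}
	_ = s.Put("a", "1")
	v, _ := s.Get("a")
	fmt.Println(v, s.puts) // real:a 1
}
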
@@ -817,7 +902,7 @@ func TestRollingUpdateMaxUnavailableAllButOneNeedUpdate(t *testing.T) {
 	c, cloud, cluster := getTestSetup()
 
-	concurrentTest := newConcurrentTest(t, cloud, false)
+	concurrentTest := newConcurrentTest(t, cloud, 0, false)
 	c.ValidateSuccessDuration = 0
 	c.ClusterValidator = concurrentTest
 	cloud.MockEC2 = concurrentTest
 
@@ -839,7 +924,7 @@ func TestRollingUpdateMaxUnavailableAllNeedUpdateMaster(t *testing.T) {
 	c, cloud, cluster := getTestSetup()
 
-	concurrentTest := newConcurrentTest(t, cloud, true)
+	concurrentTest := newConcurrentTest(t, cloud, 0, true)
 	c.ValidateSuccessDuration = 0
 	c.ClusterValidator = concurrentTest
 	cloud.MockEC2 = concurrentTest
 
@@ -859,6 +944,121 @@ func TestRollingUpdateMaxUnavailableAllNeedUpdateMaster(t *testing.T) {
 	concurrentTest.AssertComplete()
 }
 
+type concurrentTestAutoscaling struct {
+	autoscalingiface.AutoScalingAPI
+	ConcurrentTest *concurrentTest
+}
+
+func (m *concurrentTestAutoscaling) DetachInstances(input *autoscaling.DetachInstancesInput) (*autoscaling.DetachInstancesOutput, error) {
+	m.ConcurrentTest.mutex.Lock()
+	defer m.ConcurrentTest.mutex.Unlock()
+
+	assert.Equal(m.ConcurrentTest.t, "node-1", *input.AutoScalingGroupName)
+	assert.False(m.ConcurrentTest.t, *input.ShouldDecrementDesiredCapacity)
+
+	for _, id := range input.InstanceIds {
+		assert.Less(m.ConcurrentTest.t, len(m.ConcurrentTest.detached), m.ConcurrentTest.surge, "Number of detached instances")
+		assert.False(m.ConcurrentTest.t, m.ConcurrentTest.detached[*id], *id+" already detached")
+		m.ConcurrentTest.detached[*id] = true
+	}
+	return &autoscaling.DetachInstancesOutput{}, nil
+}
+
+type ec2IgnoreTags struct {
+	ec2iface.EC2API
+}
+
+// CreateTags ignores tagging of instances done by the AWS fi.Cloud implementation of DetachInstance()
+func (e *ec2IgnoreTags) CreateTags(*ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) {
+	return &ec2.CreateTagsOutput{}, nil
+}
+
+func TestRollingUpdateMaxSurgeAllNeedUpdate(t *testing.T) {
+	c, cloud, cluster := getTestSetup()
+
+	concurrentTest := newConcurrentTest(t, cloud, 2, true)
+	c.ValidateSuccessDuration = 0
+	c.ClusterValidator = concurrentTest
+	cloud.MockAutoscaling = &concurrentTestAutoscaling{
+		AutoScalingAPI: cloud.MockAutoscaling,
+		ConcurrentTest: concurrentTest,
+	}
+	cloud.MockEC2 = &ec2IgnoreTags{EC2API: concurrentTest}
+
+	two := intstr.FromInt(2)
+	cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
+		MaxSurge: &two,
+	}
+
+	groups := make(map[string]*cloudinstances.CloudInstanceGroup)
+	makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 6, 6)
+
+	err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
+	assert.NoError(t, err, "rolling update")
+
+	assertGroupInstanceCount(t, cloud, "node-1", 0)
+	concurrentTest.AssertComplete()
+}
+
+func TestRollingUpdateMaxSurgeAllButOneNeedUpdate(t *testing.T) {
+	c, cloud, cluster := getTestSetup()
+
+	concurrentTest := newConcurrentTest(t, cloud, 2, false)
+	c.ValidateSuccessDuration = 0
+	c.ClusterValidator = concurrentTest
+	cloud.MockAutoscaling = &concurrentTestAutoscaling{
+		AutoScalingAPI: cloud.MockAutoscaling,
+		ConcurrentTest: concurrentTest,
+	}
+	cloud.MockEC2 = &ec2IgnoreTags{EC2API: concurrentTest}
+
+	two := intstr.FromInt(2)
+	cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{
+		MaxSurge: &two,
+	}
+
+	groups := make(map[string]*cloudinstances.CloudInstanceGroup)
+	makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 7, 6)
+	err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
+	assert.NoError(t, err, "rolling update")
+
+	assertGroupInstanceCount(t, cloud, "node-1", 1)
+	concurrentTest.AssertComplete()
+}
+
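
Aside (illustration, not part of the patch): per the ec2IgnoreTags comment above, the AWS fi.Cloud implementation of DetachInstance apparently also tags the instances it detaches, which is why these tests stub out CreateTags. A hedged sketch of such a tagging call; the tag key and IDs shown are hypothetical, not what kops actually writes:

package main

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ec2"
)

func main() {
	svc := ec2.New(session.Must(session.NewSession()))
	// Mark a detached instance; "kops.k8s.io/detached" is an illustrative key.
	_, err := svc.CreateTags(&ec2.CreateTagsInput{
		Resources: []*string{aws.String("i-0123456789abcdef0")},
		Tags: []*ec2.Tag{
			{Key: aws.String("kops.k8s.io/detached"), Value: aws.String("true")},
		},
	})
	if err != nil {
		panic(err)
	}
}
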
"rolling update") + + assertGroupInstanceCount(t, cloud, "node-1", 1) + concurrentTest.AssertComplete() +} + +type countDetach struct { + autoscalingiface.AutoScalingAPI + Count int +} + +func (c *countDetach) DetachInstances(input *autoscaling.DetachInstancesInput) (*autoscaling.DetachInstancesOutput, error) { + c.Count += len(input.InstanceIds) + return &autoscaling.DetachInstancesOutput{}, nil +} + +func TestRollingUpdateMaxSurgeGreaterThanNeedUpdate(t *testing.T) { + c, cloud, cluster := getTestSetup() + + countDetach := &countDetach{AutoScalingAPI: cloud.MockAutoscaling} + cloud.MockAutoscaling = countDetach + cloud.MockEC2 = &ec2IgnoreTags{EC2API: cloud.MockEC2} + + ten := intstr.FromInt(10) + cluster.Spec.RollingUpdate = &kopsapi.RollingUpdate{ + MaxSurge: &ten, + } + + groups := make(map[string]*cloudinstances.CloudInstanceGroup) + makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 3, 2) + err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{}) + assert.NoError(t, err, "rolling update") + + assertGroupInstanceCount(t, cloud, "node-1", 1) + assert.Equal(t, 2, countDetach.Count) +} + +// TODO tests for surging when instances start off already detached + func assertCordon(t *testing.T, action testingclient.PatchAction) { assert.Equal(t, "nodes", action.GetResource().Resource) assert.Equal(t, cordonPatch, string(action.GetPatch()))