From d56ad413343d26f0047bb908eb72424b6c91fcfe Mon Sep 17 00:00:00 2001
From: John Gardiner Myers
Date: Sun, 26 Jan 2020 10:46:03 -0800
Subject: [PATCH] Address review comments

---
 pkg/instancegroups/instancegroups.go | 51 ++++++++++++++--------------
 1 file changed, 26 insertions(+), 25 deletions(-)

diff --git a/pkg/instancegroups/instancegroups.go b/pkg/instancegroups/instancegroups.go
index ae9b1317e7..26b695f8fc 100644
--- a/pkg/instancegroups/instancegroups.go
+++ b/pkg/instancegroups/instancegroups.go
@@ -151,7 +151,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 
 	settings := resolveSettings(cluster, r.CloudGroup.InstanceGroup, numInstances)
 
-	concurrency := 0
+	runningDrains := 0
 	maxConcurrency := settings.MaxUnavailable.IntValue()
 
 	if maxConcurrency == 0 {
@@ -166,24 +166,26 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 	terminateChan := make(chan error, maxConcurrency)
 
 	for uIdx, u := range update {
-		go r.drainTerminateAndWait(u, rollingUpdateData, terminateChan, isBastion, sleepAfterTerminate)
-		concurrency++
+		go func(m *cloudinstances.CloudInstanceGroupMember) {
+			terminateChan <- r.drainTerminateAndWait(m, rollingUpdateData, isBastion, sleepAfterTerminate)
+		}(u)
+		runningDrains++
 
 		// Wait until after one node is deleted and its replacement validates before the concurrent draining
 		// in case the current spec does not result in usable nodes.
-		if concurrency < maxConcurrency && (!noneReady || uIdx > 0) {
+		if runningDrains < maxConcurrency && (!noneReady || uIdx > 0) {
 			continue
 		}
 
 		err = <-terminateChan
-		concurrency--
+		runningDrains--
 		if err != nil {
-			return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
+			return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
 		}
 
 		err = r.maybeValidate(rollingUpdateData, validationTimeout)
 		if err != nil {
-			return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
+			return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
 		}
 
 		if rollingUpdateData.Interactive {
@@ -202,13 +204,15 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 			}
 		}
 
+		// Validation tends to return failures from the start of drain until the replacement is
+		// fully ready, so sweep up as many completions as we can before starting the next drain.
 	sweep:
-		for concurrency > 0 {
+		for runningDrains > 0 {
 			select {
 			case err = <-terminateChan:
-				concurrency--
+				runningDrains--
 				if err != nil {
-					return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
+					return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
 				}
 			default:
 				break sweep
@@ -216,12 +220,12 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 		}
 	}
 
-	if concurrency > 0 {
-		for concurrency > 0 {
+	if runningDrains > 0 {
+		for runningDrains > 0 {
 			err = <-terminateChan
-			concurrency--
+			runningDrains--
 			if err != nil {
-				return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
+				return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
 			}
 		}
 
@@ -234,10 +238,10 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
 	return nil
 }
 
-func waitForPendingBeforeReturningError(concurrency int, terminateChan chan error, err error) error {
-	for concurrency > 0 {
+func waitForPendingBeforeReturningError(runningDrains int, terminateChan chan error, err error) error {
+	for runningDrains > 0 {
 		<-terminateChan
-		concurrency--
+		runningDrains--
 	}
 	return err
 }
@@ -300,7 +304,7 @@ func (r *RollingUpdateInstanceGroup) patchTaint(rollingUpdateData *RollingUpdate
 	return err
 }
 
-func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster, terminateChan chan error, isBastion bool, sleepAfterTerminate time.Duration) {
+func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster, isBastion bool, sleepAfterTerminate time.Duration) error {
 	instanceId := u.ID
 
 	nodeName := ""
@@ -321,8 +325,7 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo
 
 		if err := r.DrainNode(u, rollingUpdateData); err != nil {
 			if rollingUpdateData.FailOnDrainError {
-				terminateChan <- fmt.Errorf("failed to drain node %q: %v", nodeName, err)
-				return
+				return fmt.Errorf("failed to drain node %q: %v", nodeName, err)
 			}
 			klog.Infof("Ignoring error draining node %q: %v", nodeName, err)
 		}
@@ -339,23 +342,21 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo
 		} else {
 			klog.Infof("deleting node %q from kubernetes", nodeName)
 			if err := r.deleteNode(u.Node, rollingUpdateData); err != nil {
-				terminateChan <- fmt.Errorf("error deleting node %q: %v", nodeName, err)
-				return
+				return fmt.Errorf("error deleting node %q: %v", nodeName, err)
 			}
 		}
 	}
 
 	if err := r.DeleteInstance(u); err != nil {
 		klog.Errorf("error deleting instance %q, node %q: %v", instanceId, nodeName, err)
-		terminateChan <- err
-		return
+		return err
 	}
 
 	// Wait for the minimum interval
 	klog.Infof("waiting for %v after terminating instance", sleepAfterTerminate)
 	time.Sleep(sleepAfterTerminate)
 
-	terminateChan <- nil
+	return nil
 }
 
 func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpdateCluster, validationTimeout time.Duration) error {