diff --git a/cmd/kops/rollingupdatecluster.go b/cmd/kops/rollingupdatecluster.go
index 7c31ad88e0..4894042b43 100644
--- a/cmd/kops/rollingupdatecluster.go
+++ b/cmd/kops/rollingupdatecluster.go
@@ -341,10 +341,10 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer
 		ClusterName:       options.ClusterName,
 		PostDrainDelay:    options.PostDrainDelay,
 		ValidationTimeout: options.ValidationTimeout,
-		ValidateCount:     options.ValidateCount,
-		ValidateSucceeded: 0,
+		ValidateCount:     int(options.ValidateCount),
 		// TODO should we expose this to the UI?
-		ValidateTickDuration: 30 * time.Second,
+		ValidateTickDuration:    30 * time.Second,
+		ValidateSuccessDuration: 10 * time.Second,
 	}
 
 	err = d.AdjustNeedUpdate(groups, cluster, list)
diff --git a/pkg/instancegroups/instancegroups.go b/pkg/instancegroups/instancegroups.go
index 6d826f625e..20e88e65ed 100644
--- a/pkg/instancegroups/instancegroups.go
+++ b/pkg/instancegroups/instancegroups.go
@@ -69,7 +69,7 @@ func promptInteractive(upgradedHostId, upgradedHostName string) (stopPrompting b
 }
 
 // RollingUpdate performs a rolling update on a list of instances.
-func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, cluster *api.Cluster, group *cloudinstances.CloudInstanceGroup, isBastion bool, sleepAfterTerminate time.Duration, validationTimeout time.Duration) (err error) {
+func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, cluster *api.Cluster, group *cloudinstances.CloudInstanceGroup, isBastion bool, sleepAfterTerminate time.Duration) (err error) {
 	// Do not need a k8s client if you are doing cloudonly.
 	if c.K8sClient == nil && !c.CloudOnly {
 		return fmt.Errorf("rollingUpdate is missing a k8s client")
@@ -156,7 +156,7 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, c
 			klog.Infof("waiting for %v after detaching instance", sleepAfterTerminate)
 			time.Sleep(sleepAfterTerminate)
 
-			if err := c.maybeValidate(validationTimeout, "detaching"); err != nil {
+			if err := c.maybeValidate(c.ValidationTimeout, "detaching"); err != nil {
 				return err
 			}
 			noneReady = false
@@ -185,7 +185,7 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, c
 			return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
 		}
 
-		err = c.maybeValidate(validationTimeout, "removing")
+		err = c.maybeValidate(c.ValidationTimeout, "removing")
 		if err != nil {
 			return waitForPendingBeforeReturningError(runningDrains, terminateChan, err)
 		}
@@ -231,7 +231,7 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, c
 		}
 	}
 
-	err = c.maybeValidate(validationTimeout, "removing")
+	err = c.maybeValidate(c.ValidationTimeout, "removing")
 	if err != nil {
 		return err
 	}
@@ -401,57 +401,64 @@ func (c *RollingUpdateCluster) maybeValidate(validationTimeout time.Duration, op
 }
 
 // validateClusterWithDuration runs validation.ValidateCluster until either we get positive result or the timeout expires
-func (c *RollingUpdateCluster) validateClusterWithDuration(duration time.Duration) error {
-	// Try to validate cluster at least once, this will handle durations that are lower
-	// than our tick time
-	if c.tryValidateCluster(duration) {
+func (c *RollingUpdateCluster) validateClusterWithDuration(validationTimeout time.Duration) error {
+	ctx, cancel := context.WithTimeout(context.Background(), validationTimeout)
+	defer cancel()
+
+	if c.tryValidateCluster(ctx) {
 		return nil
 	}
 
-	timeout := time.After(duration)
-	ticker := time.NewTicker(c.ValidateTickDuration)
-	defer ticker.Stop()
-	// Keep trying until we're timed out or got a result or got an error
-	for {
-		select {
-		case <-timeout:
-			// Got a timeout fail with a timeout error
-			return fmt.Errorf("cluster did not validate within a duration of %q", duration)
-		case <-ticker.C:
-			// Got a tick, validate cluster
-			if c.tryValidateCluster(duration) {
-				return nil
-			}
-			// ValidateCluster didn't work yet, so let's try again
-			// this will exit up to the for loop
-		}
-	}
+	return fmt.Errorf("cluster did not validate within a duration of %q", validationTimeout)
 }
 
-func (c *RollingUpdateCluster) tryValidateCluster(duration time.Duration) bool {
-	result, err := c.ClusterValidator.Validate()
-
-	if err == nil && len(result.Failures) == 0 && c.ValidateCount > 0 {
-		c.ValidateSucceeded++
-		if c.ValidateSucceeded < c.ValidateCount {
-			klog.Infof("Cluster validated; revalidating %d time(s) to make sure it does not flap.", c.ValidateCount-c.ValidateSucceeded)
-			return false
-		}
+func (c *RollingUpdateCluster) tryValidateCluster(ctx context.Context) bool {
+	if c.ValidateCount == 0 {
+		klog.Warningf("skipping cluster validation because validate-count was 0")
+		return true
 	}
 
-	if err != nil {
-		klog.Infof("Cluster did not validate, will try again in %q until duration %q expires: %v.", c.ValidateTickDuration, duration, err)
-		return false
-	} else if len(result.Failures) > 0 {
-		messages := []string{}
-		for _, failure := range result.Failures {
-			messages = append(messages, failure.Message)
+	successCount := 0
+
+	for {
+		// Note that we validate at least once before checking the timeout, in case the cluster is healthy with a short timeout
+		result, err := c.ClusterValidator.Validate()
+		if err == nil && len(result.Failures) == 0 {
+			successCount++
+			if successCount >= c.ValidateCount {
+				klog.Info("Cluster validated.")
+				return true
+			} else {
+				klog.Infof("Cluster validated; revalidating in %s to make sure it does not flap.", c.ValidateSuccessDuration)
+				time.Sleep(c.ValidateSuccessDuration)
+				continue
+			}
 		}
-		klog.Infof("Cluster did not pass validation, will try again in %q until duration %q expires: %s.", c.ValidateTickDuration, duration, strings.Join(messages, ", "))
-		return false
-	} else {
-		klog.Info("Cluster validated.")
-		return true
+
+		if err != nil {
+			if ctx.Err() != nil {
+				klog.Infof("Cluster did not validate within deadline: %v.", err)
+				return false
+			}
+			klog.Infof("Cluster did not validate, will retry in %q: %v.", c.ValidateTickDuration, err)
+		} else if len(result.Failures) > 0 {
+			messages := []string{}
+			for _, failure := range result.Failures {
+				messages = append(messages, failure.Message)
+			}
+			if ctx.Err() != nil {
+				klog.Infof("Cluster did not pass validation within deadline: %s.", strings.Join(messages, ", "))
+				return false
+			}
+			klog.Infof("Cluster did not pass validation, will retry in %q: %s.", c.ValidateTickDuration, strings.Join(messages, ", "))
+		}
+
+		// Reset the success count; we want N consecutive successful validations
+		successCount = 0
+
+		// Wait before retrying
+		// TODO: Should we check if we have enough time left before the deadline?
+		time.Sleep(c.ValidateTickDuration)
 	}
 }
diff --git a/pkg/instancegroups/rollingupdate.go b/pkg/instancegroups/rollingupdate.go
index 91d2e3e9b6..0ce3b36c4c 100644
--- a/pkg/instancegroups/rollingupdate.go
+++ b/pkg/instancegroups/rollingupdate.go
@@ -65,11 +65,12 @@ type RollingUpdateCluster struct {
 	// ValidateTickDuration is the amount of time to wait between cluster validation attempts
 	ValidateTickDuration time.Duration
 
-	// ValidateCount is the amount of time that a cluster needs to be validated after single node update
-	ValidateCount int32
+	// ValidateSuccessDuration is the amount of time a cluster must continue to validate successfully
+	// before updating the next node
+	ValidateSuccessDuration time.Duration
 
-	// ValidateSucceeded is the amount of times that a cluster validate is succeeded already
-	ValidateSucceeded int32
+	// ValidateCount is the amount of time that a cluster needs to be validated after single node update
+	ValidateCount int
 }
 
 // AdjustNeedUpdate adjusts the set of instances that need updating, using factors outside those known by the cloud implementation
@@ -136,7 +137,7 @@ func (c *RollingUpdateCluster) RollingUpdate(ctx context.Context, groups map[str
 
 			defer wg.Done()
 
-			err := c.rollingUpdateInstanceGroup(ctx, cluster, group, true, c.BastionInterval, c.ValidationTimeout)
+			err := c.rollingUpdateInstanceGroup(ctx, cluster, group, true, c.BastionInterval)
 
 			resultsMutex.Lock()
 			results[k] = err
@@ -161,7 +162,7 @@ func (c *RollingUpdateCluster) RollingUpdate(ctx context.Context, groups map[str
 		// and we don't want to roll all the masters at the same time.  See issue #284
 
 		for _, group := range masterGroups {
-			err := c.rollingUpdateInstanceGroup(ctx, cluster, group, false, c.MasterInterval, c.ValidationTimeout)
+			err := c.rollingUpdateInstanceGroup(ctx, cluster, group, false, c.MasterInterval)
 
 			// Do not continue update if master(s) failed, cluster is potentially in an unhealthy state
 			if err != nil {
@@ -183,7 +184,7 @@ func (c *RollingUpdateCluster) RollingUpdate(ctx context.Context, groups map[str
 	}
 
 	for k, group := range nodeGroups {
-		err := c.rollingUpdateInstanceGroup(ctx, cluster, group, false, c.NodeInterval, c.ValidationTimeout)
+		err := c.rollingUpdateInstanceGroup(ctx, cluster, group, false, c.NodeInterval)
 
 		results[k] = err
 
diff --git a/pkg/instancegroups/rollingupdate_test.go b/pkg/instancegroups/rollingupdate_test.go
index 8cc399e3bd..db7f6fd829 100644
--- a/pkg/instancegroups/rollingupdate_test.go
+++ b/pkg/instancegroups/rollingupdate_test.go
@@ -60,17 +60,17 @@ func getTestSetup() (*RollingUpdateCluster, *awsup.MockAWSCloud, *kopsapi.Cluste
 	cluster.Name = "test.k8s.local"
 
 	c := &RollingUpdateCluster{
-		Cloud:                mockcloud,
-		MasterInterval:       1 * time.Millisecond,
-		NodeInterval:         1 * time.Millisecond,
-		BastionInterval:      1 * time.Millisecond,
-		Force:                false,
-		K8sClient:            k8sClient,
-		ClusterValidator:     &successfulClusterValidator{},
-		FailOnValidate:       true,
-		ValidateTickDuration: 1 * time.Millisecond,
-		ValidateCount:        1,
-		ValidateSucceeded:    0,
+		Cloud:                   mockcloud,
+		MasterInterval:          1 * time.Millisecond,
+		NodeInterval:            1 * time.Millisecond,
+		BastionInterval:         1 * time.Millisecond,
+		Force:                   false,
+		K8sClient:               k8sClient,
+		ClusterValidator:        &successfulClusterValidator{},
+		FailOnValidate:          true,
+		ValidateTickDuration:    1 * time.Millisecond,
+		ValidateSuccessDuration: 5 * time.Millisecond,
+		ValidateCount:           2,
 	}
 
 	return c, mockcloud, cluster
@@ -545,7 +545,6 @@ func TestRollingUpdateFlappingValidation(t *testing.T) {
 	ctx := context.Background()
 
 	c, cloud, cluster := getTestSetup()
-	c.ValidateCount = 3
 
 	// This should only take a few milliseconds,
 	// but we have to pad to allow for random delays (e.g. GC)
@@ -1024,6 +1023,7 @@ func TestRollingUpdateMaxUnavailableAllNeedUpdate(t *testing.T) {
 	c, cloud, cluster := getTestSetup()
 
 	concurrentTest := newConcurrentTest(t, cloud, 0, true)
+	c.ValidateCount = 1
 	c.ClusterValidator = concurrentTest
 	cloud.MockEC2 = concurrentTest
 
@@ -1048,6 +1048,7 @@ func TestRollingUpdateMaxUnavailableAllButOneNeedUpdate(t *testing.T) {
 	c, cloud, cluster := getTestSetup()
 
 	concurrentTest := newConcurrentTest(t, cloud, 0, false)
+	c.ValidateCount = 1
 	c.ClusterValidator = concurrentTest
 	cloud.MockEC2 = concurrentTest
 
@@ -1071,6 +1072,7 @@ func TestRollingUpdateMaxUnavailableAllNeedUpdateMaster(t *testing.T) {
 	c, cloud, cluster := getTestSetup()
 
 	concurrentTest := newConcurrentTest(t, cloud, 0, true)
+	c.ValidateCount = 1
 	c.ClusterValidator = concurrentTest
 	cloud.MockEC2 = concurrentTest
 
@@ -1124,6 +1126,7 @@ func TestRollingUpdateMaxSurgeAllNeedUpdate(t *testing.T) {
 	c, cloud, cluster := getTestSetup()
 
 	concurrentTest := newConcurrentTest(t, cloud, 2, true)
+	c.ValidateCount = 1
 	c.ClusterValidator = concurrentTest
 	cloud.MockAutoscaling = &concurrentTestAutoscaling{
 		AutoScalingAPI: cloud.MockAutoscaling,
@@ -1152,6 +1155,7 @@ func TestRollingUpdateMaxSurgeAllButOneNeedUpdate(t *testing.T) {
 	c, cloud, cluster := getTestSetup()
 
 	concurrentTest := newConcurrentTest(t, cloud, 2, false)
+	c.ValidateCount = 1
 	c.ClusterValidator = concurrentTest
 	cloud.MockAutoscaling = &concurrentTestAutoscaling{
 		AutoScalingAPI: cloud.MockAutoscaling,
@@ -1302,6 +1306,7 @@ func TestRollingUpdateMaxSurgeAllNeedUpdateOneAlreadyDetached(t *testing.T) {
 		detached: map[string]bool{},
 	}
 
+	c.ValidateCount = 1
 	c.ClusterValidator = alreadyDetachedTest
 	cloud.MockAutoscaling = &alreadyDetachedTestAutoscaling{
 		AutoScalingAPI: cloud.MockAutoscaling,
@@ -1332,6 +1337,7 @@ func TestRollingUpdateMaxSurgeAllNeedUpdateMaxAlreadyDetached(t *testing.T) {
 
 	// Should behave the same as TestRollingUpdateMaxUnavailableAllNeedUpdate
 	concurrentTest := newConcurrentTest(t, cloud, 0, true)
+	c.ValidateCount = 1
 	c.ClusterValidator = concurrentTest
 	cloud.MockEC2 = concurrentTest
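
For reference, below is a minimal standalone sketch (not part of the patch) of the consecutive-success validation loop that tryValidateCluster now implements; the validator interface, the runValidationLoop helper, the flakyValidator stub, and all durations are illustrative stand-ins rather than the kops API.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// validator is an illustrative stand-in for the cluster validator used in the patch.
type validator interface {
	Validate() error
}

// runValidationLoop requires validateCount consecutive successful validations before
// returning true, sleeping successWait between successes and tickWait between retries,
// and giving up once the context deadline has expired. It mirrors, in simplified form,
// the control flow of tryValidateCluster in the diff above.
func runValidationLoop(ctx context.Context, v validator, validateCount int, successWait, tickWait time.Duration) bool {
	if validateCount == 0 {
		return true // validation disabled
	}
	successes := 0
	for {
		if err := v.Validate(); err == nil {
			successes++
			if successes >= validateCount {
				return true
			}
			time.Sleep(successWait) // revalidate to make sure the cluster does not flap
			continue
		}
		if ctx.Err() != nil {
			return false // deadline expired
		}
		successes = 0 // a failure resets the consecutive-success count
		time.Sleep(tickWait)
	}
}

// flakyValidator fails a fixed number of times before succeeding.
type flakyValidator struct{ failuresLeft int }

func (f *flakyValidator) Validate() error {
	if f.failuresLeft > 0 {
		f.failuresLeft--
		return errors.New("node not ready")
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	v := &flakyValidator{failuresLeft: 2}
	ok := runValidationLoop(ctx, v, 2, 10*time.Millisecond, 50*time.Millisecond)
	fmt.Println("validated:", ok)
}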