mirror of https://github.com/kubernetes/kops.git
				
				
				
			Merge pull request #9637 from hakman/aws-eventual-consistency
Cleanup AWS EC2 eventual consistency warnings
This commit is contained in:
		
						commit
						be783014f2
					
				|  | @ -126,8 +126,8 @@ func DeleteResources(cloud fi.Cloud, resourceMap map[string]*resources.Resource) | ||||||
| 					if err != nil { | 					if err != nil { | ||||||
| 						mutex.Lock() | 						mutex.Lock() | ||||||
| 						if awsresources.IsDependencyViolation(err) { | 						if awsresources.IsDependencyViolation(err) { | ||||||
| 							fmt.Printf("%s\tstill has dependencies, will retry: %v\n", human, err) | 							fmt.Printf("%s\tstill has dependencies, will retry\n", human) | ||||||
| 							klog.V(4).Infof("API call made when had dependency: %s", human) | 							klog.V(4).Infof("resource %q generated a dependency error: %v", human, err) | ||||||
| 						} else { | 						} else { | ||||||
| 							fmt.Printf("%s\terror deleting resources, will retry: %v\n", human, err) | 							fmt.Printf("%s\terror deleting resources, will retry: %v\n", human, err) | ||||||
| 						} | 						} | ||||||
|  |  | ||||||
|  | @ -300,7 +300,13 @@ func (v *AutoscalingGroup) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Autos | ||||||
| 
 | 
 | ||||||
| 		// @step: attempt to create the autoscaling group for us
 | 		// @step: attempt to create the autoscaling group for us
 | ||||||
| 		if _, err := t.Cloud.Autoscaling().CreateAutoScalingGroup(request); err != nil { | 		if _, err := t.Cloud.Autoscaling().CreateAutoScalingGroup(request); err != nil { | ||||||
| 			return fmt.Errorf("error creating AutoscalingGroup: %v", err) | 			code := awsup.AWSErrorCode(err) | ||||||
|  | 			message := awsup.AWSErrorMessage(err) | ||||||
|  | 			if code == "ValidationError" && strings.Contains(message, "Invalid IAM Instance Profile name") { | ||||||
|  | 				klog.V(4).Infof("error creating AutoscalingGroup: %s", message) | ||||||
|  | 				return fi.NewTryAgainLaterError("waiting for the IAM Instance Profile to be propagated") | ||||||
|  | 			} | ||||||
|  | 			return fmt.Errorf("error creating AutoScalingGroup: %s", message) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		// @step: attempt to enable the metrics for us
 | 		// @step: attempt to enable the metrics for us
 | ||||||
|  |  | ||||||
|  | @ -18,7 +18,6 @@ package awstasks | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"time" |  | ||||||
| 
 | 
 | ||||||
| 	"k8s.io/kops/upup/pkg/fi" | 	"k8s.io/kops/upup/pkg/fi" | ||||||
| 	"k8s.io/kops/upup/pkg/fi/cloudup/awsup" | 	"k8s.io/kops/upup/pkg/fi/cloudup/awsup" | ||||||
|  | @ -124,31 +123,6 @@ func (_ *IAMInstanceProfile) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *IAM | ||||||
| 
 | 
 | ||||||
| 		e.ID = response.InstanceProfile.InstanceProfileId | 		e.ID = response.InstanceProfile.InstanceProfileId | ||||||
| 		e.Name = response.InstanceProfile.InstanceProfileName | 		e.Name = response.InstanceProfile.InstanceProfileName | ||||||
| 
 |  | ||||||
| 		// IAM instance profile seems to be highly asynchronous
 |  | ||||||
| 		// and if we don't wait creating dependent resources fail
 |  | ||||||
| 		attempt := 0 |  | ||||||
| 		for { |  | ||||||
| 			if attempt > 10 { |  | ||||||
| 				klog.Warningf("unable to retrieve newly-created IAM instance profile %q; timed out", *e.Name) |  | ||||||
| 				break |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			ip, err := findIAMInstanceProfile(t.Cloud, *e.Name) |  | ||||||
| 			if err != nil { |  | ||||||
| 				klog.Warningf("ignoring error while retrieving newly-created IAM instance profile %q: %v", *e.Name, err) |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			if ip != nil { |  | ||||||
| 				// Found
 |  | ||||||
| 				klog.V(4).Infof("Found IAM instance profile %q", *e.Name) |  | ||||||
| 				break |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			// TODO: Use a real backoff algorithm
 |  | ||||||
| 			time.Sleep(3 * time.Second) |  | ||||||
| 			attempt++ |  | ||||||
| 		} |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// TODO: Should we use path as our tag?
 | 	// TODO: Should we use path as our tag?
 | ||||||
|  |  | ||||||
|  | @ -22,10 +22,8 @@ import ( | ||||||
| 	"math" | 	"math" | ||||||
| 	"sort" | 	"sort" | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"time" |  | ||||||
| 
 | 
 | ||||||
| 	"k8s.io/kops/pkg/apis/kops" | 	"k8s.io/kops/pkg/apis/kops" | ||||||
| 
 |  | ||||||
| 	"k8s.io/kops/pkg/featureflag" | 	"k8s.io/kops/pkg/featureflag" | ||||||
| 	"k8s.io/kops/upup/pkg/fi" | 	"k8s.io/kops/upup/pkg/fi" | ||||||
| 	"k8s.io/kops/upup/pkg/fi/cloudup/awsup" | 	"k8s.io/kops/upup/pkg/fi/cloudup/awsup" | ||||||
|  | @ -354,32 +352,14 @@ func (_ *LaunchConfiguration) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *La | ||||||
| 		request.InstanceMonitoring = &autoscaling.InstanceMonitoring{Enabled: fi.Bool(false)} | 		request.InstanceMonitoring = &autoscaling.InstanceMonitoring{Enabled: fi.Bool(false)} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	attempt := 0 | 	if _, err = t.Cloud.Autoscaling().CreateLaunchConfiguration(request); err != nil { | ||||||
| 	maxAttempts := 10 | 		code := awsup.AWSErrorCode(err) | ||||||
| 	for { |  | ||||||
| 		attempt++ |  | ||||||
| 
 |  | ||||||
| 		klog.V(8).Infof("AWS CreateLaunchConfiguration %s", aws.StringValue(request.LaunchConfigurationName)) |  | ||||||
| 		_, err = t.Cloud.Autoscaling().CreateLaunchConfiguration(request) |  | ||||||
| 		if err == nil { |  | ||||||
| 			break |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		if awsup.AWSErrorCode(err) == "ValidationError" { |  | ||||||
| 		message := awsup.AWSErrorMessage(err) | 		message := awsup.AWSErrorMessage(err) | ||||||
| 			if strings.Contains(message, "not authorized") || strings.Contains(message, "Invalid IamInstance") { | 		if code == "ValidationError" && strings.Contains(message, "Invalid IamInstanceProfile") { | ||||||
| 				if attempt > maxAttempts { | 			klog.V(4).Infof("error creating LaunchConfiguration: %s", message) | ||||||
| 					return fmt.Errorf("IAM instance profile not yet created/propagated (original error: %v)", message) | 			return fi.NewTryAgainLaterError("waiting for the IAM Instance Profile to be propagated") | ||||||
| 		} | 		} | ||||||
| 				klog.V(4).Infof("got an error indicating that the IAM instance profile %q is not ready: %q", fi.StringValue(e.IAMInstanceProfile.Name), message) | 		return fmt.Errorf("error creating LaunchConfiguration: %s", message) | ||||||
| 				klog.Infof("waiting for IAM instance profile %q to be ready", fi.StringValue(e.IAMInstanceProfile.Name)) |  | ||||||
| 				time.Sleep(10 * time.Second) |  | ||||||
| 				continue |  | ||||||
| 			} |  | ||||||
| 			klog.V(4).Infof("ErrorCode=%q, Message=%q", awsup.AWSErrorCode(err), awsup.AWSErrorMessage(err)) |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		return fmt.Errorf("error creating AutoscalingLaunchConfiguration: %v", err) |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	e.ID = fi.String(launchConfigurationName) | 	e.ID = fi.String(launchConfigurationName) | ||||||
|  |  | ||||||
|  | @ -21,7 +21,6 @@ import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"sort" | 	"sort" | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"time" |  | ||||||
| 
 | 
 | ||||||
| 	"k8s.io/kops/upup/pkg/fi" | 	"k8s.io/kops/upup/pkg/fi" | ||||||
| 	"k8s.io/kops/upup/pkg/fi/cloudup/awsup" | 	"k8s.io/kops/upup/pkg/fi/cloudup/awsup" | ||||||
|  | @ -142,31 +141,8 @@ func (t *LaunchTemplate) RenderAWS(c *awsup.AWSAPITarget, a, ep, changes *Launch | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	// @step: attempt to create the launch template
 | 	// @step: attempt to create the launch template
 | ||||||
| 	err = func() error { | 	if _, err = c.Cloud.EC2().CreateLaunchTemplate(input); err != nil { | ||||||
| 		for attempt := 0; attempt < 10; attempt++ { | 		return fmt.Errorf("error creating LaunchTemplate: %v", err) | ||||||
| 			if _, err = c.Cloud.EC2().CreateLaunchTemplate(input); err == nil { |  | ||||||
| 				return nil |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			if awsup.AWSErrorCode(err) == "ValidationError" { |  | ||||||
| 				message := awsup.AWSErrorMessage(err) |  | ||||||
| 				if strings.Contains(message, "not authorized") || strings.Contains(message, "Invalid IamInstance") { |  | ||||||
| 					if attempt > 10 { |  | ||||||
| 						return fmt.Errorf("IAM instance profile not yet created/propagated (original error: %v)", message) |  | ||||||
| 					} |  | ||||||
| 					klog.V(4).Infof("got an error indicating that the IAM instance profile %q is not ready: %q", fi.StringValue(ep.IAMInstanceProfile.Name), message) |  | ||||||
| 
 |  | ||||||
| 					time.Sleep(5 * time.Second) |  | ||||||
| 					continue |  | ||||||
| 				} |  | ||||||
| 				klog.V(4).Infof("ErrorCode=%q, Message=%q", awsup.AWSErrorCode(err), awsup.AWSErrorMessage(err)) |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		return err |  | ||||||
| 	}() |  | ||||||
| 	if err != nil { |  | ||||||
| 		return fmt.Errorf("failed to create aws launch template: %s", err) |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	ep.ID = fi.String(name) | 	ep.ID = fi.String(name) | ||||||
|  |  | ||||||
|  | @ -272,29 +272,6 @@ func (e *NatGateway) Run(c *fi.Context) error { | ||||||
| 	return fi.DefaultDeltaRunMethod(e, c) | 	return fi.DefaultDeltaRunMethod(e, c) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (e *NatGateway) waitAvailable(cloud awsup.AWSCloud) error { |  | ||||||
| 	// It takes 'forever' (up to 5 min...) for a NatGateway to become available after it has been created
 |  | ||||||
| 	// We have to wait until it is actually up
 |  | ||||||
| 
 |  | ||||||
| 	// TODO: Cache availability status
 |  | ||||||
| 
 |  | ||||||
| 	id := aws.StringValue(e.ID) |  | ||||||
| 	if id == "" { |  | ||||||
| 		return fmt.Errorf("NAT Gateway %q did not have ID", aws.StringValue(e.Name)) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	klog.Infof("Waiting for NAT Gateway %q to be available (this often takes about 5 minutes)", id) |  | ||||||
| 	params := &ec2.DescribeNatGatewaysInput{ |  | ||||||
| 		NatGatewayIds: []*string{e.ID}, |  | ||||||
| 	} |  | ||||||
| 	err := cloud.EC2().WaitUntilNatGatewayAvailable(params) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return fmt.Errorf("error waiting for NAT Gateway %q to be available: %v", id, err) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (_ *NatGateway) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *NatGateway) error { | func (_ *NatGateway) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *NatGateway) error { | ||||||
| 	// New NGW
 | 	// New NGW
 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -160,10 +160,6 @@ func (_ *Route) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Route) error { | ||||||
| 		} else if e.InternetGateway != nil { | 		} else if e.InternetGateway != nil { | ||||||
| 			request.GatewayId = checkNotNil(e.InternetGateway.ID) | 			request.GatewayId = checkNotNil(e.InternetGateway.ID) | ||||||
| 		} else if e.NatGateway != nil { | 		} else if e.NatGateway != nil { | ||||||
| 			if err := e.NatGateway.waitAvailable(t.Cloud); err != nil { |  | ||||||
| 				return err |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			request.NatGatewayId = checkNotNil(e.NatGateway.ID) | 			request.NatGatewayId = checkNotNil(e.NatGateway.ID) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | @ -175,7 +171,13 @@ func (_ *Route) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Route) error { | ||||||
| 
 | 
 | ||||||
| 		response, err := t.Cloud.EC2().CreateRoute(request) | 		response, err := t.Cloud.EC2().CreateRoute(request) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("error creating Route: %v", err) | 			code := awsup.AWSErrorCode(err) | ||||||
|  | 			message := awsup.AWSErrorMessage(err) | ||||||
|  | 			if code == "InvalidNatGatewayID.NotFound" { | ||||||
|  | 				klog.V(4).Infof("error creating Route: %s", message) | ||||||
|  | 				return fi.NewTryAgainLaterError("waiting for the NAT Gateway to be created") | ||||||
|  | 			} | ||||||
|  | 			return fmt.Errorf("error creating Route: %s", message) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if !aws.BoolValue(response.Return) { | 		if !aws.BoolValue(response.Return) { | ||||||
|  | @ -191,10 +193,6 @@ func (_ *Route) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Route) error { | ||||||
| 		} else if e.InternetGateway != nil { | 		} else if e.InternetGateway != nil { | ||||||
| 			request.GatewayId = checkNotNil(e.InternetGateway.ID) | 			request.GatewayId = checkNotNil(e.InternetGateway.ID) | ||||||
| 		} else if e.NatGateway != nil { | 		} else if e.NatGateway != nil { | ||||||
| 			if err := e.NatGateway.waitAvailable(t.Cloud); err != nil { |  | ||||||
| 				return err |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			request.NatGatewayId = checkNotNil(e.NatGateway.ID) | 			request.NatGatewayId = checkNotNil(e.NatGateway.ID) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | @ -204,9 +202,14 @@ func (_ *Route) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Route) error { | ||||||
| 
 | 
 | ||||||
| 		klog.V(2).Infof("Updating Route with RouteTable:%q CIDR:%q", *e.RouteTable.ID, *e.CIDR) | 		klog.V(2).Infof("Updating Route with RouteTable:%q CIDR:%q", *e.RouteTable.ID, *e.CIDR) | ||||||
| 
 | 
 | ||||||
| 		_, err := t.Cloud.EC2().ReplaceRoute(request) | 		if _, err := t.Cloud.EC2().ReplaceRoute(request); err != nil { | ||||||
| 		if err != nil { | 			code := awsup.AWSErrorCode(err) | ||||||
| 			return fmt.Errorf("error updating Route: %v", err) | 			message := awsup.AWSErrorMessage(err) | ||||||
|  | 			if code == "InvalidNatGatewayID.NotFound" { | ||||||
|  | 				klog.V(4).Infof("error creating Route: %s", message) | ||||||
|  | 				return fi.NewTryAgainLaterError("waiting for the NAT Gateway to be created") | ||||||
|  | 			} | ||||||
|  | 			return fmt.Errorf("error creating Route: %s", message) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -255,3 +255,18 @@ func NewExistsAndWarnIfChangesError(message string) *ExistsAndWarnIfChangesError | ||||||
| 
 | 
 | ||||||
| // ExistsAndWarnIfChangesError implementation of the error interface.
 | // ExistsAndWarnIfChangesError implementation of the error interface.
 | ||||||
| func (e *ExistsAndWarnIfChangesError) Error() string { return e.msg } | func (e *ExistsAndWarnIfChangesError) Error() string { return e.msg } | ||||||
|  | 
 | ||||||
|  | // TryAgainLaterError is the custom used when a task needs to fail validation with a message and try again later
 | ||||||
|  | type TryAgainLaterError struct { | ||||||
|  | 	msg string | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // NewTryAgainLaterError is a builder for TryAgainLaterError.
 | ||||||
|  | func NewTryAgainLaterError(message string) *TryAgainLaterError { | ||||||
|  | 	return &TryAgainLaterError{ | ||||||
|  | 		msg: message, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // TryAgainLaterError implementation of the error interface.
 | ||||||
|  | func (e *TryAgainLaterError) Error() string { return e.msg } | ||||||
|  |  | ||||||
|  | @ -125,7 +125,11 @@ func (e *executor) RunTasks(taskMap map[string]Task) error { | ||||||
| 				} | 				} | ||||||
| 
 | 
 | ||||||
| 				remaining := time.Second * time.Duration(int(time.Until(ts.deadline).Seconds())) | 				remaining := time.Second * time.Duration(int(time.Until(ts.deadline).Seconds())) | ||||||
|  | 				if _, ok := err.(*TryAgainLaterError); ok { | ||||||
|  | 					klog.Infof("Task %q not ready: %v", ts.key, err) | ||||||
|  | 				} else { | ||||||
| 					klog.Warningf("error running task %q (%v remaining to succeed): %v", ts.key, remaining, err) | 					klog.Warningf("error running task %q (%v remaining to succeed): %v", ts.key, remaining, err) | ||||||
|  | 				} | ||||||
| 				errors = append(errors, err) | 				errors = append(errors, err) | ||||||
| 				ts.lastError = err | 				ts.lastError = err | ||||||
| 			} else { | 			} else { | ||||||
|  | @ -140,7 +144,7 @@ func (e *executor) RunTasks(taskMap map[string]Task) error { | ||||||
| 				// Logic error!
 | 				// Logic error!
 | ||||||
| 				panic("did not make progress executing tasks; but no errors reported") | 				panic("did not make progress executing tasks; but no errors reported") | ||||||
| 			} | 			} | ||||||
| 			klog.Infof("No progress made, sleeping before retrying %d failed task(s)", len(errors)) | 			klog.Infof("No progress made, sleeping before retrying %d task(s)", len(errors)) | ||||||
| 			time.Sleep(e.options.WaitAfterAllTasksFailed) | 			time.Sleep(e.options.WaitAfterAllTasksFailed) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue