Cleanup AWS EC2 eventual consistency warnings

This commit is contained in:
Ciprian Hacman 2020-07-28 06:16:34 +03:00
parent 85da6b1c85
commit 3a1120724e
8 changed files with 52 additions and 117 deletions

View File

@ -299,7 +299,13 @@ func (v *AutoscalingGroup) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Autos
// @step: attempt to create the autoscaling group for us
if _, err := t.Cloud.Autoscaling().CreateAutoScalingGroup(request); err != nil {
return fmt.Errorf("error creating AutoscalingGroup: %v", err)
code := awsup.AWSErrorCode(err)
message := awsup.AWSErrorMessage(err)
if code == "ValidationError" && strings.Contains(message, "Invalid IAM Instance Profile name") {
klog.V(4).Infof("error creating AutoscalingGroup: %s", message)
return fi.NewTryAgainLaterError("waiting for the IAM Instance Profile to be propagated")
}
return fmt.Errorf("error creating AutoScalingGroup: %s", message)
}
// @step: attempt to enable the metrics for us

View File

@ -18,7 +18,6 @@ package awstasks
import (
"fmt"
"time"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
@ -124,31 +123,6 @@ func (_ *IAMInstanceProfile) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *IAM
e.ID = response.InstanceProfile.InstanceProfileId
e.Name = response.InstanceProfile.InstanceProfileName
// IAM instance profile seems to be highly asynchronous
// and if we don't wait creating dependent resources fail
attempt := 0
for {
if attempt > 10 {
klog.Warningf("unable to retrieve newly-created IAM instance profile %q; timed out", *e.Name)
break
}
ip, err := findIAMInstanceProfile(t.Cloud, *e.Name)
if err != nil {
klog.Warningf("ignoring error while retrieving newly-created IAM instance profile %q: %v", *e.Name, err)
}
if ip != nil {
// Found
klog.V(4).Infof("Found IAM instance profile %q", *e.Name)
break
}
// TODO: Use a real backoff algorithm
time.Sleep(3 * time.Second)
attempt++
}
}
// TODO: Should we use path as our tag?

View File

@ -22,10 +22,8 @@ import (
"math"
"sort"
"strings"
"time"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/featureflag"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
@ -353,32 +351,14 @@ func (_ *LaunchConfiguration) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *La
request.InstanceMonitoring = &autoscaling.InstanceMonitoring{Enabled: fi.Bool(false)}
}
attempt := 0
maxAttempts := 10
for {
attempt++
klog.V(8).Infof("AWS CreateLaunchConfiguration %s", aws.StringValue(request.LaunchConfigurationName))
_, err = t.Cloud.Autoscaling().CreateLaunchConfiguration(request)
if err == nil {
break
if _, err = t.Cloud.Autoscaling().CreateLaunchConfiguration(request); err != nil {
code := awsup.AWSErrorCode(err)
message := awsup.AWSErrorMessage(err)
if code == "ValidationError" && strings.Contains(message, "Invalid IamInstanceProfile") {
klog.V(4).Infof("error creating LaunchConfiguration: %s", message)
return fi.NewTryAgainLaterError("waiting for the IAM Instance Profile to be propagated")
}
if awsup.AWSErrorCode(err) == "ValidationError" {
message := awsup.AWSErrorMessage(err)
if strings.Contains(message, "not authorized") || strings.Contains(message, "Invalid IamInstance") {
if attempt > maxAttempts {
return fmt.Errorf("IAM instance profile not yet created/propagated (original error: %v)", message)
}
klog.V(4).Infof("got an error indicating that the IAM instance profile %q is not ready: %q", fi.StringValue(e.IAMInstanceProfile.Name), message)
klog.Infof("waiting for IAM instance profile %q to be ready", fi.StringValue(e.IAMInstanceProfile.Name))
time.Sleep(10 * time.Second)
continue
}
klog.V(4).Infof("ErrorCode=%q, Message=%q", awsup.AWSErrorCode(err), awsup.AWSErrorMessage(err))
}
return fmt.Errorf("error creating AutoscalingLaunchConfiguration: %v", err)
return fmt.Errorf("error creating LaunchConfiguration: %s", message)
}
e.ID = fi.String(launchConfigurationName)

View File

@ -21,7 +21,6 @@ import (
"fmt"
"sort"
"strings"
"time"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
@ -142,31 +141,8 @@ func (t *LaunchTemplate) RenderAWS(c *awsup.AWSAPITarget, a, ep, changes *Launch
}
}
// @step: attempt to create the launch template
err = func() error {
for attempt := 0; attempt < 10; attempt++ {
if _, err = c.Cloud.EC2().CreateLaunchTemplate(input); err == nil {
return nil
}
if awsup.AWSErrorCode(err) == "ValidationError" {
message := awsup.AWSErrorMessage(err)
if strings.Contains(message, "not authorized") || strings.Contains(message, "Invalid IamInstance") {
if attempt > 10 {
return fmt.Errorf("IAM instance profile not yet created/propagated (original error: %v)", message)
}
klog.V(4).Infof("got an error indicating that the IAM instance profile %q is not ready: %q", fi.StringValue(ep.IAMInstanceProfile.Name), message)
time.Sleep(5 * time.Second)
continue
}
klog.V(4).Infof("ErrorCode=%q, Message=%q", awsup.AWSErrorCode(err), awsup.AWSErrorMessage(err))
}
}
return err
}()
if err != nil {
return fmt.Errorf("failed to create aws launch template: %s", err)
if _, err = c.Cloud.EC2().CreateLaunchTemplate(input); err != nil {
return fmt.Errorf("error creating LaunchTemplate: %v", err)
}
ep.ID = fi.String(name)

View File

@ -272,29 +272,6 @@ func (e *NatGateway) Run(c *fi.Context) error {
return fi.DefaultDeltaRunMethod(e, c)
}
// waitAvailable blocks until EC2's WaitUntilNatGatewayAvailable waiter
// returns for the NAT Gateway referenced by e.ID.
//
// A NAT Gateway can take 'forever' (up to 5 min...) to become available after
// it has been created, so callers have to wait until it is actually up.
//
// It returns an error when the ID resolves to an empty string, or when the
// waiter itself reports an error.
func (e *NatGateway) waitAvailable(cloud awsup.AWSCloud) error {
	// TODO: Cache availability status
	natGatewayID := aws.StringValue(e.ID)
	if natGatewayID == "" {
		return fmt.Errorf("NAT Gateway %q did not have ID", aws.StringValue(e.Name))
	}

	klog.Infof("Waiting for NAT Gateway %q to be available (this often takes about 5 minutes)", natGatewayID)

	// The waiter is scoped to this single gateway via its ID.
	request := &ec2.DescribeNatGatewaysInput{NatGatewayIds: []*string{e.ID}}
	if waitErr := cloud.EC2().WaitUntilNatGatewayAvailable(request); waitErr != nil {
		return fmt.Errorf("error waiting for NAT Gateway %q to be available: %v", natGatewayID, waitErr)
	}
	return nil
}
func (_ *NatGateway) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *NatGateway) error {
// New NGW

View File

@ -160,10 +160,6 @@ func (_ *Route) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Route) error {
} else if e.InternetGateway != nil {
request.GatewayId = checkNotNil(e.InternetGateway.ID)
} else if e.NatGateway != nil {
if err := e.NatGateway.waitAvailable(t.Cloud); err != nil {
return err
}
request.NatGatewayId = checkNotNil(e.NatGateway.ID)
}
@ -175,7 +171,13 @@ func (_ *Route) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Route) error {
response, err := t.Cloud.EC2().CreateRoute(request)
if err != nil {
return fmt.Errorf("error creating Route: %v", err)
code := awsup.AWSErrorCode(err)
message := awsup.AWSErrorMessage(err)
if code == "InvalidNatGatewayID.NotFound" {
klog.V(4).Infof("error creating Route: %s", message)
return fi.NewTryAgainLaterError("waiting for the NAT Gateway to be created")
}
return fmt.Errorf("error creating Route: %s", message)
}
if !aws.BoolValue(response.Return) {
@ -191,10 +193,6 @@ func (_ *Route) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Route) error {
} else if e.InternetGateway != nil {
request.GatewayId = checkNotNil(e.InternetGateway.ID)
} else if e.NatGateway != nil {
if err := e.NatGateway.waitAvailable(t.Cloud); err != nil {
return err
}
request.NatGatewayId = checkNotNil(e.NatGateway.ID)
}
@ -204,9 +202,14 @@ func (_ *Route) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Route) error {
klog.V(2).Infof("Updating Route with RouteTable:%q CIDR:%q", *e.RouteTable.ID, *e.CIDR)
_, err := t.Cloud.EC2().ReplaceRoute(request)
if err != nil {
return fmt.Errorf("error updating Route: %v", err)
if _, err := t.Cloud.EC2().ReplaceRoute(request); err != nil {
code := awsup.AWSErrorCode(err)
message := awsup.AWSErrorMessage(err)
if code == "InvalidNatGatewayID.NotFound" {
klog.V(4).Infof("error creating Route: %s", message)
return fi.NewTryAgainLaterError("waiting for the NAT Gateway to be created")
}
return fmt.Errorf("error creating Route: %s", message)
}
}

View File

@ -255,3 +255,18 @@ func NewExistsAndWarnIfChangesError(message string) *ExistsAndWarnIfChangesError
// Error returns the message held by the ExistsAndWarnIfChangesError,
// which makes the type an implementation of the error interface.
func (e *ExistsAndWarnIfChangesError) Error() string {
	return e.msg
}
// TryAgainLaterError is the custom error used when a task needs to fail
// validation with a message and be tried again later.
type TryAgainLaterError struct {
	msg string
}

// NewTryAgainLaterError builds a TryAgainLaterError carrying the given message.
func NewTryAgainLaterError(message string) *TryAgainLaterError {
	return &TryAgainLaterError{msg: message}
}

// Error returns the stored message, implementing the error interface.
func (e *TryAgainLaterError) Error() string {
	return e.msg
}

View File

@ -125,7 +125,11 @@ func (e *executor) RunTasks(taskMap map[string]Task) error {
}
remaining := time.Second * time.Duration(int(time.Until(ts.deadline).Seconds()))
klog.Warningf("error running task %q (%v remaining to succeed): %v", ts.key, remaining, err)
if _, ok := err.(*TryAgainLaterError); ok {
klog.Infof("Task %q not ready: %v", ts.key, err)
} else {
klog.Warningf("error running task %q (%v remaining to succeed): %v", ts.key, remaining, err)
}
errors = append(errors, err)
ts.lastError = err
} else {
@ -140,7 +144,7 @@ func (e *executor) RunTasks(taskMap map[string]Task) error {
// Logic error!
panic("did not make progress executing tasks; but no errors reported")
}
klog.Infof("No progress made, sleeping before retrying %d failed task(s)", len(errors))
klog.Infof("No progress made, sleeping before retrying %d task(s)", len(errors))
time.Sleep(e.options.WaitAfterAllTasksFailed)
}
}