diff --git a/upup/pkg/fi/cloudup/awsup/BUILD.bazel b/upup/pkg/fi/cloudup/awsup/BUILD.bazel index 8f5bcda80e..714d91f4b9 100644 --- a/upup/pkg/fi/cloudup/awsup/BUILD.bazel +++ b/upup/pkg/fi/cloudup/awsup/BUILD.bazel @@ -39,6 +39,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/kubernetes/federation/pkg/dnsprovider:go_default_library", "//vendor/k8s.io/kubernetes/federation/pkg/dnsprovider/providers/aws/route53:go_default_library", + "//vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/aws:go_default_library", ], ) diff --git a/upup/pkg/fi/cloudup/awsup/aws_cloud.go b/upup/pkg/fi/cloudup/awsup/aws_cloud.go index 1832b93bfd..11bace6607 100644 --- a/upup/pkg/fi/cloudup/awsup/aws_cloud.go +++ b/upup/pkg/fi/cloudup/awsup/aws_cloud.go @@ -19,10 +19,12 @@ package awsup import ( "fmt" "strings" + "sync" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface" @@ -43,6 +45,7 @@ import ( "k8s.io/kops/upup/pkg/fi" "k8s.io/kubernetes/federation/pkg/dnsprovider" dnsproviderroute53 "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/aws/route53" + k8s_aws "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" ) // By default, aws-sdk-go only retries 3 times, which doesn't give @@ -144,6 +147,13 @@ type awsCloudImplementation struct { region string tags map[string]string + + regionDelayers *RegionDelayers +} + +type RegionDelayers struct { + mutex sync.Mutex + delayerMap map[string]*k8s_aws.CrossRequestRetryDelay } var _ fi.Cloud = &awsCloudImplementation{} @@ -161,16 +171,19 @@ var awsCloudInstances map[string]AWSCloud = make(map[string]AWSCloud) func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) { raw := awsCloudInstances[region] if raw == nil { - c := &awsCloudImplementation{region: region} + c := &awsCloudImplementation{ + region: region, + regionDelayers: &RegionDelayers{ + delayerMap: make(map[string]*k8s_aws.CrossRequestRetryDelay), + }, + } config := aws.NewConfig().WithRegion(region) - // Add some logging of retries - config.Retryer = newLoggingRetryer(ClientMaxRetries) - // This avoids a confusing error message when we fail to get credentials // e.g. https://github.com/kubernetes/kops/issues/605 config = config.WithCredentialsChainVerboseErrors(true) + config = request.WithRetryer(config, newLoggingRetryer(ClientMaxRetries)) requestLogger := newRequestLogger(2) @@ -180,6 +193,7 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) { } c.cf = cloudformation.New(sess, config) c.cf.Handlers.Send.PushFront(requestLogger) + c.addHandlers(region, &c.cf.Handlers) sess, err = session.NewSession(config) if err != nil { @@ -187,6 +201,7 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) { } c.ec2 = ec2.New(sess, config) c.ec2.Handlers.Send.PushFront(requestLogger) + c.addHandlers(region, &c.ec2.Handlers) sess, err = session.NewSession(config) if err != nil { @@ -194,6 +209,7 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) { } c.iam = iam.New(sess, config) c.iam.Handlers.Send.PushFront(requestLogger) + c.addHandlers(region, &c.iam.Handlers) sess, err = session.NewSession(config) if err != nil { @@ -201,6 +217,7 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) { } c.elb = elb.New(sess, config) c.elb.Handlers.Send.PushFront(requestLogger) + c.addHandlers(region, &c.elb.Handlers) sess, err = session.NewSession(config) if err != nil { @@ -208,6 +225,7 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) { } c.autoscaling = autoscaling.New(sess, config) c.autoscaling.Handlers.Send.PushFront(requestLogger) + c.addHandlers(region, &c.autoscaling.Handlers) sess, err = session.NewSession(config) if err != nil { @@ -215,6 +233,7 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) { } c.route53 = route53.New(sess, config) c.route53.Handlers.Send.PushFront(requestLogger) + c.addHandlers(region, &c.route53.Handlers) awsCloudInstances[region] = c raw = c @@ -225,6 +244,43 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) { return i, nil } +func (c *awsCloudImplementation) addHandlers(regionName string, h *request.Handlers) { + + delayer := c.getCrossRequestRetryDelay(regionName) + if delayer != nil { + h.Sign.PushFrontNamed(request.NamedHandler{ + Name: "kops/delay-presign", + Fn: delayer.BeforeSign, + }) + + h.AfterRetry.PushFrontNamed(request.NamedHandler{ + Name: "kops/delay-afterretry", + Fn: delayer.AfterRetry, + }) + } +} + +// Get a CrossRequestRetryDelay, scoped to the region, not to the request. +// This means that when we hit a limit on a call, we will delay _all_ calls to the API. +// We do this to protect the AWS account from becoming overloaded and effectively locked. +// We also log when we hit request limits. +// Note that this delays the current goroutine; this is bad behaviour and will +// likely cause kops to become slow or unresponsive for cloud operations. +// However, this throttle is intended only as a last resort. When we observe +// this throttling, we need to address the root cause (e.g. add a delay to a +// controller retry loop) +func (c *awsCloudImplementation) getCrossRequestRetryDelay(regionName string) *k8s_aws.CrossRequestRetryDelay { + c.regionDelayers.mutex.Lock() + defer c.regionDelayers.mutex.Unlock() + + delayer, found := c.regionDelayers.delayerMap[regionName] + if !found { + delayer = k8s_aws.NewCrossRequestRetryDelay() + c.regionDelayers.delayerMap[regionName] = delayer + } + return delayer +} + func NewEC2Filter(name string, values ...string) *ec2.Filter { awsValues := []*string{} for _, value := range values {