mirror of https://github.com/kubernetes/kops.git
adding kubernetes core regional global rate limiter that spans all calls
This commit is contained in:
parent
f3454f95d7
commit
6dc953c3d4
|
|
@ -39,6 +39,7 @@ go_library(
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//vendor/k8s.io/kubernetes/federation/pkg/dnsprovider:go_default_library",
|
"//vendor/k8s.io/kubernetes/federation/pkg/dnsprovider:go_default_library",
|
||||||
"//vendor/k8s.io/kubernetes/federation/pkg/dnsprovider/providers/aws/route53:go_default_library",
|
"//vendor/k8s.io/kubernetes/federation/pkg/dnsprovider/providers/aws/route53:go_default_library",
|
||||||
|
"//vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/aws:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,10 +19,12 @@ package awsup
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/request"
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
"github.com/aws/aws-sdk-go/service/autoscaling"
|
"github.com/aws/aws-sdk-go/service/autoscaling"
|
||||||
"github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
|
"github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
|
||||||
|
|
@ -43,6 +45,7 @@ import (
|
||||||
"k8s.io/kops/upup/pkg/fi"
|
"k8s.io/kops/upup/pkg/fi"
|
||||||
"k8s.io/kubernetes/federation/pkg/dnsprovider"
|
"k8s.io/kubernetes/federation/pkg/dnsprovider"
|
||||||
dnsproviderroute53 "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/aws/route53"
|
dnsproviderroute53 "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/aws/route53"
|
||||||
|
k8s_aws "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
|
||||||
)
|
)
|
||||||
|
|
||||||
// By default, aws-sdk-go only retries 3 times, which doesn't give
|
// By default, aws-sdk-go only retries 3 times, which doesn't give
|
||||||
|
|
@ -144,6 +147,13 @@ type awsCloudImplementation struct {
|
||||||
region string
|
region string
|
||||||
|
|
||||||
tags map[string]string
|
tags map[string]string
|
||||||
|
|
||||||
|
regionDelayers *RegionDelayers
|
||||||
|
}
|
||||||
|
|
||||||
|
type RegionDelayers struct {
|
||||||
|
mutex sync.Mutex
|
||||||
|
delayerMap map[string]*k8s_aws.CrossRequestRetryDelay
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ fi.Cloud = &awsCloudImplementation{}
|
var _ fi.Cloud = &awsCloudImplementation{}
|
||||||
|
|
@ -161,16 +171,19 @@ var awsCloudInstances map[string]AWSCloud = make(map[string]AWSCloud)
|
||||||
func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
|
func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
|
||||||
raw := awsCloudInstances[region]
|
raw := awsCloudInstances[region]
|
||||||
if raw == nil {
|
if raw == nil {
|
||||||
c := &awsCloudImplementation{region: region}
|
c := &awsCloudImplementation{
|
||||||
|
region: region,
|
||||||
|
regionDelayers: &RegionDelayers{
|
||||||
|
delayerMap: make(map[string]*k8s_aws.CrossRequestRetryDelay),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
config := aws.NewConfig().WithRegion(region)
|
config := aws.NewConfig().WithRegion(region)
|
||||||
|
|
||||||
// Add some logging of retries
|
|
||||||
config.Retryer = newLoggingRetryer(ClientMaxRetries)
|
|
||||||
|
|
||||||
// This avoids a confusing error message when we fail to get credentials
|
// This avoids a confusing error message when we fail to get credentials
|
||||||
// e.g. https://github.com/kubernetes/kops/issues/605
|
// e.g. https://github.com/kubernetes/kops/issues/605
|
||||||
config = config.WithCredentialsChainVerboseErrors(true)
|
config = config.WithCredentialsChainVerboseErrors(true)
|
||||||
|
config = request.WithRetryer(config, newLoggingRetryer(ClientMaxRetries))
|
||||||
|
|
||||||
requestLogger := newRequestLogger(2)
|
requestLogger := newRequestLogger(2)
|
||||||
|
|
||||||
|
|
@ -180,6 +193,7 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
|
||||||
}
|
}
|
||||||
c.cf = cloudformation.New(sess, config)
|
c.cf = cloudformation.New(sess, config)
|
||||||
c.cf.Handlers.Send.PushFront(requestLogger)
|
c.cf.Handlers.Send.PushFront(requestLogger)
|
||||||
|
c.addHandlers(region, &c.cf.Handlers)
|
||||||
|
|
||||||
sess, err = session.NewSession(config)
|
sess, err = session.NewSession(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -187,6 +201,7 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
|
||||||
}
|
}
|
||||||
c.ec2 = ec2.New(sess, config)
|
c.ec2 = ec2.New(sess, config)
|
||||||
c.ec2.Handlers.Send.PushFront(requestLogger)
|
c.ec2.Handlers.Send.PushFront(requestLogger)
|
||||||
|
c.addHandlers(region, &c.ec2.Handlers)
|
||||||
|
|
||||||
sess, err = session.NewSession(config)
|
sess, err = session.NewSession(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -194,6 +209,7 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
|
||||||
}
|
}
|
||||||
c.iam = iam.New(sess, config)
|
c.iam = iam.New(sess, config)
|
||||||
c.iam.Handlers.Send.PushFront(requestLogger)
|
c.iam.Handlers.Send.PushFront(requestLogger)
|
||||||
|
c.addHandlers(region, &c.iam.Handlers)
|
||||||
|
|
||||||
sess, err = session.NewSession(config)
|
sess, err = session.NewSession(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -201,6 +217,7 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
|
||||||
}
|
}
|
||||||
c.elb = elb.New(sess, config)
|
c.elb = elb.New(sess, config)
|
||||||
c.elb.Handlers.Send.PushFront(requestLogger)
|
c.elb.Handlers.Send.PushFront(requestLogger)
|
||||||
|
c.addHandlers(region, &c.elb.Handlers)
|
||||||
|
|
||||||
sess, err = session.NewSession(config)
|
sess, err = session.NewSession(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -208,6 +225,7 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
|
||||||
}
|
}
|
||||||
c.autoscaling = autoscaling.New(sess, config)
|
c.autoscaling = autoscaling.New(sess, config)
|
||||||
c.autoscaling.Handlers.Send.PushFront(requestLogger)
|
c.autoscaling.Handlers.Send.PushFront(requestLogger)
|
||||||
|
c.addHandlers(region, &c.autoscaling.Handlers)
|
||||||
|
|
||||||
sess, err = session.NewSession(config)
|
sess, err = session.NewSession(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -215,6 +233,7 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
|
||||||
}
|
}
|
||||||
c.route53 = route53.New(sess, config)
|
c.route53 = route53.New(sess, config)
|
||||||
c.route53.Handlers.Send.PushFront(requestLogger)
|
c.route53.Handlers.Send.PushFront(requestLogger)
|
||||||
|
c.addHandlers(region, &c.route53.Handlers)
|
||||||
|
|
||||||
awsCloudInstances[region] = c
|
awsCloudInstances[region] = c
|
||||||
raw = c
|
raw = c
|
||||||
|
|
@ -225,6 +244,43 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
|
||||||
return i, nil
|
return i, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *awsCloudImplementation) addHandlers(regionName string, h *request.Handlers) {
|
||||||
|
|
||||||
|
delayer := c.getCrossRequestRetryDelay(regionName)
|
||||||
|
if delayer != nil {
|
||||||
|
h.Sign.PushFrontNamed(request.NamedHandler{
|
||||||
|
Name: "kops/delay-presign",
|
||||||
|
Fn: delayer.BeforeSign,
|
||||||
|
})
|
||||||
|
|
||||||
|
h.AfterRetry.PushFrontNamed(request.NamedHandler{
|
||||||
|
Name: "kops/delay-afterretry",
|
||||||
|
Fn: delayer.AfterRetry,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get a CrossRequestRetryDelay, scoped to the region, not to the request.
|
||||||
|
// This means that when we hit a limit on a call, we will delay _all_ calls to the API.
|
||||||
|
// We do this to protect the AWS account from becoming overloaded and effectively locked.
|
||||||
|
// We also log when we hit request limits.
|
||||||
|
// Note that this delays the current goroutine; this is bad behaviour and will
|
||||||
|
// likely cause kops to become slow or unresponsive for cloud operations.
|
||||||
|
// However, this throttle is intended only as a last resort. When we observe
|
||||||
|
// this throttling, we need to address the root cause (e.g. add a delay to a
|
||||||
|
// controller retry loop)
|
||||||
|
func (c *awsCloudImplementation) getCrossRequestRetryDelay(regionName string) *k8s_aws.CrossRequestRetryDelay {
|
||||||
|
c.regionDelayers.mutex.Lock()
|
||||||
|
defer c.regionDelayers.mutex.Unlock()
|
||||||
|
|
||||||
|
delayer, found := c.regionDelayers.delayerMap[regionName]
|
||||||
|
if !found {
|
||||||
|
delayer = k8s_aws.NewCrossRequestRetryDelay()
|
||||||
|
c.regionDelayers.delayerMap[regionName] = delayer
|
||||||
|
}
|
||||||
|
return delayer
|
||||||
|
}
|
||||||
|
|
||||||
func NewEC2Filter(name string, values ...string) *ec2.Filter {
|
func NewEC2Filter(name string, values ...string) *ec2.Filter {
|
||||||
awsValues := []*string{}
|
awsValues := []*string{}
|
||||||
for _, value := range values {
|
for _, value := range values {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue