Purger: compute throughput values from number of instances (#7502)
Give akamai-purger a new "Throughput.TotalInstances" config value, informing it how many instances of itself are competing for the Akamai rate limit quota. Combine the `useOptimizedDefaults` and `validate` functions into a single `optimizeAndValidate` function, which sets default values according to the number of active instances and confirms that the results still fall within the rate limits.

Fixes https://github.com/letsencrypt/boulder/issues/7487
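For a sense of the arithmetic the new defaults rely on, here is a small standalone sketch. This is not code from the commit: the limit constants are simplified stand-ins for the `akamaiAPIReqPerSecondLimit`/`akamaiURLsPerSecondLimit` values referenced in the diff below, and it reproduces only the requests-per-second calculation from `optimizeAndValidate` for a few instance counts.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Illustrative constants only; the real definitions live in akamai-purger.
// Akamai Fast-Purge allows 50 requests/s and 200 URLs/s; one purge batch
// defaults to 2 queue entries, each entry expanding to 3 URLs.
const (
	apiReqPerSecondLimit      = 50
	urlsPerSecondLimit        = 200
	urlsPerEntry              = 3
	defaultEntriesPerBatch    = 2
	defaultPurgeBatchInterval = 32 * time.Millisecond
)

func main() {
	for _, instances := range []int{1, 2, 4} {
		// Each instance waits defaultPurgeBatchInterval * instances between
		// batches, so the fleet's combined request rate stays roughly constant.
		interval := defaultPurgeBatchInterval * time.Duration(instances)
		reqPerSec := int(math.Ceil(float64(time.Second)/float64(interval))) * instances
		urlsPerSec := reqPerSec * defaultEntriesPerBatch * urlsPerEntry
		fmt.Printf("%d instance(s): interval=%s, %d req/s (limit %d), %d URLs/s (limit %d)\n",
			instances, interval, reqPerSec, apiReqPerSecondLimit, urlsPerSec, urlsPerSecondLimit)
	}
}
```

For 1, 2, or 4 instances this prints 32 requests/s and 192 URLs/s fleet-wide, comfortably under both limits, which is why the new code can default each purger's interval to a simple multiple of 32ms.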
parent 5be3650e56 · commit 0d8efb9b38
@@ -68,34 +68,44 @@ type Throughput struct {
 	// purge request. One cached OCSP response is composed of 3 URLs totaling <
 	// 400 bytes. If this value isn't provided it will default to
 	// 'defaultQueueEntriesPerBatch'.
-	QueueEntriesPerBatch int
+	//
+	// Deprecated: Only set TotalInstances and let it compute the defaults.
+	QueueEntriesPerBatch int `validate:"min=0"`
 
 	// PurgeBatchInterval is the duration waited between dispatching an Akamai
 	// purge request containing 'QueueEntriesPerBatch' * 3 URLs. If this value
 	// isn't provided it will default to 'defaultPurgeBatchInterval'.
+	//
+	// Deprecated: Only set TotalInstances and let it compute the defaults.
 	PurgeBatchInterval config.Duration `validate:"-"`
+
+	// TotalInstances is the number of akamai-purger instances running at the same
+	// time, across all data centers.
+	TotalInstances int `validate:"min=0"`
 }
 
-func (t *Throughput) useOptimizedDefaults() {
-	if t.QueueEntriesPerBatch == 0 {
+// optimizeAndValidate updates a Throughput struct in-place, replacing any unset
+// fields with sane defaults and ensuring that the resulting configuration will
+// not cause us to exceed Akamai's rate limits.
+func (t *Throughput) optimizeAndValidate() error {
+	// Ideally, this is the only variable actually configured, and we derive
+	// everything else from here. But if it isn't set, assume only 1 is running.
+	if t.TotalInstances < 0 {
+		return errors.New("'totalInstances' must be positive or 0 (for the default)")
+	} else if t.TotalInstances == 0 {
+		t.TotalInstances = 1
+	}
+
+	// For the sake of finding a valid throughput solution, we hold the number of
+	// queue entries sent per purge batch constant. We set 2 entries (6 urls) as
+	// the default, and historically we have never had a reason to configure a
+	// different amount. This default ensures we stay well below the maximum
+	// request size of 50,000 bytes per request.
+	if t.QueueEntriesPerBatch < 0 {
+		return errors.New("'queueEntriesPerBatch' must be positive or 0 (for the default)")
+	} else if t.QueueEntriesPerBatch == 0 {
 		t.QueueEntriesPerBatch = defaultEntriesPerBatch
 	}
-	if t.PurgeBatchInterval.Duration == 0 {
-		t.PurgeBatchInterval.Duration = defaultPurgeBatchInterval
-	}
-}
-
-// validate ensures that the provided throughput configuration will not violate
-// the Akamai Fast-Purge API limits. For more information see the official
-// documentation:
-// https://techdocs.akamai.com/purge-cache/reference/rate-limiting
-func (t *Throughput) validate() error {
-	if t.PurgeBatchInterval.Duration == 0 {
-		return errors.New("'purgeBatchInterval' must be > 0")
-	}
-	if t.QueueEntriesPerBatch <= 0 {
-		return errors.New("'queueEntriesPerBatch' must be > 0")
-	}
 
 	// Send no more than the 50,000 bytes of objects we’re allotted per request.
 	bytesPerRequest := (t.QueueEntriesPerBatch * akamaiBytesPerResponse)
@@ -104,8 +114,21 @@ func (t *Throughput) validate() error {
 			akamaiBytesPerReqLimit, bytesPerRequest-akamaiBytesPerReqLimit)
 	}
 
+	// Now the purge interval must be set such that we exceed neither the 50 API
+	// requests per second limit nor the 200 URLs per second limit across all
+	// concurrent purger instances. We calculated that a value of one request
+	// every 32ms satisfies both constraints with a bit of breathing room (as long
+	// as the number of entries per batch is also at its default). By default we
+	// set this purger's interval to a multiple of 32ms, depending on how many
+	// other purger instances are running.
+	if t.PurgeBatchInterval.Duration < 0 {
+		return errors.New("'purgeBatchInterval' must be positive or 0 (for the default)")
+	} else if t.PurgeBatchInterval.Duration == 0 {
+		t.PurgeBatchInterval.Duration = defaultPurgeBatchInterval * time.Duration(t.TotalInstances)
+	}
+
 	// Send no more than the 50 API requests we’re allotted each second.
-	requestsPerSecond := int(math.Ceil(float64(time.Second) / float64(t.PurgeBatchInterval.Duration)))
+	requestsPerSecond := int(math.Ceil(float64(time.Second)/float64(t.PurgeBatchInterval.Duration))) * t.TotalInstances
 	if requestsPerSecond > akamaiAPIReqPerSecondLimit {
 		return fmt.Errorf("config exceeds Akamai's requests per second limit (%d requests) by %d",
 			akamaiAPIReqPerSecondLimit, requestsPerSecond-akamaiAPIReqPerSecondLimit)
@@ -117,6 +140,7 @@ func (t *Throughput) validate() error {
 		return fmt.Errorf("config exceeds Akamai's URLs per second limit (%d URLs) by %d",
 			akamaiURLsPerSecondLimit, urlsPurgedPerSecond-akamaiURLsPerSecondLimit)
 	}
+
 	return nil
 }
 
@@ -304,11 +328,9 @@ func main() {
 	defer oTelShutdown(context.Background())
 	logger.Info(cmd.VersionString())
 
-	// Unless otherwise specified, use optimized throughput settings.
-	if (apc.Throughput == Throughput{}) {
-		apc.Throughput.useOptimizedDefaults()
-	}
-	cmd.FailOnError(apc.Throughput.validate(), "")
+	// Use optimized throughput settings for any that are left unspecified.
+	err = apc.Throughput.optimizeAndValidate()
+	cmd.FailOnError(err, "Failed to find valid throughput solution")
 
 	if apc.MaxQueueSize == 0 {
 		apc.MaxQueueSize = defaultQueueSize
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	akamaipb "github.com/letsencrypt/boulder/akamai/proto"
+	"github.com/letsencrypt/boulder/config"
 	blog "github.com/letsencrypt/boulder/log"
 	"github.com/letsencrypt/boulder/test"
 )
@@ -16,55 +17,97 @@ func TestImplementation(t *testing.T) {
 	test.AssertImplementsGRPCServer(t, &akamaiPurger{}, akamaipb.UnimplementedAkamaiPurgerServer{})
 }
 
-func TestThroughput_validate(t *testing.T) {
-	type fields struct {
-		QueueEntriesPerBatch int
-		PurgeBatchInterval   time.Duration
-	}
+func TestThroughput_optimizeAndValidate(t *testing.T) {
+	dur := func(in time.Duration) config.Duration { return config.Duration{Duration: in} }
+
 	tests := []struct {
 		name    string
-		fields  fields
-		wantErr bool
+		input   Throughput
+		want    Throughput
+		wantErr string
 	}{
-		{"optimized defaults, should succeed",
-			fields{
-				QueueEntriesPerBatch: defaultEntriesPerBatch,
-				PurgeBatchInterval:   defaultPurgeBatchInterval},
-			false,
-		},
-		{"2ms faster than optimized defaults, should succeed",
-			fields{
-				QueueEntriesPerBatch: defaultEntriesPerBatch,
-				PurgeBatchInterval:   defaultPurgeBatchInterval + 2*time.Millisecond},
-			false,
-		},
-		{"exceeds URLs per second by 4 URLs",
-			fields{
-				QueueEntriesPerBatch: defaultEntriesPerBatch,
-				PurgeBatchInterval:   29 * time.Millisecond},
-			true,
-		},
-		{"exceeds bytes per second by 20 bytes",
-			fields{
-				QueueEntriesPerBatch: 125,
-				PurgeBatchInterval:   1 * time.Second},
-			true,
-		},
-		{"exceeds requests per second by 1 request",
-			fields{
-				QueueEntriesPerBatch: 1,
-				PurgeBatchInterval:   19999 * time.Microsecond},
-			true,
-		},
+		{
+			"negative instances",
+			Throughput{defaultEntriesPerBatch, dur(defaultPurgeBatchInterval), -1},
+			Throughput{},
+			"must be positive",
+		},
+		{
+			"negative batch interval",
+			Throughput{defaultEntriesPerBatch, config.Duration{Duration: -1}, -1},
+			Throughput{},
+			"must be positive",
+		},
+		{
+			"negative entries per batch",
+			Throughput{-1, dur(defaultPurgeBatchInterval), 1},
+			Throughput{},
+			"must be positive",
+		},
+		{
+			"empty input computes sane defaults",
+			Throughput{},
+			Throughput{defaultEntriesPerBatch, dur(defaultPurgeBatchInterval), 1},
+			"",
+		},
+		{
+			"strict configuration is honored",
+			Throughput{2, dur(1 * time.Second), 1},
+			Throughput{2, dur(1 * time.Second), 1},
+			"",
+		},
+		{
+			"slightly looser configuration still within limits",
+			Throughput{defaultEntriesPerBatch, dur(defaultPurgeBatchInterval - time.Millisecond), 1},
+			Throughput{defaultEntriesPerBatch, dur(defaultPurgeBatchInterval - time.Millisecond), 1},
+			"",
+		},
+		{
+			"too many requests per second",
+			Throughput{QueueEntriesPerBatch: 1, PurgeBatchInterval: dur(19999 * time.Microsecond)},
+			Throughput{},
+			"requests per second limit",
+		},
+		{
+			"too many URLs per second",
+			Throughput{PurgeBatchInterval: dur(29 * time.Millisecond)},
+			Throughput{},
+			"URLs per second limit",
+		},
+		{
+			"too many bytes per request",
+			Throughput{QueueEntriesPerBatch: 125, PurgeBatchInterval: dur(1 * time.Second)},
+			Throughput{},
+			"bytes per request limit",
+		},
+		{
+			"two instances computes sane defaults",
+			Throughput{TotalInstances: 2},
+			Throughput{defaultEntriesPerBatch, dur(defaultPurgeBatchInterval * 2), 2},
+			"",
+		},
+		{
+			"too many requests per second across multiple instances",
+			Throughput{PurgeBatchInterval: dur(defaultPurgeBatchInterval), TotalInstances: 2},
+			Throughput{},
+			"requests per second limit",
+		},
+		{
+			"too many entries per second across multiple instances",
+			Throughput{PurgeBatchInterval: dur(59 * time.Millisecond), TotalInstances: 2},
+			Throughput{},
+			"URLs per second limit",
+		},
 	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			tr := &Throughput{
-				QueueEntriesPerBatch: tt.fields.QueueEntriesPerBatch,
-			}
-			tr.PurgeBatchInterval.Duration = tt.fields.PurgeBatchInterval
-			if err := tr.validate(); (err != nil) != tt.wantErr {
-				t.Errorf("Throughput.validate() error = %v, wantErr %v", err, tt.wantErr)
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			err := tc.input.optimizeAndValidate()
+			if tc.wantErr != "" {
+				test.AssertError(t, err, "")
+				test.AssertContains(t, err.Error(), tc.wantErr)
+			} else {
+				test.AssertNotError(t, err, "")
+				test.AssertEquals(t, tc.input, tc.want)
 			}
 		})
 	}
 }
@@ -3,8 +3,7 @@
 	"purgeRetries": 10,
 	"purgeRetryBackoff": "50ms",
 	"throughput": {
-		"queueEntriesPerBatch": 2,
-		"purgeBatchInterval": "32ms"
+		"totalInstances": 1
 	},
 	"baseURL": "http://localhost:6789",
 	"clientToken": "its-a-token",
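With this change, a deployment's throughput stanza can state just the instance count and let the purger derive the rest. A hypothetical fragment for a two-instance deployment (values illustrative, not from this commit):

```json
"throughput": {
    "totalInstances": 2
}
```

Per the `optimizeAndValidate` logic and the "two instances computes sane defaults" test case above, each instance would then default to 2 entries per batch and a 64ms purge interval.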