akamai-purger: Queue and response handling improvements (#5955)

- Make maximum queue size configurable via a new configuration key:
  'MaxQueueSize'.
- Default 'MaxQueueSize' to the previous value (1M) when 'MaxQueueSize'
  isn't specified.
- akamaiPurger.purge() will re-queue only the URLs starting at the first entry
  of the batch where a failure was encountered, instead of re-queueing the
  entire set that was originally passed.
  - Add a test to ensure that these changes are working as intended.
- Make the purge batching easier to understand with some minor changes
  to variable names
- Responses whose HTTP status code is not 201 will no longer be unmarshaled
- Logs will explicitly call out if a response indicates that we've exceeded any
  rate limits imposed by Akamai.

Fixes #5917
This commit is contained in:
Samantha 2022-03-07 12:21:16 -08:00 committed by GitHub
parent b19b79162f
commit 3b665f8dbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 111 additions and 48 deletions

View File

@ -31,6 +31,17 @@ const (
v3PurgeTagPath = "/ccu/v3/delete/tag/"
)
var (
// ErrAllRetriesFailed indicates that all purge submission attempts have
// failed.
ErrAllRetriesFailed = errors.New("all attempts to submit purge request failed")
// errFatal is returned by the purge method of CachePurgeClient to indicate
// that it failed for a reason that cannot be remediated by retrying the
// request.
errFatal = errors.New("fatal error")
)
type v3PurgeRequest struct {
Objects []string `json:"objects"`
}
@ -61,13 +72,6 @@ type CachePurgeClient struct {
clk clock.Clock
}
// ErrAllRetriesFailed indicates that all purge submission attempts have failed.
var ErrAllRetriesFailed = errors.New("all attempts to submit purge request failed")
// errFatal is returned by the purge method of CachePurgeClient to indicate that
// it failed for a reason that cannot be remediated by retrying the request.
var errFatal = errors.New("fatal error")
// NewCachePurgeClient performs some basic validation of supplied configuration
// and returns a newly constructed CachePurgeClient.
func NewCachePurgeClient(
@ -228,18 +232,44 @@ func (cpc *CachePurgeClient) authedRequest(endpoint string, body v3PurgeRequest)
return err
}
// Ensure that the purge request was successful.
// Success for a request to purge a URL or Cache tag is 'HTTP 201'.
// https://techdocs.akamai.com/purge-cache/reference/delete-url
// https://techdocs.akamai.com/purge-cache/reference/delete-tag
if resp.StatusCode != http.StatusCreated {
switch resp.StatusCode {
// https://techdocs.akamai.com/purge-cache/reference/403
case http.StatusForbidden:
return fmt.Errorf("client not authorized to make requests for URL %q: %w", resp.Request.URL, errFatal)
// https://techdocs.akamai.com/purge-cache/reference/504
case http.StatusGatewayTimeout:
return fmt.Errorf("server timed out, got HTTP %d (body %q) for URL %q", resp.StatusCode, respBody, resp.Request.URL)
// https://techdocs.akamai.com/purge-cache/reference/429
case http.StatusTooManyRequests:
return fmt.Errorf("exceeded request count rate limit, got HTTP %d (body %q) for URL %q", resp.StatusCode, respBody, resp.Request.URL)
// https://techdocs.akamai.com/purge-cache/reference/413
case http.StatusRequestEntityTooLarge:
return fmt.Errorf("exceeded request size rate limit, got HTTP %d (body %q) for URL %q", resp.StatusCode, respBody, resp.Request.URL)
default:
return fmt.Errorf("received HTTP %d (body %q) for URL %q", resp.StatusCode, respBody, resp.Request.URL)
}
}
var purgeInfo purgeResponse
err = json.Unmarshal(respBody, &purgeInfo)
if err != nil {
return fmt.Errorf("while unmarshalling body %q from URL %q as JSON: %w", respBody, resp.Request.URL, err)
}
if purgeInfo.HTTPStatus != http.StatusCreated || resp.StatusCode != http.StatusCreated {
// Ensure the unmarshaled body concurs with the status of the response
// received.
if purgeInfo.HTTPStatus != http.StatusCreated {
if purgeInfo.HTTPStatus == http.StatusForbidden {
return fmt.Errorf("client not authorized to make requests to URL %q: %w", resp.Request.URL, errFatal)
}
return fmt.Errorf("received HTTP %d (body %q) from URL %q", resp.StatusCode, respBody, resp.Request.URL)
return fmt.Errorf("unmarshaled HTTP %d (body %q) from URL %q", purgeInfo.HTTPStatus, respBody, resp.Request.URL)
}
cpc.log.AuditInfof("Purge request sent successfully (ID %s) (body %s). Purge expected in %ds",
@ -275,21 +305,26 @@ func (cpc *CachePurgeClient) purgeBatch(urls []string) error {
return nil
}
// Purge attempts to send a purge request to the Akamai CCU API cpc.retries
// number of times before giving up and returning ErrAllRetriesFailed.
func (cpc *CachePurgeClient) Purge(urls []string) error {
for i := 0; i < len(urls); {
sliceEnd := i + akamaiBatchSize
if sliceEnd > len(urls) {
sliceEnd = len(urls)
// Purge dispatches the provided urls in batched requests to the Akamai CCU API.
// Requests will be attempted cpc.retries number of times before giving up and
// returning ErrAllRetriesFailed and the beginning index position of the batch
// where the failure was encountered.
func (cpc *CachePurgeClient) Purge(urls []string) (int, error) {
totalURLs := len(urls)
for batchBegin := 0; batchBegin < totalURLs; {
batchEnd := batchBegin + akamaiBatchSize
if batchEnd > totalURLs {
// Avoid index out of range error.
batchEnd = totalURLs
}
err := cpc.purgeBatch(urls[i:sliceEnd])
err := cpc.purgeBatch(urls[batchBegin:batchEnd])
if err != nil {
return err
return batchBegin, err
}
i += akamaiBatchSize
batchBegin += akamaiBatchSize
}
return nil
return totalURLs, nil
}
// CheckSignature is exported for use in tests and akamai-test-srv.

View File

@ -138,18 +138,18 @@ func TestV3Purge(t *testing.T) {
fc := clock.NewFake()
client.clk = fc
err = client.Purge([]string{"http://test.com"})
_, err = client.Purge([]string{"http://test.com"})
test.AssertNotError(t, err, "Purge failed; expected 201 response")
started := client.clk.Now()
as.responseCode = http.StatusInternalServerError
err = client.Purge([]string{"http://test.com"})
_, err = client.Purge([]string{"http://test.com"})
test.AssertError(t, err, "Purge succeeded; expected 500 response")
test.Assert(t, client.clk.Since(started) > (time.Second*4), "Retries should've taken at least 4.4 seconds")
started = client.clk.Now()
as.responseCode = http.StatusCreated
err = client.Purge([]string{"http:/test.com"})
_, err = client.Purge([]string{"http:/test.com"})
test.AssertError(t, err, "Purge succeeded; expected a 403 response from malformed URL")
test.Assert(t, client.clk.Since(started) < time.Second, "Purge should've failed out immediately")
}
@ -249,8 +249,21 @@ func TestBigBatchPurge(t *testing.T) {
urls = append(urls, fmt.Sprintf("http://test.com/%d", i))
}
err = client.Purge(urls)
stoppedAt, err := client.Purge(urls)
test.AssertNotError(t, err, "Purge failed with 201 response")
test.AssertEquals(t, stoppedAt, 250)
// Add a malformed URL.
urls = append(urls, "http:/test.com")
// Add 10 more valid entries.
for i := 0; i < 10; i++ {
urls = append(urls, fmt.Sprintf("http://test.com/%d", i))
}
stoppedAt, err = client.Purge(urls)
test.AssertError(t, err, "Purge succeeded with a malformed URL")
test.AssertEquals(t, stoppedAt, 200)
}
func TestReverseBytes(t *testing.T) {

View File

@ -24,6 +24,9 @@ import (
blog "github.com/letsencrypt/boulder/log"
)
// defaultQueueSize is the default akamai-purger queue size.
const defaultQueueSize = 1000000
type Config struct {
AkamaiPurger struct {
cmd.ServiceConfig
@ -31,6 +34,10 @@ type Config struct {
// PurgeInterval is the duration waited between purge requests.
PurgeInterval cmd.ConfigDuration
// MaxQueueSize is the maximum size of the purger queue. If this value
// isn't provided it will default to `defaultQueueSize`.
MaxQueueSize int
BaseURL string
ClientToken string
ClientSecret string
@ -43,50 +50,53 @@ type Config struct {
Beeline cmd.BeelineConfig
}
// akamaiPurger is a mutex protected container for a gRPC server which receives
// requests to purge the URLs of OCSP responses cached by Akamai, stores these
// URLs in an inner slice, and dispatches them to Akamai's Fast Purge API in
// batches.
type akamaiPurger struct {
sync.Mutex
akamaipb.UnimplementedAkamaiPurgerServer
mu sync.Mutex
toPurge []string
client *akamai.CachePurgeClient
log blog.Logger
toPurge []string
maxQueueSize int
client *akamai.CachePurgeClient
log blog.Logger
}
func (ap *akamaiPurger) len() int {
ap.mu.Lock()
defer ap.mu.Unlock()
ap.Lock()
defer ap.Unlock()
return len(ap.toPurge)
}
func (ap *akamaiPurger) purge() error {
ap.mu.Lock()
ap.Lock()
urls := ap.toPurge[:]
ap.toPurge = []string{}
ap.mu.Unlock()
ap.Unlock()
if len(urls) == 0 {
return nil
}
err := ap.client.Purge(urls)
stoppedAt, err := ap.client.Purge(urls)
if err != nil {
// Add the URLs back to the queue.
ap.mu.Lock()
ap.toPurge = append(urls, ap.toPurge...)
ap.mu.Unlock()
ap.Lock()
// Add the remaining URLs back, but at the end of the queue. If somehow
// there's a URL which repeatedly results in error, it won't block the
// entire queue, only a single batch.
ap.toPurge = append(ap.toPurge, urls[stoppedAt:]...)
ap.Unlock()
ap.log.Errf("Failed to purge %d URLs: %s", len(urls), err)
return err
}
return nil
}
// maxQueueSize is used to reject Purge requests if the queue contains >= the
// number of URLs to purge so that it can catch up.
var maxQueueSize = 1000000
func (ap *akamaiPurger) Purge(ctx context.Context, req *akamaipb.PurgeRequest) (*emptypb.Empty, error) {
ap.mu.Lock()
defer ap.mu.Unlock()
if len(ap.toPurge) >= maxQueueSize {
ap.Lock()
defer ap.Unlock()
if len(ap.toPurge) >= ap.maxQueueSize {
return nil, errors.New("akamai-purger queue too large")
}
ap.toPurge = append(ap.toPurge, req.Urls...)
@ -160,6 +170,10 @@ func main() {
cmd.Fail("'PurgeInterval' must be > 0")
}
if c.AkamaiPurger.MaxQueueSize == 0 {
c.AkamaiPurger.MaxQueueSize = defaultQueueSize
}
ccu, err := akamai.NewCachePurgeClient(
c.AkamaiPurger.BaseURL,
c.AkamaiPurger.ClientToken,
@ -174,8 +188,9 @@ func main() {
cmd.FailOnError(err, "Failed to setup Akamai CCU client")
ap := &akamaiPurger{
client: ccu,
log: logger,
maxQueueSize: c.AkamaiPurger.MaxQueueSize,
client: ccu,
log: logger,
}
var gaugePurgeQueueLength = prometheus.NewGaugeFunc(