// boulder/ratelimits/limiter.go

package ratelimits

import (
"context"
"errors"
"fmt"
"math"
"math/rand/v2"
"slices"
"strings"
"time"

"github.com/jmhodges/clock"
"github.com/prometheus/client_golang/prometheus"

berrors "github.com/letsencrypt/boulder/errors"
)

const (
// Allowed is used for rate limit metrics; it's the value of the 'decision'
// label when a request was allowed.
Allowed = "allowed"

// Denied is used for rate limit metrics; it's the value of the 'decision'
// label when a request was denied.
Denied = "denied"
)

// allowedDecision is an "allowed" *Decision that should be returned when a
// checked limit is found to be disabled.
var allowedDecision = &Decision{allowed: true, remaining: math.MaxInt64}

// Limiter provides a high-level interface for rate limiting requests by
// utilizing a token bucket-style approach.
type Limiter struct {
// source is used to store buckets. It must be safe for concurrent use.
source Source
clk clock.Clock
spendLatency *prometheus.HistogramVec
}

// NewLimiter returns a new *Limiter. The provided source must be safe for
// concurrent use.
func NewLimiter(clk clock.Clock, source Source, stats prometheus.Registerer) (*Limiter, error) {
spendLatency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "ratelimits_spend_latency",
Help: fmt.Sprintf("Latency of ratelimit checks labeled by limit=[name] and decision=[%s|%s], in seconds", Allowed, Denied),
// Eight exponential buckets (factor of 3) ranging from 0.0005s to ~1.09s.
Buckets: prometheus.ExponentialBuckets(0.0005, 3, 8),
}, []string{"limit", "decision"})
stats.MustRegister(spendLatency)
return &Limiter{
source: source,
clk: clk,
spendLatency: spendLatency,
}, nil
}
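
// newLimiterSketch is a hedged, illustrative sketch rather than part of the
// production API: it shows how a caller might wire up a Limiter. The Source
// implementation (typically Redis-backed) is assumed to be constructed
// elsewhere.
func newLimiterSketch(src Source) (*Limiter, error) {
	// clock.New() is the real wall clock; tests would pass a fake clock.
	return NewLimiter(clock.New(), src, prometheus.NewRegistry())
}
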
// Decision represents the result of a rate limit check or spend operation. To
// check the result of a *Decision, call the Result() method.
type Decision struct {
// allowed is true if the bucket possessed enough capacity to allow the
// request given the cost.
allowed bool
// remaining is the number of requests the client is allowed to make before
// they're rate limited.
remaining int64
// retryIn is the duration the client MUST wait before they're allowed to
// make a request.
retryIn time.Duration
// resetIn is the duration the bucket will take to refill to its maximum
// capacity, assuming no further requests are made.
resetIn time.Duration
// newTAT indicates the time at which the bucket will be full. It is the
// theoretical arrival time (TAT) of the next request. It must be no more than
// (burst * (period / count)) in the future at any single point in time.
newTAT time.Time
// transaction is the Transaction that resulted in this Decision. It is
// included for the production of verbose Subscriber-facing errors. It is
// set by the Limiter before returning the Decision.
transaction Transaction
}

// Result translates a denied *Decision into a berrors.RateLimitError for the
// Subscriber, or returns nil if the *Decision allows the request. The error
// message includes a human-readable description of the exceeded rate limit and
// a retry-after timestamp.
func (d *Decision) Result(now time.Time) error {
if d.allowed {
return nil
}
// Add 0-3% jitter to the RetryIn duration to prevent thundering herd.
jitter := time.Duration(float64(d.retryIn) * 0.03 * rand.Float64())
retryAfter := d.retryIn + jitter
retryAfterTs := now.UTC().Add(retryAfter).Format("2006-01-02 15:04:05 MST")
// There is no case for FailedAuthorizationsForPausingPerDomainPerAccount
// because the RA will pause clients who exceed that rate limit.
switch d.transaction.limit.name {
case NewRegistrationsPerIPAddress:
return berrors.RegistrationsPerIPAddressError(
retryAfter,
"too many new registrations (%d) from this IP address in the last %s, retry after %s",
d.transaction.limit.burst,
d.transaction.limit.period.Duration,
retryAfterTs,
)
case NewRegistrationsPerIPv6Range:
return berrors.RegistrationsPerIPv6RangeError(
retryAfter,
"too many new registrations (%d) from this /48 subnet of IPv6 addresses in the last %s, retry after %s",
d.transaction.limit.burst,
d.transaction.limit.period.Duration,
retryAfterTs,
)
case NewOrdersPerAccount:
return berrors.NewOrdersPerAccountError(
retryAfter,
"too many new orders (%d) from this account in the last %s, retry after %s",
d.transaction.limit.burst,
d.transaction.limit.period.Duration,
retryAfterTs,
)
case FailedAuthorizationsPerDomainPerAccount:
// Uses bucket key 'enum:regId:domain'.
idx := strings.LastIndex(d.transaction.bucketKey, ":")
if idx == -1 {
return berrors.InternalServerError("unrecognized bucket key while generating error")
}
domain := d.transaction.bucketKey[idx+1:]
return berrors.FailedAuthorizationsPerDomainPerAccountError(
retryAfter,
"too many failed authorizations (%d) for %q in the last %s, retry after %s",
d.transaction.limit.burst,
domain,
d.transaction.limit.period.Duration,
retryAfterTs,
)
case CertificatesPerDomain, CertificatesPerDomainPerAccount:
// Uses bucket key 'enum:domain' or 'enum:regId:domain' respectively.
idx := strings.LastIndex(d.transaction.bucketKey, ":")
if idx == -1 {
return berrors.InternalServerError("unrecognized bucket key while generating error")
}
domain := d.transaction.bucketKey[idx+1:]
return berrors.CertificatesPerDomainError(
retryAfter,
"too many certificates (%d) already issued for %q in the last %s, retry after %s",
d.transaction.limit.burst,
domain,
d.transaction.limit.period.Duration,
retryAfterTs,
)
case CertificatesPerFQDNSet:
return berrors.CertificatesPerFQDNSetError(
retryAfter,
"too many certificates (%d) already issued for this exact set of domains in the last %s, retry after %s",
d.transaction.limit.burst,
d.transaction.limit.period.Duration,
retryAfterTs,
)
default:
return berrors.InternalServerError("cannot generate error for unknown rate limit")
}
}
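
// spendAndEnforceSketch is a hedged, illustrative sketch rather than part of
// the production API: it shows the typical enforcement path, where a
// request's cost is deducted with Spend and a denial is surfaced to the
// Subscriber via Result. The Transaction is assumed to be constructed
// elsewhere in this package.
func spendAndEnforceSketch(ctx context.Context, l *Limiter, txn Transaction) error {
	d, err := l.Spend(ctx, txn)
	if err != nil {
		// An infrastructure failure (e.g. an unreachable datastore), not a
		// rate limit denial.
		return err
	}
	// Result returns nil when the request was allowed, or a
	// berrors.RateLimitError with a retry-after timestamp when it was denied.
	return d.Result(l.clk.Now())
}
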
// Check DOES NOT deduct the cost of the request from the provided bucket's
// capacity. The returned *Decision indicates whether the capacity exists to
// satisfy the cost and represents the hypothetical state of the bucket IF the
// cost WERE to be deducted. If no bucket exists it will NOT be created. No
// state is persisted to the underlying datastore.
func (l *Limiter) Check(ctx context.Context, txn Transaction) (*Decision, error) {
if txn.allowOnly() {
return allowedDecision, nil
}
// Remove cancellation from the request context so that transactions are not
// interrupted by a client disconnect.
ctx = context.WithoutCancel(ctx)
tat, err := l.source.Get(ctx, txn.bucketKey)
if err != nil {
if !errors.Is(err, ErrBucketNotFound) {
return nil, err
}
// First request from this client. No need to initialize the bucket
// because this is a check, not a spend. A TAT of "now" is equivalent to
// a full bucket.
return maybeSpend(l.clk, txn, l.clk.Now()), nil
}
return maybeSpend(l.clk, txn, tat), nil
}
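
// checkSketch is a hedged, illustrative sketch: it shows Check being used as
// a read-only probe that reports whether a request would currently be
// allowed, without consuming capacity or creating a bucket.
func checkSketch(ctx context.Context, l *Limiter, txn Transaction) (bool, error) {
	d, err := l.Check(ctx, txn)
	if err != nil {
		return false, err
	}
	// The Decision describes the hypothetical state of the bucket if the cost
	// were spent; nothing has been persisted.
	return d.allowed, nil
}
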
// Spend attempts to deduct the cost from the provided bucket's capacity. The
// returned *Decision indicates whether the capacity existed to satisfy the cost
// and represents the current state of the bucket. If no bucket exists it WILL
// be created WITH the cost factored into its initial state. The new bucket
// state is persisted to the underlying datastore, if applicable, before
// returning.
func (l *Limiter) Spend(ctx context.Context, txn Transaction) (*Decision, error) {
return l.BatchSpend(ctx, []Transaction{txn})
}

// prepareBatch filters out allow-only Transactions and returns the remaining
// Transactions alongside their bucket keys. It returns an error if the batch
// contains duplicate bucket keys.
func prepareBatch(txns []Transaction) ([]Transaction, []string, error) {
var bucketKeys []string
var transactions []Transaction
for _, txn := range txns {
if txn.allowOnly() {
// Ignore allow-only transactions.
continue
}
if slices.Contains(bucketKeys, txn.bucketKey) {
return nil, nil, fmt.Errorf("found duplicate bucket %q in batch", txn.bucketKey)
}
bucketKeys = append(bucketKeys, txn.bucketKey)
transactions = append(transactions, txn)
}
return transactions, bucketKeys, nil
}

// stricter returns the stricter of two Decisions: the one with the longer
// retryIn or, when the retryIn durations are equal, the one with less
// remaining capacity. For example, a denial requiring a 5 minute wait is
// stricter than one requiring 30 seconds.
func stricter(existing *Decision, incoming *Decision) *Decision {
if existing.retryIn == incoming.retryIn {
if existing.remaining < incoming.remaining {
return existing
}
return incoming
}
if existing.retryIn > incoming.retryIn {
return existing
}
return incoming
}

// BatchSpend attempts to deduct the costs from the provided buckets'
// capacities. If applicable, new bucket states are persisted to the underlying
// datastore before returning. Non-existent buckets will be initialized WITH the
// cost factored into the initial state. The returned *Decision represents the
// strictest of all *Decisions reached in the batch.
func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision, error) {
start := l.clk.Now()
batch, bucketKeys, err := prepareBatch(txns)
if err != nil {
return nil, err
}
if len(batch) == 0 {
// All Transactions were allow-only.
return allowedDecision, nil
}
// Remove cancellation from the request context so that transactions are not
// interrupted by a client disconnect.
ctx = context.WithoutCancel(ctx)
tats, err := l.source.BatchGet(ctx, bucketKeys)
if err != nil {
return nil, fmt.Errorf("batch get for %d keys: %w", len(bucketKeys), err)
}
batchDecision := allowedDecision
newBuckets := make(map[string]time.Time)
incrBuckets := make(map[string]increment)
staleBuckets := make(map[string]time.Time)
txnOutcomes := make(map[Transaction]string)
for _, txn := range batch {
storedTAT, bucketExists := tats[txn.bucketKey]
d := maybeSpend(l.clk, txn, storedTAT)
if d.allowed && (storedTAT != d.newTAT) && txn.spend {
if !bucketExists {
newBuckets[txn.bucketKey] = d.newTAT
} else if storedTAT.After(l.clk.Now()) {
incrBuckets[txn.bucketKey] = increment{
cost: time.Duration(txn.cost * txn.limit.emissionInterval),
ttl: time.Duration(txn.limit.burstOffset),
}
} else {
staleBuckets[txn.bucketKey] = d.newTAT
}
}
if !txn.spendOnly() {
// Spend-only Transactions are best-effort and do not contribute to
// the batchDecision.
batchDecision = stricter(batchDecision, d)
}
txnOutcomes[txn] = Denied
if d.allowed {
txnOutcomes[txn] = Allowed
}
}
if batchDecision.allowed {
if len(newBuckets) > 0 {
// Use BatchSetNotExisting to create new buckets so that we detect
// if concurrent requests have created this bucket at the same time,
// which would result in overwriting if we used a plain "SET"
// command. If that happens, fall back to incrementing.
alreadyExists, err := l.source.BatchSetNotExisting(ctx, newBuckets)
if err != nil {
return nil, fmt.Errorf("batch set for %d keys: %w", len(newBuckets), err)
}
// Find the original transaction in order to compute the increment
// and set the TTL.
for _, txn := range batch {
if alreadyExists[txn.bucketKey] {
incrBuckets[txn.bucketKey] = increment{
cost: time.Duration(txn.cost * txn.limit.emissionInterval),
ttl: time.Duration(txn.limit.burstOffset),
}
}
}
}
if len(incrBuckets) > 0 {
err = l.source.BatchIncrement(ctx, incrBuckets)
if err != nil {
return nil, fmt.Errorf("batch increment for %d keys: %w", len(incrBuckets), err)
}
}
if len(staleBuckets) > 0 {
// Incrementing a TAT in the past grants unintended burst capacity.
// So instead we overwrite it with a TAT of now + increment. This
// approach may cause a race condition where only the last spend is
// saved, but it's preferable to the alternative.
err = l.source.BatchSet(ctx, staleBuckets)
if err != nil {
return nil, fmt.Errorf("batch set for %d keys: %w", len(staleBuckets), err)
}
}
}
// Observe latency equally across all transactions in the batch.
totalLatency := l.clk.Since(start)
perTxnLatency := totalLatency / time.Duration(len(txnOutcomes))
for txn, outcome := range txnOutcomes {
l.spendLatency.WithLabelValues(txn.limit.name.String(), outcome).Observe(perTxnLatency.Seconds())
}
return batchDecision, nil
}
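
// batchSpendSketch is a hedged, illustrative sketch: it shows several
// Transactions (e.g. the per-account and per-domain limits checked for a
// single new order) being spent together, with the strictest Decision in the
// batch determining the outcome.
func batchSpendSketch(ctx context.Context, l *Limiter, txns []Transaction) error {
	d, err := l.BatchSpend(ctx, txns)
	if err != nil {
		return err
	}
	// The returned Decision is the strictest in the batch: if any
	// non-spend-only Transaction was denied, the whole request is denied.
	return d.Result(l.clk.Now())
}
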
// Refund attempts to refund all of the cost to the capacity of the specified
// bucket. The returned *Decision indicates whether the refund was successful
// and represents the current state of the bucket. The new bucket state is
// persisted to the underlying datastore, if applicable, before returning. If no
// bucket exists it will NOT be created. Spend-only Transactions are assumed to
// be refundable. Check-only Transactions are never refunded.
//
// Note: The amount refunded cannot cause the bucket to exceed its maximum
// capacity. Partial refunds are allowed and are considered successful. For
// instance, if a bucket has a maximum capacity of 10 and currently has 5
// requests remaining, a refund request of 7 will result in the bucket reaching
// its maximum capacity of 10, not 12.
func (l *Limiter) Refund(ctx context.Context, txn Transaction) (*Decision, error) {
return l.BatchRefund(ctx, []Transaction{txn})
}
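
// refundOnFailureSketch is a hedged, illustrative sketch: it shows the
// spend-then-refund pattern, where capacity consumed by Spend is returned if
// the protected operation later fails for an unrelated reason. The doWork
// argument is a hypothetical stand-in for that operation.
func refundOnFailureSketch(ctx context.Context, l *Limiter, txn Transaction, doWork func() error) error {
	d, err := l.Spend(ctx, txn)
	if err != nil {
		return err
	}
	if err := d.Result(l.clk.Now()); err != nil {
		return err
	}
	if err := doWork(); err != nil {
		// Best-effort refund: the bucket can never be refilled beyond its
		// maximum capacity, and a refund failure is secondary to doWork's.
		if _, refundErr := l.Refund(ctx, txn); refundErr != nil {
			return fmt.Errorf("%w (refund also failed: %v)", err, refundErr)
		}
		return err
	}
	return nil
}
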
// BatchRefund attempts to refund all or some of the costs to the provided
// buckets' capacities. Non-existent buckets will NOT be initialized. The new
// bucket state is persisted to the underlying datastore, if applicable, before
// returning. Spend-only Transactions are assumed to be refundable. Check-only
// Transactions are never refunded. The returned *Decision represents the
// strictest of all *Decisions reached in the batch.
func (l *Limiter) BatchRefund(ctx context.Context, txns []Transaction) (*Decision, error) {
batch, bucketKeys, err := prepareBatch(txns)
if err != nil {
return nil, err
}
if len(batch) == 0 {
// All Transactions were allow-only.
return allowedDecision, nil
}
// Remove cancellation from the request context so that transactions are not
// interrupted by a client disconnect.
ctx = context.WithoutCancel(ctx)
tats, err := l.source.BatchGet(ctx, bucketKeys)
if err != nil {
return nil, fmt.Errorf("batch get for %d keys: %w", len(bucketKeys), err)
}
batchDecision := allowedDecision
incrBuckets := make(map[string]increment)
for _, txn := range batch {
tat, bucketExists := tats[txn.bucketKey]
if !bucketExists {
// Ignore non-existent bucket.
continue
}
if txn.checkOnly() {
// The cost of a check-only Transaction is never refunded.
txn.cost = 0
}
d := maybeRefund(l.clk, txn, tat)
batchDecision = stricter(batchDecision, d)
if d.allowed && tat != d.newTAT {
// New bucket state should be persisted.
incrBuckets[txn.bucketKey] = increment{
cost: time.Duration(-txn.cost * txn.limit.emissionInterval),
ttl: time.Duration(txn.limit.burstOffset),
}
}
}
if len(incrBuckets) > 0 {
err = l.source.BatchIncrement(ctx, incrBuckets)
if err != nil {
return nil, fmt.Errorf("batch increment for %d keys: %w", len(incrBuckets), err)
}
}
return batchDecision, nil
}
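
// batchRefundSketch is a hedged, illustrative sketch: it shows the batch
// counterpart of the spend-then-refund pattern, returning the capacity
// consumed by an earlier BatchSpend when the protected operation fails.
// Buckets that were never created are skipped and check-only Transactions
// are never refunded, so it is safe to pass the same slice of Transactions.
func batchRefundSketch(ctx context.Context, l *Limiter, txns []Transaction) error {
	if _, err := l.BatchRefund(ctx, txns); err != nil {
		return fmt.Errorf("refunding %d transactions: %w", len(txns), err)
	}
	return nil
}
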
// Reset resets the specified bucket to its maximum capacity. The new bucket
// state is persisted to the underlying datastore before returning.
func (l *Limiter) Reset(ctx context.Context, bucketKey string) error {
// Remove cancellation from the request context so that transactions are not
// interrupted by a client disconnect.
ctx = context.WithoutCancel(ctx)
return l.source.Delete(ctx, bucketKey)
}