309 lines
10 KiB
Go
309 lines
10 KiB
Go
package ratelimits
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"slices"
|
|
"time"
|
|
|
|
"github.com/jmhodges/clock"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
const (
|
|
// Allowed is used for rate limit metrics, it's the value of the 'decision'
|
|
// label when a request was allowed.
|
|
Allowed = "allowed"
|
|
|
|
// Denied is used for rate limit metrics, it's the value of the 'decision'
|
|
// label when a request was denied.
|
|
Denied = "denied"
|
|
)
|
|
|
|
// allowedDecision is an "allowed" *Decision that should be returned when a
|
|
// checked limit is found to be disabled.
|
|
var allowedDecision = &Decision{Allowed: true, Remaining: math.MaxInt64}
|
|
|
|
// Limiter provides a high-level interface for rate limiting requests by
|
|
// utilizing a leaky bucket-style approach.
|
|
type Limiter struct {
|
|
// source is used to store buckets. It must be safe for concurrent use.
|
|
source source
|
|
clk clock.Clock
|
|
|
|
spendLatency *prometheus.HistogramVec
|
|
overrideUsageGauge *prometheus.GaugeVec
|
|
}
|
|
|
|
// NewLimiter returns a new *Limiter. The provided source must be safe for
|
|
// concurrent use.
|
|
func NewLimiter(clk clock.Clock, source source, stats prometheus.Registerer) (*Limiter, error) {
|
|
limiter := &Limiter{source: source, clk: clk}
|
|
limiter.spendLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
|
Name: "ratelimits_spend_latency",
|
|
Help: fmt.Sprintf("Latency of ratelimit checks labeled by limit=[name] and decision=[%s|%s], in seconds", Allowed, Denied),
|
|
// Exponential buckets ranging from 0.0005s to 3s.
|
|
Buckets: prometheus.ExponentialBuckets(0.0005, 3, 8),
|
|
}, []string{"limit", "decision"})
|
|
stats.MustRegister(limiter.spendLatency)
|
|
|
|
limiter.overrideUsageGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "ratelimits_override_usage",
|
|
Help: "Proportion of override limit used, by limit name and bucket key.",
|
|
}, []string{"limit", "bucket_key"})
|
|
stats.MustRegister(limiter.overrideUsageGauge)
|
|
|
|
return limiter, nil
|
|
}
|
|
|
|
type Decision struct {
|
|
// Allowed is true if the bucket possessed enough capacity to allow the
|
|
// request given the cost.
|
|
Allowed bool
|
|
|
|
// Remaining is the number of requests the client is allowed to make before
|
|
// they're rate limited.
|
|
Remaining int64
|
|
|
|
// RetryIn is the duration the client MUST wait before they're allowed to
|
|
// make a request.
|
|
RetryIn time.Duration
|
|
|
|
// ResetIn is the duration the bucket will take to refill to its maximum
|
|
// capacity, assuming no further requests are made.
|
|
ResetIn time.Duration
|
|
|
|
// newTAT indicates the time at which the bucket will be full. It is the
|
|
// theoretical arrival time (TAT) of next request. It must be no more than
|
|
// (burst * (period / count)) in the future at any single point in time.
|
|
newTAT time.Time
|
|
}
|
|
|
|
// Check DOES NOT deduct the cost of the request from the provided bucket's
|
|
// capacity. The returned *Decision indicates whether the capacity exists to
|
|
// satisfy the cost and represents the hypothetical state of the bucket IF the
|
|
// cost WERE to be deducted. If no bucket exists it will NOT be created. No
|
|
// state is persisted to the underlying datastore.
|
|
func (l *Limiter) Check(ctx context.Context, txn Transaction) (*Decision, error) {
|
|
if txn.allowOnly() {
|
|
return allowedDecision, nil
|
|
}
|
|
// Remove cancellation from the request context so that transactions are not
|
|
// interrupted by a client disconnect.
|
|
ctx = context.WithoutCancel(ctx)
|
|
tat, err := l.source.Get(ctx, txn.bucketKey)
|
|
if err != nil {
|
|
if !errors.Is(err, ErrBucketNotFound) {
|
|
return nil, err
|
|
}
|
|
// First request from this client. No need to initialize the bucket
|
|
// because this is a check, not a spend. A TAT of "now" is equivalent to
|
|
// a full bucket.
|
|
return maybeSpend(l.clk, txn.limit, l.clk.Now(), txn.cost), nil
|
|
}
|
|
return maybeSpend(l.clk, txn.limit, tat, txn.cost), nil
|
|
}
|
|
|
|
// Spend attempts to deduct the cost from the provided bucket's capacity. The
|
|
// returned *Decision indicates whether the capacity existed to satisfy the cost
|
|
// and represents the current state of the bucket. If no bucket exists it WILL
|
|
// be created WITH the cost factored into its initial state. The new bucket
|
|
// state is persisted to the underlying datastore, if applicable, before
|
|
// returning.
|
|
func (l *Limiter) Spend(ctx context.Context, txn Transaction) (*Decision, error) {
|
|
return l.BatchSpend(ctx, []Transaction{txn})
|
|
}
|
|
|
|
func prepareBatch(txns []Transaction) ([]Transaction, []string, error) {
|
|
var bucketKeys []string
|
|
var transactions []Transaction
|
|
for _, txn := range txns {
|
|
if txn.allowOnly() {
|
|
// Ignore allow-only transactions.
|
|
continue
|
|
}
|
|
if slices.Contains(bucketKeys, txn.bucketKey) {
|
|
return nil, nil, fmt.Errorf("found duplicate bucket %q in batch", txn.bucketKey)
|
|
}
|
|
bucketKeys = append(bucketKeys, txn.bucketKey)
|
|
transactions = append(transactions, txn)
|
|
}
|
|
return transactions, bucketKeys, nil
|
|
}
|
|
|
|
type batchDecision struct {
|
|
*Decision
|
|
}
|
|
|
|
func newBatchDecision() *batchDecision {
|
|
return &batchDecision{
|
|
Decision: &Decision{
|
|
Allowed: true,
|
|
Remaining: math.MaxInt64,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (d *batchDecision) merge(in *Decision) {
|
|
d.Allowed = d.Allowed && in.Allowed
|
|
d.Remaining = min(d.Remaining, in.Remaining)
|
|
d.RetryIn = max(d.RetryIn, in.RetryIn)
|
|
d.ResetIn = max(d.ResetIn, in.ResetIn)
|
|
if in.newTAT.After(d.newTAT) {
|
|
d.newTAT = in.newTAT
|
|
}
|
|
}
|
|
|
|
// BatchSpend attempts to deduct the costs from the provided buckets'
|
|
// capacities. If applicable, new bucket states are persisted to the underlying
|
|
// datastore before returning. Non-existent buckets will be initialized WITH the
|
|
// cost factored into the initial state. The following rules are applied to
|
|
// merge the Decisions for each Transaction into a single batch Decision:
|
|
// - Allowed is true if all Transactions where check is true were allowed,
|
|
// - RetryIn and ResetIn are the largest values of each across all Decisions,
|
|
// - Remaining is the smallest value of each across all Decisions, and
|
|
// - Decisions resulting from spend-only Transactions are never merged.
|
|
func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision, error) {
|
|
batch, bucketKeys, err := prepareBatch(txns)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(batch) == 0 {
|
|
// All Transactions were allow-only.
|
|
return allowedDecision, nil
|
|
}
|
|
|
|
// Remove cancellation from the request context so that transactions are not
|
|
// interrupted by a client disconnect.
|
|
ctx = context.WithoutCancel(ctx)
|
|
tats, err := l.source.BatchGet(ctx, bucketKeys)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
start := l.clk.Now()
|
|
batchDecision := newBatchDecision()
|
|
newTATs := make(map[string]time.Time)
|
|
|
|
for _, txn := range batch {
|
|
tat, exists := tats[txn.bucketKey]
|
|
if !exists {
|
|
// First request from this client.
|
|
tat = l.clk.Now()
|
|
}
|
|
|
|
d := maybeSpend(l.clk, txn.limit, tat, txn.cost)
|
|
|
|
if txn.limit.isOverride {
|
|
utilization := float64(txn.limit.Burst-d.Remaining) / float64(txn.limit.Burst)
|
|
l.overrideUsageGauge.WithLabelValues(txn.limit.name.String(), txn.bucketKey).Set(utilization)
|
|
}
|
|
|
|
if d.Allowed && (tat != d.newTAT) && txn.spend {
|
|
// New bucket state should be persisted.
|
|
newTATs[txn.bucketKey] = d.newTAT
|
|
}
|
|
|
|
if !txn.spendOnly() {
|
|
batchDecision.merge(d)
|
|
}
|
|
}
|
|
|
|
if batchDecision.Allowed {
|
|
err = l.source.BatchSet(ctx, newTATs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
l.spendLatency.WithLabelValues("batch", Allowed).Observe(l.clk.Since(start).Seconds())
|
|
} else {
|
|
l.spendLatency.WithLabelValues("batch", Denied).Observe(l.clk.Since(start).Seconds())
|
|
}
|
|
return batchDecision.Decision, nil
|
|
}
|
|
|
|
// Refund attempts to refund all of the cost to the capacity of the specified
|
|
// bucket. The returned *Decision indicates whether the refund was successful
|
|
// and represents the current state of the bucket. The new bucket state is
|
|
// persisted to the underlying datastore, if applicable, before returning. If no
|
|
// bucket exists it will NOT be created. Spend-only Transactions are assumed to
|
|
// be refundable. Check-only Transactions are never refunded.
|
|
//
|
|
// Note: The amount refunded cannot cause the bucket to exceed its maximum
|
|
// capacity. Partial refunds are allowed and are considered successful. For
|
|
// instance, if a bucket has a maximum capacity of 10 and currently has 5
|
|
// requests remaining, a refund request of 7 will result in the bucket reaching
|
|
// its maximum capacity of 10, not 12.
|
|
func (l *Limiter) Refund(ctx context.Context, txn Transaction) (*Decision, error) {
|
|
return l.BatchRefund(ctx, []Transaction{txn})
|
|
}
|
|
|
|
// BatchRefund attempts to refund all or some of the costs to the provided
|
|
// buckets' capacities. Non-existent buckets will NOT be initialized. The new
|
|
// bucket state is persisted to the underlying datastore, if applicable, before
|
|
// returning. Spend-only Transactions are assumed to be refundable. Check-only
|
|
// Transactions are never refunded. The following rules are applied to merge the
|
|
// Decisions for each Transaction into a single batch Decision:
|
|
// - Allowed is true if all Transactions where check is true were allowed,
|
|
// - RetryIn and ResetIn are the largest values of each across all Decisions,
|
|
// - Remaining is the smallest value of each across all Decisions, and
|
|
// - Decisions resulting from spend-only Transactions are never merged.
|
|
func (l *Limiter) BatchRefund(ctx context.Context, txns []Transaction) (*Decision, error) {
|
|
batch, bucketKeys, err := prepareBatch(txns)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(batch) == 0 {
|
|
// All Transactions were allow-only.
|
|
return allowedDecision, nil
|
|
}
|
|
|
|
// Remove cancellation from the request context so that transactions are not
|
|
// interrupted by a client disconnect.
|
|
ctx = context.WithoutCancel(ctx)
|
|
tats, err := l.source.BatchGet(ctx, bucketKeys)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
batchDecision := newBatchDecision()
|
|
newTATs := make(map[string]time.Time)
|
|
|
|
for _, txn := range batch {
|
|
tat, exists := tats[txn.bucketKey]
|
|
if !exists {
|
|
// Ignore non-existent bucket.
|
|
continue
|
|
}
|
|
|
|
var cost int64
|
|
if !txn.checkOnly() {
|
|
cost = txn.cost
|
|
}
|
|
d := maybeRefund(l.clk, txn.limit, tat, cost)
|
|
batchDecision.merge(d)
|
|
if d.Allowed && tat != d.newTAT {
|
|
// New bucket state should be persisted.
|
|
newTATs[txn.bucketKey] = d.newTAT
|
|
}
|
|
}
|
|
|
|
if len(newTATs) > 0 {
|
|
err = l.source.BatchSet(ctx, newTATs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return batchDecision.Decision, nil
|
|
}
|
|
|
|
// Reset resets the specified bucket to its maximum capacity. The new bucket
|
|
// state is persisted to the underlying datastore before returning.
|
|
func (l *Limiter) Reset(ctx context.Context, bucketKey string) error {
|
|
// Remove cancellation from the request context so that transactions are not
|
|
// interrupted by a client disconnect.
|
|
ctx = context.WithoutCancel(ctx)
|
|
return l.source.Delete(ctx, bucketKey)
|
|
}
|