WFE: Add new key-value ratelimits implementation (#7089)

Integrate the key-value rate limits from #6947 into the WFE. Rate limits
are backed by the Redis source added in #7016, and use the SRV record
shard discovery added in #7042.

Part of #5545
Samantha 2023-10-04 14:12:38 -04:00 committed by GitHub
parent b68b21c7a8
commit 9aef5839b5
24 changed files with 680 additions and 163 deletions
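
The pieces compose in cmd/boulder-wfe2/main.go roughly as follows. This is a condensed sketch of the wiring shown in the diff below, with the surrounding config plumbing elided:

limiterRedis, err := bredis.NewRingFromConfig(*c.WFE.Limiter.Redis, stats, logger)
cmd.FailOnError(err, "Failed to create Redis ring")

source := ratelimits.NewRedisSource(limiterRedis.Ring, clk, stats)

limiter, err := ratelimits.NewLimiter(clk, source, c.WFE.Limiter.Defaults, c.WFE.Limiter.Overrides, stats)
cmd.FailOnError(err, "Failed to create rate limiter")

// limiter is then passed to wfe2.NewWebFrontEndImpl, and limiterRedis.StopLookups()
// is called during graceful shutdown to stop the periodic SRV lookup goroutine.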


@ -25,6 +25,8 @@ import (
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/nonce"
rapb "github.com/letsencrypt/boulder/ra/proto"
"github.com/letsencrypt/boulder/ratelimits"
bredis "github.com/letsencrypt/boulder/redis"
sapb "github.com/letsencrypt/boulder/sa/proto"
"github.com/letsencrypt/boulder/wfe2"
)
@ -137,6 +139,25 @@ type Config struct {
PendingAuthorizationLifetimeDays int `validate:"required,min=1,max=29"`
AccountCache *CacheConfig
Limiter struct {
// Redis contains the configuration necessary to connect to Redis
// for rate limiting. This field is required to enable rate
// limiting.
Redis *bredis.Config `validate:"required_with=Defaults"`
// Defaults is a path to a YAML file containing default rate limits.
// See: ratelimits/README.md for details. This field is required to
// enable rate limiting. If any individual rate limit is not set,
// that limit will be disabled.
Defaults string `validate:"required_with=Redis"`
// Overrides is a path to a YAML file containing overrides for the
// default rate limits. See: ratelimits/README.md for details. If
// this field is not set, all requesters will be subject to the
// default rate limits.
Overrides string
}
}
Syslog cmd.SyslogConfig
@ -318,6 +339,18 @@ func main() {
}
pendingAuthorizationLifetime := time.Duration(c.WFE.PendingAuthorizationLifetimeDays) * 24 * time.Hour
var limiter *ratelimits.Limiter
var limiterRedis *bredis.Ring
if c.WFE.Limiter.Defaults != "" {
// Setup rate limiting.
limiterRedis, err = bredis.NewRingFromConfig(*c.WFE.Limiter.Redis, stats, logger)
cmd.FailOnError(err, "Failed to create Redis ring")
source := ratelimits.NewRedisSource(limiterRedis.Ring, clk, stats)
limiter, err = ratelimits.NewLimiter(clk, source, c.WFE.Limiter.Defaults, c.WFE.Limiter.Overrides, stats)
cmd.FailOnError(err, "Failed to create rate limiter")
}
var accountGetter wfe2.AccountGetter
if c.WFE.AccountCache != nil {
accountGetter = wfe2.NewAccountCache(sac,
@ -346,6 +379,7 @@ func main() {
rnc,
npKey,
accountGetter,
limiter,
)
cmd.FailOnError(err, "Unable to create WFE")
@ -402,6 +436,7 @@ func main() {
defer cancel()
_ = srv.Shutdown(ctx)
_ = tlsSrv.Shutdown(ctx)
limiterRedis.StopLookups()
oTelShutdown(ctx)
}()


@ -119,7 +119,7 @@ func loadAndParseOverrideLimits(path string) (limits, error) {
err = validateIdForName(name, id)
if err != nil {
return nil, fmt.Errorf(
"validating name %s and id %q for override limit %q: %w", nameToString[name], id, k, err)
"validating name %s and id %q for override limit %q: %w", name, id, k, err)
}
if name == CertificatesPerFQDNSetPerAccount {
// FQDNSet hashes are not a nice thing to ask for in a config file,


@ -11,25 +11,22 @@ import (
)
func Test_parseOverrideNameId(t *testing.T) {
newRegistrationsPerIPAddressStr := nameToString[NewRegistrationsPerIPAddress]
newRegistrationsPerIPv6RangeStr := nameToString[NewRegistrationsPerIPv6Range]
// 'enum:ipv4'
// Valid IPv4 address.
name, id, err := parseOverrideNameId(newRegistrationsPerIPAddressStr + ":10.0.0.1")
name, id, err := parseOverrideNameId(NewRegistrationsPerIPAddress.String() + ":10.0.0.1")
test.AssertNotError(t, err, "should not error")
test.AssertEquals(t, name, NewRegistrationsPerIPAddress)
test.AssertEquals(t, id, "10.0.0.1")
// 'enum:ipv6range'
// Valid IPv6 address range.
name, id, err = parseOverrideNameId(newRegistrationsPerIPv6RangeStr + ":2001:0db8:0000::/48")
name, id, err = parseOverrideNameId(NewRegistrationsPerIPv6Range.String() + ":2001:0db8:0000::/48")
test.AssertNotError(t, err, "should not error")
test.AssertEquals(t, name, NewRegistrationsPerIPv6Range)
test.AssertEquals(t, id, "2001:0db8:0000::/48")
// Missing colon (this should never happen but we should avoid panicking).
_, _, err = parseOverrideNameId(newRegistrationsPerIPAddressStr + "10.0.0.1")
_, _, err = parseOverrideNameId(NewRegistrationsPerIPAddress.String() + "10.0.0.1")
test.AssertError(t, err, "missing colon")
// Empty string.
@ -37,7 +34,7 @@ func Test_parseOverrideNameId(t *testing.T) {
test.AssertError(t, err, "empty string")
// Only a colon.
_, _, err = parseOverrideNameId(newRegistrationsPerIPAddressStr + ":")
_, _, err = parseOverrideNameId(NewRegistrationsPerIPAddress.String() + ":")
test.AssertError(t, err, "only a colon")
// Invalid enum.


@ -29,9 +29,13 @@ var ErrInvalidCostForCheck = fmt.Errorf("invalid check cost, must be >= 0")
// ErrInvalidCostOverLimit indicates that the cost specified was > limit.Burst.
var ErrInvalidCostOverLimit = fmt.Errorf("invalid cost, must be <= limit.Burst")
// ErrBucketAlreadyFull indicates that the bucket already has reached its
// maximum capacity.
var ErrBucketAlreadyFull = fmt.Errorf("bucket already full")
// errLimitDisabled indicates that the limit name specified is valid but is not
// currently configured.
var errLimitDisabled = errors.New("limit disabled")
// disabledLimitDecision is an "allowed" *Decision that should be returned when
// a checked limit is found to be disabled.
var disabledLimitDecision = &Decision{true, 0, 0, 0, time.Time{}}
// Limiter provides a high-level interface for rate limiting requests by
// utilizing a leaky bucket-style approach.
@ -46,6 +50,7 @@ type Limiter struct {
source source
clk clock.Clock
spendLatency *prometheus.HistogramVec
overrideUsageGauge *prometheus.GaugeVec
}
@ -62,6 +67,14 @@ func NewLimiter(clk clock.Clock, source source, defaults, overrides string, stat
return nil, err
}
limiter.spendLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "ratelimits_spend_latency",
Help: fmt.Sprintf("Latency of ratelimit checks labeled by limit=[name] and decision=[%s|%s], in seconds", Allowed, Denied),
// Exponential buckets ranging from 0.0005s to 3s.
Buckets: prometheus.ExponentialBuckets(0.0005, 3, 8),
}, []string{"limit", "decision"})
stats.MustRegister(limiter.spendLatency)
if overrides == "" {
// No overrides specified, initialize an empty map.
limiter.overrides = make(limits)
@ -114,7 +127,8 @@ type Decision struct {
// wait time before the client can make another request, and the time until the
// bucket refills to its maximum capacity (resets). If no bucket exists for the
// given limit Name and client id, a new one will be created WITHOUT the
// request's cost deducted from its initial capacity.
// request's cost deducted from its initial capacity. If the specified limit is
// disabled, a placeholder "allowed" *Decision is returned.
func (l *Limiter) Check(ctx context.Context, name Name, id string, cost int64) (*Decision, error) {
if cost < 0 {
return nil, ErrInvalidCostForCheck
@ -122,6 +136,9 @@ func (l *Limiter) Check(ctx context.Context, name Name, id string, cost int64) (
limit, err := l.getLimit(name, id)
if err != nil {
if errors.Is(err, errLimitDisabled) {
return disabledLimitDecision, nil
}
return nil, err
}
@ -129,6 +146,9 @@ func (l *Limiter) Check(ctx context.Context, name Name, id string, cost int64) (
return nil, ErrInvalidCostOverLimit
}
// Remove cancellation from the request context so that transactions are not
// interrupted by a client disconnect.
ctx = context.WithoutCancel(ctx)
tat, err := l.source.Get(ctx, bucketKey(name, id))
if err != nil {
if !errors.Is(err, ErrBucketNotFound) {
@ -153,7 +173,8 @@ func (l *Limiter) Check(ctx context.Context, name Name, id string, cost int64) (
// required wait time before the client can make another request, and the time
// until the bucket refills to its maximum capacity (resets). If no bucket
// exists for the given limit Name and client id, a new one will be created WITH
// the request's cost deducted from its initial capacity.
// the request's cost deducted from its initial capacity. If the specified limit
// is disabled, a placeholder "allowed" *Decision is returned.
func (l *Limiter) Spend(ctx context.Context, name Name, id string, cost int64) (*Decision, error) {
if cost <= 0 {
return nil, ErrInvalidCost
@ -161,6 +182,9 @@ func (l *Limiter) Spend(ctx context.Context, name Name, id string, cost int64) (
limit, err := l.getLimit(name, id)
if err != nil {
if errors.Is(err, errLimitDisabled) {
return disabledLimitDecision, nil
}
return nil, err
}
@ -168,11 +192,27 @@ func (l *Limiter) Spend(ctx context.Context, name Name, id string, cost int64) (
return nil, ErrInvalidCostOverLimit
}
start := l.clk.Now()
status := Denied
defer func() {
l.spendLatency.WithLabelValues(name.String(), status).Observe(l.clk.Since(start).Seconds())
}()
// Remove cancellation from the request context so that transactions are not
// interrupted by a client disconnect.
ctx = context.WithoutCancel(ctx)
tat, err := l.source.Get(ctx, bucketKey(name, id))
if err != nil {
if errors.Is(err, ErrBucketNotFound) {
// First request from this client.
return l.initialize(ctx, limit, name, id, cost)
d, err := l.initialize(ctx, limit, name, id, cost)
if err != nil {
return nil, err
}
if d.Allowed {
status = Allowed
}
return d, nil
}
return nil, err
}
@ -183,13 +223,19 @@ func (l *Limiter) Spend(ctx context.Context, name Name, id string, cost int64) (
// Calculate the current utilization of the override limit for the
// specified client id.
utilization := float64(limit.Burst-d.Remaining) / float64(limit.Burst)
l.overrideUsageGauge.WithLabelValues(nameToString[name], id).Set(utilization)
l.overrideUsageGauge.WithLabelValues(name.String(), id).Set(utilization)
}
if !d.Allowed {
return d, nil
}
return d, l.source.Set(ctx, bucketKey(name, id), d.newTAT)
err = l.source.Set(ctx, bucketKey(name, id), d.newTAT)
if err != nil {
return nil, err
}
status = Allowed
return d, nil
}
// Refund attempts to refund the cost to the bucket identified by limit name and
@ -210,16 +256,23 @@ func (l *Limiter) Refund(ctx context.Context, name Name, id string, cost int64)
limit, err := l.getLimit(name, id)
if err != nil {
if errors.Is(err, errLimitDisabled) {
return disabledLimitDecision, nil
}
return nil, err
}
// Remove cancellation from the request context so that transactions are not
// interrupted by a client disconnect.
ctx = context.WithoutCancel(ctx)
tat, err := l.source.Get(ctx, bucketKey(name, id))
if err != nil {
return nil, err
}
d := maybeRefund(l.clk, limit, tat, cost)
if !d.Allowed {
return d, ErrBucketAlreadyFull
// The bucket is already at maximum capacity.
return d, nil
}
return d, l.source.Set(ctx, bucketKey(name, id), d.newTAT)
@ -227,6 +280,9 @@ func (l *Limiter) Refund(ctx context.Context, name Name, id string, cost int64)
// Reset resets the specified bucket.
func (l *Limiter) Reset(ctx context.Context, name Name, id string) error {
// Remove cancellation from the request context so that transactions are not
// interrupted by a client disconnect.
ctx = context.WithoutCancel(ctx)
return l.source.Delete(ctx, bucketKey(name, id))
}
@ -234,6 +290,10 @@ func (l *Limiter) Reset(ctx context.Context, name Name, id string) error {
// cost of the request factored into the initial state.
func (l *Limiter) initialize(ctx context.Context, rl limit, name Name, id string, cost int64) (*Decision, error) {
d := maybeSpend(l.clk, rl, l.clk.Now(), cost)
// Remove cancellation from the request context so that transactions are not
// interrupted by a client disconnect.
ctx = context.WithoutCancel(ctx)
err := l.source.Set(ctx, bucketKey(name, id), d.newTAT)
if err != nil {
return nil, err
@ -244,8 +304,14 @@ func (l *Limiter) initialize(ctx context.Context, rl limit, name Name, id string
// getLimit returns the limit specified by name and id; name is
// required, id is optional. If id is left unspecified, the default limit for
// the limit specified by name is returned.
// the limit specified by name is returned. If no default limit exists for the
// specified name, errLimitDisabled is returned.
func (l *Limiter) getLimit(name Name, id string) (limit, error) {
if !name.isValid() {
// This should never happen. Callers should only be specifying the limit
// Name enums defined in this package.
return limit{}, fmt.Errorf("specified name enum %q, is invalid", name)
}
if id != "" {
// Check for override.
ol, ok := l.overrides[bucketKey(name, id)]
@ -257,5 +323,5 @@ func (l *Limiter) getLimit(name Name, id string) (limit, error) {
if ok {
return dl, nil
}
return limit{}, fmt.Errorf("limit %q does not exist", name)
return limit{}, errLimitDisabled
}
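
Taken together, the methods above are used along these lines. A minimal sketch with a hypothetical caller inside a function returning error; the limit name, client id, and cost are illustrative, and ctx and limiter are assumed to exist:

d, err := limiter.Spend(ctx, ratelimits.NewRegistrationsPerIPAddress, "10.0.0.1", 1)
if err != nil {
	return err // infrastructure failure (e.g. Redis unreachable), not a denial
}
if !d.Allowed {
	// Denied: d.RetryIn reports how long to wait, d.Remaining how much burst is left.
}

// If the work the spend paid for later fails, the cost can be handed back.
_, _ = limiter.Refund(ctx, ratelimits.NewRegistrationsPerIPAddress, "10.0.0.1", 1)

// A limit with no configured default is treated as disabled: Check, Spend, and
// Refund return a placeholder "allowed" decision and a nil error.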


@ -81,7 +81,7 @@ func Test_Limiter_CheckWithLimitOverrides(t *testing.T) {
// Verify our overrideUsageGauge is being set correctly. 0.0 == 0% of
// the bucket has been consumed.
test.AssertMetricWithLabelsEquals(t, l.overrideUsageGauge, prometheus.Labels{
"limit": nameToString[NewRegistrationsPerIPAddress], "client_id": tenZeroZeroTwo}, 0)
"limit": NewRegistrationsPerIPAddress.String(), "client_id": tenZeroZeroTwo}, 0)
// Attempt to check a spend of 41 requests (a cost > the limit burst
// capacity), this should fail with a specific error.
@ -108,7 +108,7 @@ func Test_Limiter_CheckWithLimitOverrides(t *testing.T) {
// Verify our overrideUsageGauge is being set correctly. 1.0 == 100% of
// the bucket has been consumed.
test.AssertMetricWithLabelsEquals(t, l.overrideUsageGauge, prometheus.Labels{
"limit_name": nameToString[NewRegistrationsPerIPAddress], "client_id": tenZeroZeroTwo}, 1.0)
"limit_name": NewRegistrationsPerIPAddress.String(), "client_id": tenZeroZeroTwo}, 1.0)
// Verify our RetryIn is correct. 1 second == 1000 milliseconds and
// 1000/40 = 25 milliseconds per request.
@ -337,7 +337,8 @@ func Test_Limiter_RefundAndReset(t *testing.T) {
// Refund 1 requests above our limit, this should fail.
d, err = l.Refund(testCtx, NewRegistrationsPerIPAddress, testIP, 1)
test.AssertErrorIs(t, err, ErrBucketAlreadyFull)
test.AssertNotError(t, err, "should not error")
test.Assert(t, !d.Allowed, "should not be allowed")
test.AssertEquals(t, d.Remaining, int64(20))
})
}


@ -26,7 +26,13 @@ const (
NewRegistrationsPerIPAddress
// NewRegistrationsPerIPv6Range uses bucket key 'enum:ipv6rangeCIDR'. The
// address range must be a /48.
// address range must be a /48. RFC 3177, which was published in 2001,
// advised operators to allocate a /48 block of IPv6 addresses for most end
// sites. RFC 6177, which was published in 2011 and obsoletes RFC 3177,
// advises allocating a smaller /56 block. We've chosen to use the larger
// /48 block for our IPv6 rate limiting. See:
// 1. https://tools.ietf.org/html/rfc3177#section-3
// 2. https://datatracker.ietf.org/doc/html/rfc6177#section-2
NewRegistrationsPerIPv6Range
// NewOrdersPerAccount uses bucket key 'enum:regId'.
@ -47,6 +53,20 @@ const (
CertificatesPerFQDNSetPerAccount
)
// isValid returns true if the Name is a valid rate limit name.
func (n Name) isValid() bool {
return n > Unknown && n < Name(len(nameToString))
}
// String returns the string representation of the Name. It allows Name to
// satisfy the fmt.Stringer interface.
func (n Name) String() string {
if !n.isValid() {
return nameToString[Unknown]
}
return nameToString[n]
}
// nameToString is a map of Name values to string names.
var nameToString = map[Name]string{
Unknown: "Unknown",

ratelimits/names_test.go (new file)

@ -0,0 +1,29 @@
package ratelimits
import (
"testing"
"github.com/letsencrypt/boulder/test"
)
func TestNameIsValid(t *testing.T) {
t.Parallel()
type args struct {
name Name
}
tests := []struct {
name string
args args
want bool
}{
{name: "Unknown", args: args{name: Unknown}, want: false},
{name: "9001", args: args{name: 9001}, want: false},
{name: "NewRegistrationsPerIPAddress", args: args{name: NewRegistrationsPerIPAddress}, want: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.args.name.isValid()
test.AssertEquals(t, tt.want, got)
})
}
}
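
Since Name now satisfies fmt.Stringer, its string form can be used directly in format verbs, metric labels, and error messages, as in the override-validation and gauge changes above. A tiny illustration using the exported constants:

fmt.Println(ratelimits.NewRegistrationsPerIPAddress) // "NewRegistrationsPerIPAddress"
fmt.Println(ratelimits.Name(9001))                   // "Unknown" (invalid values fall back)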


@ -12,13 +12,27 @@ var ErrBucketNotFound = fmt.Errorf("bucket not found")
// source is an interface for creating and modifying TATs.
type source interface {
// Set stores the TAT at the specified bucketKey ('name:id').
// Set stores the TAT at the specified bucketKey (formatted as 'name:id').
// Implementations MUST ensure non-blocking operations by either:
// a) applying a deadline or timeout to the context WITHIN the method, or
// b) guaranteeing the operation will not block indefinitely (e.g. via
// the underlying storage client implementation).
Set(ctx context.Context, bucketKey string, tat time.Time) error
// Get retrieves the TAT at the specified bucketKey ('name:id').
// Get retrieves the TAT associated with the specified bucketKey (formatted
// as 'name:id'). Implementations MUST ensure non-blocking operations by
// either:
// a) applying a deadline or timeout to the context WITHIN the method, or
// b) guaranteeing the operation will not block indefinitely (e.g. via
// the underlying storage client implementation).
Get(ctx context.Context, bucketKey string) (time.Time, error)
// Delete deletes the TAT at the specified bucketKey ('name:id').
// Delete removes the TAT associated with the specified bucketKey (formatted
// as 'name:id'). Implementations MUST ensure non-blocking operations by
// either:
// a) applying a deadline or timeout to the context WITHIN the method, or
// b) guaranteeing the operation will not block indefinitely (e.g. via
// the underlying storage client implementation).
Delete(ctx context.Context, bucketKey string) error
}
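
The interface is small enough that a toy in-memory implementation makes the contract concrete. This is purely an illustrative sketch in package ratelimits (the type name is hypothetical, and a real implementation would need locking):

// inmemSource is a hypothetical, non-thread-safe source for illustration only.
type inmemSource struct {
	tats map[string]time.Time
}

func (s *inmemSource) Set(_ context.Context, bucketKey string, tat time.Time) error {
	s.tats[bucketKey] = tat
	return nil
}

func (s *inmemSource) Get(_ context.Context, bucketKey string) (time.Time, error) {
	tat, ok := s.tats[bucketKey]
	if !ok {
		return time.Time{}, ErrBucketNotFound
	}
	return tat, nil
}

func (s *inmemSource) Delete(_ context.Context, bucketKey string) error {
	delete(s.tats, bucketKey)
	return nil
}

Map operations never block, so the non-blocking requirement above is satisfied trivially.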


@ -3,11 +3,9 @@ package ratelimits
import (
"context"
"errors"
"strings"
"net"
"time"
bredis "github.com/letsencrypt/boulder/redis"
"github.com/jmhodges/clock"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/v9"
@ -18,93 +16,71 @@ var _ source = (*RedisSource)(nil)
// RedisSource is a ratelimits source backed by sharded Redis.
type RedisSource struct {
client *redis.Ring
timeout time.Duration
clk clock.Clock
setLatency *prometheus.HistogramVec
getLatency *prometheus.HistogramVec
deleteLatency *prometheus.HistogramVec
client *redis.Ring
clk clock.Clock
latency *prometheus.HistogramVec
}
// NewRedisSource returns a new Redis backed source using the provided
// *redis.Ring client.
func NewRedisSource(client *redis.Ring, timeout time.Duration, clk clock.Clock, stats prometheus.Registerer) *RedisSource {
if len(client.Options().Addrs) == 0 {
return nil
}
var addrs []string
for addr := range client.Options().Addrs {
addrs = append(addrs, addr)
}
labels := prometheus.Labels{
"addresses": strings.Join(addrs, ", "),
"user": client.Options().Username,
}
stats.MustRegister(bredis.NewMetricsCollector(client, labels))
setLatency := prometheus.NewHistogramVec(
func NewRedisSource(client *redis.Ring, clk clock.Clock, stats prometheus.Registerer) *RedisSource {
latency := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "rl_set_latency",
Help: "Histogram of latencies of redisSource.Set calls",
// Exponential buckets ranging from 0.0005s to 2s
Buckets: prometheus.ExponentialBucketsRange(0.0005, 2, 8),
Name: "ratelimits_latency",
Help: "Histogram of Redis call latencies labeled by call=[set|get|delete|ping] and result=[success|error]",
// Exponential buckets ranging from 0.0005s to 3s.
Buckets: prometheus.ExponentialBucketsRange(0.0005, 3, 8),
},
[]string{"result"},
[]string{"call", "result"},
)
stats.MustRegister(setLatency)
getLatency := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "rl_get_latency",
Help: "Histogram of redisSource.Get call latencies",
// Exponential buckets ranging from 0.0005s to 2s
Buckets: prometheus.ExponentialBucketsRange(0.0005, 2, 8),
},
[]string{"result"},
)
stats.MustRegister(getLatency)
deleteLatency := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "rl_delete_latency",
Help: "Histogram of latencies of redisSource.Delete calls",
// Exponential buckets ranging from 0.0005s to 2s
Buckets: prometheus.ExponentialBucketsRange(0.0005, 2, 8),
},
[]string{"result"},
)
stats.MustRegister(deleteLatency)
stats.MustRegister(latency)
return &RedisSource{
client: client,
timeout: timeout,
clk: clk,
setLatency: setLatency,
getLatency: getLatency,
deleteLatency: deleteLatency,
client: client,
clk: clk,
latency: latency,
}
}
// resultForError returns a string representing the result of the operation
// based on the provided error.
func resultForError(err error) string {
if errors.Is(err, redis.Nil) {
// Bucket key does not exist.
return "notFound"
} else if errors.Is(err, context.DeadlineExceeded) {
// Client read or write deadline exceeded.
return "deadlineExceeded"
} else if errors.Is(err, context.Canceled) {
// Caller canceled the operation.
return "canceled"
}
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
// Dialer timed out connecting to Redis.
return "timeout"
}
var redisErr redis.Error
if errors.As(err, &redisErr) {
// An internal error was returned by the Redis server.
return "redisError"
}
return "failed"
}
// Set stores the TAT at the specified bucketKey ('name:id'). It returns an
// error if the operation failed and nil otherwise. If the bucketKey does not
// exist, it will be created.
func (r *RedisSource) Set(ctx context.Context, bucketKey string, tat time.Time) error {
start := r.clk.Now()
ctx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
err := r.client.Set(ctx, bucketKey, tat.UnixNano(), 0).Err()
if err != nil {
state := "failed"
if errors.Is(err, context.DeadlineExceeded) {
state = "deadlineExceeded"
} else if errors.Is(err, context.Canceled) {
state = "canceled"
}
r.setLatency.With(prometheus.Labels{"result": state}).Observe(time.Since(start).Seconds())
r.latency.With(prometheus.Labels{"call": "set", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
return err
}
r.setLatency.With(prometheus.Labels{"result": "success"}).Observe(time.Since(start).Seconds())
r.latency.With(prometheus.Labels{"call": "set", "result": "success"}).Observe(time.Since(start).Seconds())
return nil
}
@ -113,28 +89,19 @@ func (r *RedisSource) Set(ctx context.Context, bucketKey string, tat time.Time)
// If the bucketKey does not exist, it returns ErrBucketNotFound.
func (r *RedisSource) Get(ctx context.Context, bucketKey string) (time.Time, error) {
start := r.clk.Now()
ctx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
tatNano, err := r.client.Get(ctx, bucketKey).Int64()
if err != nil {
if errors.Is(err, redis.Nil) {
// Bucket key does not exist.
r.getLatency.With(prometheus.Labels{"result": "notFound"}).Observe(time.Since(start).Seconds())
r.latency.With(prometheus.Labels{"call": "get", "result": "notFound"}).Observe(time.Since(start).Seconds())
return time.Time{}, ErrBucketNotFound
}
state := "failed"
if errors.Is(err, context.DeadlineExceeded) {
state = "deadlineExceeded"
} else if errors.Is(err, context.Canceled) {
state = "canceled"
}
r.getLatency.With(prometheus.Labels{"result": state}).Observe(time.Since(start).Seconds())
r.latency.With(prometheus.Labels{"call": "get", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
return time.Time{}, err
}
r.getLatency.With(prometheus.Labels{"result": "success"}).Observe(time.Since(start).Seconds())
r.latency.With(prometheus.Labels{"call": "get", "result": "success"}).Observe(time.Since(start).Seconds())
return time.Unix(0, tatNano).UTC(), nil
}
@ -143,30 +110,29 @@ func (r *RedisSource) Get(ctx context.Context, bucketKey string) (time.Time, err
// indicate that the bucketKey existed.
func (r *RedisSource) Delete(ctx context.Context, bucketKey string) error {
start := r.clk.Now()
ctx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
err := r.client.Del(ctx, bucketKey).Err()
if err != nil {
r.deleteLatency.With(prometheus.Labels{"result": "failed"}).Observe(time.Since(start).Seconds())
r.latency.With(prometheus.Labels{"call": "delete", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
return err
}
r.deleteLatency.With(prometheus.Labels{"result": "success"}).Observe(time.Since(start).Seconds())
r.latency.With(prometheus.Labels{"call": "delete", "result": "success"}).Observe(time.Since(start).Seconds())
return nil
}
// Ping checks that each shard of the *redis.Ring is reachable using the PING
// command. It returns an error if any shard is unreachable and nil otherwise.
func (r *RedisSource) Ping(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
start := r.clk.Now()
err := r.client.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
return shard.Ping(ctx).Err()
})
if err != nil {
r.latency.With(prometheus.Labels{"call": "ping", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
return err
}
r.latency.With(prometheus.Labels{"call": "ping", "result": "success"}).Observe(time.Since(start).Seconds())
return nil
}
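
The result label on ratelimits_latency comes from resultForError; a few illustrative mappings (not an exhaustive list):

resultForError(redis.Nil)                // "notFound": the bucket key does not exist
resultForError(context.DeadlineExceeded) // "deadlineExceeded"
resultForError(context.Canceled)         // "canceled"
resultForError(errors.New("oops"))       // "failed": anything unrecognized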


@ -2,7 +2,6 @@ package ratelimits
import (
"testing"
"time"
"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/metrics"
@ -33,7 +32,7 @@ func newTestRedisSource(clk clock.FakeClock, addrs map[string]string) *RedisSour
Password: "824968fa490f4ecec1e52d5e34916bdb60d45f8d",
TLSConfig: tlsConfig2,
})
return NewRedisSource(client, 5*time.Second, clk, metrics.NoopRegisterer)
return NewRedisSource(client, clk, metrics.NoopRegisterer)
}
func newRedisTestLimiter(t *testing.T, clk clock.FakeClock) *Limiter {

redis/config.go (new file)

@ -0,0 +1,181 @@
package redis
import (
"fmt"
"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/config"
blog "github.com/letsencrypt/boulder/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/v9"
)
// Config contains the configuration needed to act as a Redis client.
type Config struct {
// TLS contains the configuration to speak TLS with Redis.
TLS cmd.TLSConfig
// Username used to authenticate to each Redis instance.
Username string `validate:"required"`
// PasswordFile is the path to a file holding the password used to
// authenticate to each Redis instance.
cmd.PasswordConfig
// ShardAddrs is a map of shard names to IP address:port pairs. The go-redis
// `Ring` client will shard reads and writes across the provided Redis
// servers based on a consistent hashing algorithm.
ShardAddrs map[string]string `validate:"omitempty,required_without=Lookups,min=1,dive,hostname_port"`
// Lookups is a list of entries, each containing a service and domain name,
// used to construct SRV DNS queries to look up Redis backends. For example: if
// the resource record is 'foo.service.consul', then the 'Service' is 'foo'
// and the 'Domain' is 'service.consul'. The expected dNSName to be
// authenticated in the server certificate would be 'foo.service.consul'.
Lookups []cmd.ServiceDomain `validate:"omitempty,required_without=ShardAddrs,min=1,dive"`
// LookupFrequency is the frequency of periodic SRV lookups. Defaults to 30
// seconds.
LookupFrequency config.Duration `validate:"-"`
// LookupDNSAuthority can only be specified with Lookups. It's a single
// <hostname|IPv4|[IPv6]>:<port> of the DNS server to be used for resolution
// of Redis backends. If the address contains a hostname it will be resolved
// using system DNS. If the address contains a port, the client will use it
// directly, otherwise port 53 is used. If this field is left unspecified
// the system DNS will be used for resolution.
LookupDNSAuthority string `validate:"excluded_without=Lookups,omitempty,ip|hostname|hostname_port"`
// Enables read-only commands on replicas.
ReadOnly bool
// Allows routing read-only commands to the closest primary or replica.
// It automatically enables ReadOnly.
RouteByLatency bool
// Allows routing read-only commands to a random primary or replica.
// It automatically enables ReadOnly.
RouteRandomly bool
// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
PoolFIFO bool
// Maximum number of retries before giving up.
// Default is to not retry failed commands.
MaxRetries int `validate:"min=0"`
// Minimum backoff between each retry.
// Default is 8 milliseconds; -1 disables backoff.
MinRetryBackoff config.Duration `validate:"-"`
// Maximum backoff between each retry.
// Default is 512 milliseconds; -1 disables backoff.
MaxRetryBackoff config.Duration `validate:"-"`
// Dial timeout for establishing new connections.
// Default is 5 seconds.
DialTimeout config.Duration `validate:"-"`
// Timeout for socket reads. If reached, commands will fail
// with a timeout instead of blocking. Use value -1 for no timeout and 0 for default.
// Default is 3 seconds.
ReadTimeout config.Duration `validate:"-"`
// Timeout for socket writes. If reached, commands will fail
// with a timeout instead of blocking.
// Default is ReadTimeout.
WriteTimeout config.Duration `validate:"-"`
// Maximum number of socket connections.
// Default is 5 connections per every CPU as reported by runtime.NumCPU.
// If this is set to an explicit value, that's not multiplied by NumCPU.
// PoolSize applies per cluster node and not for the whole cluster.
// https://pkg.go.dev/github.com/go-redis/redis#ClusterOptions
PoolSize int `validate:"min=0"`
// Minimum number of idle connections which is useful when establishing
// new connection is slow.
MinIdleConns int `validate:"min=0"`
// Connection age at which client retires (closes) the connection.
// Default is to not close aged connections.
MaxConnAge config.Duration `validate:"-"`
// Amount of time client waits for connection if all connections
// are busy before returning an error.
// Default is ReadTimeout + 1 second.
PoolTimeout config.Duration `validate:"-"`
// Amount of time after which client closes idle connections.
// Should be less than server's timeout.
// Default is 5 minutes. -1 disables idle timeout check.
IdleTimeout config.Duration `validate:"-"`
// Frequency of idle checks made by idle connections reaper.
// Default is 1 minute. -1 disables idle connections reaper,
// but idle connections are still discarded by the client
// if IdleTimeout is set.
// Deprecated: This field has been deprecated and will be removed.
IdleCheckFrequency config.Duration `validate:"-"`
}
// Ring is a wrapper around the go-redis/v9 Ring client that adds support for
// (optional) periodic SRV lookups.
type Ring struct {
*redis.Ring
lookup *lookup
}
// NewRingFromConfig returns a new *redis.Ring client. If periodic SRV lookups
// are supplied, a goroutine will be started to periodically perform lookups.
// Callers should defer a call to StopLookups() to ensure that this goroutine is
// gracefully shutdown.
func NewRingFromConfig(c Config, stats prometheus.Registerer, log blog.Logger) (*Ring, error) {
password, err := c.Pass()
if err != nil {
return nil, fmt.Errorf("loading password: %w", err)
}
tlsConfig, err := c.TLS.Load(stats)
if err != nil {
return nil, fmt.Errorf("loading TLS config: %w", err)
}
inner := redis.NewRing(&redis.RingOptions{
Addrs: c.ShardAddrs,
Username: c.Username,
Password: password,
TLSConfig: tlsConfig,
MaxRetries: c.MaxRetries,
MinRetryBackoff: c.MinRetryBackoff.Duration,
MaxRetryBackoff: c.MaxRetryBackoff.Duration,
DialTimeout: c.DialTimeout.Duration,
ReadTimeout: c.ReadTimeout.Duration,
WriteTimeout: c.WriteTimeout.Duration,
PoolSize: c.PoolSize,
MinIdleConns: c.MinIdleConns,
ConnMaxLifetime: c.MaxConnAge.Duration,
PoolTimeout: c.PoolTimeout.Duration,
ConnMaxIdleTime: c.IdleTimeout.Duration,
})
if len(c.ShardAddrs) > 0 {
// Client was statically configured with a list of shards.
MustRegisterClientMetricsCollector(inner, stats, c.ShardAddrs, c.Username)
}
var lookup *lookup
if len(c.Lookups) != 0 {
lookup, err = newLookup(c.Lookups, c.LookupDNSAuthority, c.LookupFrequency.Duration, inner, log, stats)
if err != nil {
return nil, err
}
lookup.start()
}
return &Ring{
Ring: inner,
lookup: lookup,
}, nil
}
// StopLookups stops the goroutine responsible for keeping the shards of the
// inner *redis.Ring up-to-date. It is a no-op if the Ring was not constructed
// with periodic lookups or if the lookups have already been stopped.
func (r *Ring) StopLookups() {
if r == nil || r.lookup == nil {
// No-op.
return
}
r.lookup.stop()
}
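
A minimal construction sketch for the new Ring wrapper using SRV-based shard discovery; the service name, DNS authority, and file paths are illustrative placeholders:

cfg := bredis.Config{
	Username:       "boulder-wfe",
	PasswordConfig: cmd.PasswordConfig{PasswordFile: "path/to/redis-password"},
	TLS: cmd.TLSConfig{
		CACertFile: "path/to/ca.pem",
		CertFile:   "path/to/cert.pem",
		KeyFile:    "path/to/key.pem",
	},
	Lookups: []cmd.ServiceDomain{
		{Service: "redisratelimits", Domain: "service.consul"},
	},
	LookupDNSAuthority: "consul.service.consul",
}
ring, err := bredis.NewRingFromConfig(cfg, stats, logger)
if err != nil {
	return err
}
defer ring.StopLookups() // no-op if Lookups was not configured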


@ -10,15 +10,16 @@ import (
"github.com/letsencrypt/boulder/cmd"
blog "github.com/letsencrypt/boulder/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/v9"
)
var ErrNoShardsResolved = errors.New("0 shards were resolved")
// Lookup wraps a Redis ring client by reference and keeps the Redis ring shards
// lookup wraps a Redis ring client by reference and keeps the Redis ring shards
// up to date via periodic SRV lookups.
type Lookup struct {
type lookup struct {
// srvLookups is a list of SRV records to be looked up.
srvLookups []cmd.ServiceDomain
@ -37,15 +38,20 @@ type Lookup struct {
// will be used for resolution.
dnsAuthority string
// stop is a context.CancelFunc that can be used to stop the goroutine
// responsible for performing periodic SRV lookups.
stop context.CancelFunc
resolver *net.Resolver
ring *redis.Ring
logger blog.Logger
stats prometheus.Registerer
}
// NewLookup constructs and returns a new Lookup instance. An initial SRV lookup
// newLookup constructs and returns a new lookup instance. An initial SRV lookup
// is performed to populate the Redis ring shards. If this lookup fails or
// otherwise results in an empty set of resolved shards, an error is returned.
func NewLookup(srvLookups []cmd.ServiceDomain, dnsAuthority string, frequency time.Duration, ring *redis.Ring, logger blog.Logger) (*Lookup, error) {
func newLookup(srvLookups []cmd.ServiceDomain, dnsAuthority string, frequency time.Duration, ring *redis.Ring, logger blog.Logger, stats prometheus.Registerer) (*lookup, error) {
updateFrequency := frequency
if updateFrequency <= 0 {
// Set default frequency.
@ -54,10 +60,11 @@ func NewLookup(srvLookups []cmd.ServiceDomain, dnsAuthority string, frequency ti
// Set default timeout to 90% of the update frequency.
updateTimeout := updateFrequency - updateFrequency/10
lookup := &Lookup{
lookup := &lookup{
srvLookups: srvLookups,
ring: ring,
logger: logger,
stats: stats,
updateFrequency: updateFrequency,
updateTimeout: updateTimeout,
dnsAuthority: dnsAuthority,
@ -108,7 +115,7 @@ func NewLookup(srvLookups []cmd.ServiceDomain, dnsAuthority string, frequency ti
// lookup succeeds, the Redis ring is updated, and all errors are discarded.
// Non-temporary DNS errors are always logged as they occur, as they're likely
// to be indicative of a misconfiguration.
func (look *Lookup) updateNow(ctx context.Context) (tempError, nonTempError error) {
func (look *lookup) updateNow(ctx context.Context) (tempError, nonTempError error) {
var tempErrs []error
handleDNSError := func(err error, srv cmd.ServiceDomain) {
var dnsErr *net.DNSError
@ -166,23 +173,28 @@ func (look *Lookup) updateNow(ctx context.Context) (tempError, nonTempError erro
// Some shards were resolved, update the Redis ring and discard all errors.
look.ring.SetAddrs(nextAddrs)
// Update the Redis client metrics.
MustRegisterClientMetricsCollector(look.ring, look.stats, nextAddrs, look.ring.Options().Username)
return nil, nil
}
// Start starts a goroutine that keeps the Redis ring shards up to date via
// periodic SRV lookups. The goroutine will exit when the provided context is
// cancelled.
func (look *Lookup) Start(ctx context.Context) {
// start starts a goroutine that keeps the Redis ring shards up-to-date by
// periodically performing SRV lookups.
func (look *lookup) start() {
var lookupCtx context.Context
lookupCtx, look.stop = context.WithCancel(context.Background())
go func() {
ticker := time.NewTicker(look.updateFrequency)
defer ticker.Stop()
for {
// Check for context cancellation before we do any work.
if ctx.Err() != nil {
if lookupCtx.Err() != nil {
return
}
timeoutCtx, cancel := context.WithTimeout(ctx, look.updateTimeout)
timeoutCtx, cancel := context.WithTimeout(lookupCtx, look.updateTimeout)
tempErrs, nonTempErrs := look.updateNow(timeoutCtx)
cancel()
if tempErrs != nil {
@ -198,7 +210,7 @@ func (look *Lookup) Start(ctx context.Context) {
case <-ticker.C:
continue
case <-ctx.Done():
case <-lookupCtx.Done():
return
}
}


@ -41,7 +41,7 @@ func TestNewLookup(t *testing.T) {
logger := blog.NewMock()
ring := newTestRedisRing()
_, err := NewLookup([]cmd.ServiceDomain{
_, err := newLookup([]cmd.ServiceDomain{
{
Service: "redisratelimits",
Domain: "service.consul",
@ -51,8 +51,9 @@ func TestNewLookup(t *testing.T) {
250*time.Millisecond,
ring,
logger,
metrics.NoopRegisterer,
)
test.AssertNotError(t, err, "Expected NewLookup construction to succeed")
test.AssertNotError(t, err, "Expected newLookup construction to succeed")
}
func TestStart(t *testing.T) {
@ -61,7 +62,7 @@ func TestStart(t *testing.T) {
logger := blog.NewMock()
ring := newTestRedisRing()
lookup, err := NewLookup([]cmd.ServiceDomain{
lookup, err := newLookup([]cmd.ServiceDomain{
{
Service: "redisratelimits",
Domain: "service.consul",
@ -71,13 +72,12 @@ func TestStart(t *testing.T) {
250*time.Millisecond,
ring,
logger,
metrics.NoopRegisterer,
)
test.AssertNotError(t, err, "Expected NewLookup construction to succeed")
test.AssertNotError(t, err, "Expected newLookup construction to succeed")
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
lookup.Start(testCtx)
lookup.start()
lookup.stop()
}
func TestNewLookupWithOneFailingSRV(t *testing.T) {
@ -86,7 +86,7 @@ func TestNewLookupWithOneFailingSRV(t *testing.T) {
logger := blog.NewMock()
ring := newTestRedisRing()
_, err := NewLookup([]cmd.ServiceDomain{
_, err := newLookup([]cmd.ServiceDomain{
{
Service: "doesnotexist",
Domain: "service.consuls",
@ -100,8 +100,9 @@ func TestNewLookupWithOneFailingSRV(t *testing.T) {
250*time.Millisecond,
ring,
logger,
metrics.NoopRegisterer,
)
test.AssertNotError(t, err, "Expected NewLookup construction to succeed")
test.AssertNotError(t, err, "Expected newLookup construction to succeed")
}
func TestNewLookupWithAllFailingSRV(t *testing.T) {
@ -110,7 +111,7 @@ func TestNewLookupWithAllFailingSRV(t *testing.T) {
logger := blog.NewMock()
ring := newTestRedisRing()
_, err := NewLookup([]cmd.ServiceDomain{
_, err := newLookup([]cmd.ServiceDomain{
{
Service: "doesnotexist",
Domain: "service.consuls",
@ -124,8 +125,9 @@ func TestNewLookupWithAllFailingSRV(t *testing.T) {
250*time.Millisecond,
ring,
logger,
metrics.NoopRegisterer,
)
test.AssertError(t, err, "Expected NewLookup construction to fail")
test.AssertError(t, err, "Expected newLookup construction to fail")
}
func TestUpdateNowWithAllFailingSRV(t *testing.T) {
@ -134,7 +136,7 @@ func TestUpdateNowWithAllFailingSRV(t *testing.T) {
logger := blog.NewMock()
ring := newTestRedisRing()
lookup, err := NewLookup([]cmd.ServiceDomain{
lookup, err := newLookup([]cmd.ServiceDomain{
{
Service: "redisratelimits",
Domain: "service.consul",
@ -144,8 +146,9 @@ func TestUpdateNowWithAllFailingSRV(t *testing.T) {
250*time.Millisecond,
ring,
logger,
metrics.NoopRegisterer,
)
test.AssertNotError(t, err, "Expected NewLookup construction to succeed")
test.AssertNotError(t, err, "Expected newLookup construction to succeed")
lookup.srvLookups = []cmd.ServiceDomain{
{
@ -172,7 +175,7 @@ func TestUpdateNowWithAllFailingSRVs(t *testing.T) {
logger := blog.NewMock()
ring := newTestRedisRing()
lookup, err := NewLookup([]cmd.ServiceDomain{
lookup, err := newLookup([]cmd.ServiceDomain{
{
Service: "redisratelimits",
Domain: "service.consul",
@ -182,8 +185,9 @@ func TestUpdateNowWithAllFailingSRVs(t *testing.T) {
250*time.Millisecond,
ring,
logger,
metrics.NoopRegisterer,
)
test.AssertNotError(t, err, "Expected NewLookup construction to succeed")
test.AssertNotError(t, err, "Expected newLookup construction to succeed")
// Replace the dnsAuthority with a non-existent DNS server, this will cause
// a timeout error, which is technically a temporary error, but will
@ -204,7 +208,7 @@ func TestUpdateNowWithOneFailingSRV(t *testing.T) {
logger := blog.NewMock()
ring := newTestRedisRing()
lookup, err := NewLookup([]cmd.ServiceDomain{
lookup, err := newLookup([]cmd.ServiceDomain{
{
Service: "doesnotexist",
Domain: "service.consuls",
@ -218,8 +222,9 @@ func TestUpdateNowWithOneFailingSRV(t *testing.T) {
250*time.Millisecond,
ring,
logger,
metrics.NoopRegisterer,
)
test.AssertNotError(t, err, "Expected NewLookup construction to succeed")
test.AssertNotError(t, err, "Expected newLookup construction to succeed")
// The Consul service entry for 'redisratelimits' is configured to return
// two SRV targets. We should only have two shards in the ring.


@ -1,6 +1,10 @@
package redis
import (
"errors"
"slices"
"strings"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/v9"
)
@ -49,7 +53,8 @@ func (dbc metricsCollector) Collect(ch chan<- prometheus.Metric) {
writeGauge(dbc.staleConns, stats.StaleConns)
}
func NewMetricsCollector(statGetter poolStatGetter, labels prometheus.Labels) metricsCollector {
// newClientMetricsCollector is broken out for testing purposes.
func newClientMetricsCollector(statGetter poolStatGetter, labels prometheus.Labels) metricsCollector {
return metricsCollector{
statGetter: statGetter,
lookups: prometheus.NewDesc(
@ -70,3 +75,29 @@ func NewMetricsCollector(statGetter poolStatGetter, labels prometheus.Labels) me
nil, labels),
}
}
// MustRegisterClientMetricsCollector registers a metrics collector for the
// given Redis client with the provided prometheus.Registerer. The collector
// will report metrics labelled by the provided addresses and username. If the
// collector is already registered, this function is a no-op.
func MustRegisterClientMetricsCollector(client poolStatGetter, stats prometheus.Registerer, addrs map[string]string, user string) {
var labelAddrs []string
for addr := range addrs {
labelAddrs = append(labelAddrs, addr)
}
// Keep the list of addresses sorted for consistency.
slices.Sort(labelAddrs)
labels := prometheus.Labels{
"addresses": strings.Join(labelAddrs, ", "),
"user": user,
}
err := stats.Register(newClientMetricsCollector(client, labels))
if err != nil {
are := prometheus.AlreadyRegisteredError{}
if errors.As(err, &are) {
// The collector is already registered using the same labels.
return
}
panic(err)
}
}


@ -25,7 +25,7 @@ func (mockPoolStatGetter) PoolStats() *redis.PoolStats {
}
func TestMetrics(t *testing.T) {
mets := NewMetricsCollector(mockPoolStatGetter{},
mets := newClientMetricsCollector(mockPoolStatGetter{},
prometheus.Labels{
"foo": "bar",
})
@ -62,3 +62,14 @@ Desc{fqName: "redis_connection_pool_stale_conns", help: "Number of stale connect
len(expected), len(results))
}
}
func TestMustRegisterClientMetricsCollector(t *testing.T) {
client := mockPoolStatGetter{}
stats := prometheus.NewRegistry()
// First registration should succeed.
MustRegisterClientMetricsCollector(client, stats, map[string]string{"foo": "bar"}, "baz")
// Duplicate registration should succeed.
MustRegisterClientMetricsCollector(client, stats, map[string]string{"foo": "bar"}, "baz")
// Registration with different label values should succeed.
MustRegisterClientMetricsCollector(client, stats, map[string]string{"f00": "b4r"}, "b4z")
}


@ -16,10 +16,14 @@ import (
"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/config"
"github.com/letsencrypt/boulder/issuance"
bredis "github.com/letsencrypt/boulder/redis"
"github.com/letsencrypt/boulder/rocsp"
)
// RedisConfig contains the configuration needed to act as a Redis client.
//
// TODO(#7081): Deprecate this in favor of bredis.Config once we can support SRV
// lookups in rocsp.
type RedisConfig struct {
// PasswordFile is a file containing the password for the Redis user.
cmd.PasswordConfig
@ -166,6 +170,7 @@ func MakeReadClient(c *RedisConfig, clk clock.Clock, stats prometheus.Registerer
PoolTimeout: c.PoolTimeout.Duration,
ConnMaxIdleTime: c.IdleTimeout.Duration,
})
bredis.MustRegisterClientMetricsCollector(rdb, stats, rdb.Options().Addrs, rdb.Options().Username)
return rocsp.NewReadingClient(rdb, c.Timeout.Duration, clk, stats), nil
}


@ -4,11 +4,9 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/letsencrypt/boulder/core"
bredis "github.com/letsencrypt/boulder/redis"
"github.com/jmhodges/clock"
"github.com/prometheus/client_golang/prometheus"
@ -28,21 +26,8 @@ type ROClient struct {
// NewReadingClient creates a read-only client. The timeout applies to all
// requests, though a shorter timeout can be applied on a per-request basis
// using context.Context. rdb must be non-nil and calls to rdb.Options().Addrs
// must return at least one entry.
// using context.Context. rdb must be non-nil.
func NewReadingClient(rdb *redis.Ring, timeout time.Duration, clk clock.Clock, stats prometheus.Registerer) *ROClient {
if len(rdb.Options().Addrs) == 0 {
return nil
}
var addrs []string
for addr := range rdb.Options().Addrs {
addrs = append(addrs, addr)
}
labels := prometheus.Labels{
"addresses": strings.Join(addrs, ", "),
"user": rdb.Options().Username,
}
stats.MustRegister(bredis.NewMetricsCollector(rdb, labels))
getLatency := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "rocsp_get_latency",


@ -0,0 +1,2 @@
NewRegistrationsPerIPAddress: { burst: 10000, count: 10000, period: 168h }
NewRegistrationsPerIPv6Range: { burst: 99999, count: 99999, period: 168h }


@ -0,0 +1 @@
NewRegistrationsPerIPAddress:127.0.0.1: { burst: 1000000, count: 1000000, period: 168h }
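
The override key above uses the same 'name:id' form as bucket keys; parseOverrideNameId, exercised in the ratelimits test changes earlier in this diff, splits it back apart. A small sketch from inside the ratelimits package:

name, id, err := parseOverrideNameId("NewRegistrationsPerIPAddress:127.0.0.1")
// name == NewRegistrationsPerIPAddress, id == "127.0.0.1", err == nil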


@ -105,6 +105,30 @@
"staleTimeout": "5m",
"authorizationLifetimeDays": 30,
"pendingAuthorizationLifetimeDays": 7,
"limiter": {
"redis": {
"username": "boulder-wfe",
"passwordFile": "test/secrets/wfe_ratelimits_redis_password",
"lookups": [
{
"Service": "redisratelimits",
"Domain": "service.consul"
}
],
"lookupDNSAuthority": "consul.service.consul",
"readTimeout": "250ms",
"writeTimeout": "250ms",
"poolSize": 100,
"routeRandomly": true,
"tls": {
"caCertFile": "test/redis-tls/minica.pem",
"certFile": "test/redis-tls/boulder/cert.pem",
"keyFile": "test/redis-tls/boulder/key.pem"
}
},
"Defaults": "test/config-next/wfe2-ratelimit-defaults.yml",
"Overrides": "test/config-next/wfe2-ratelimit-overrides.yml"
},
"features": {
"ServeRenewalInfo": true,
"RequireCommonName": false


@ -0,0 +1 @@
824968fa490f4ecec1e52d5e34916bdb60d45f8d


@ -0,0 +1 @@
b3b2fcbbf46fe39fd522c395a51f84d93a98ff2f


@ -29,6 +29,7 @@ import (
"github.com/letsencrypt/boulder/features"
"github.com/letsencrypt/boulder/goodkey"
bgrpc "github.com/letsencrypt/boulder/grpc"
"github.com/letsencrypt/boulder/ratelimits"
// 'grpc/noncebalancer' is imported for its init function.
_ "github.com/letsencrypt/boulder/grpc/noncebalancer"
@ -166,6 +167,7 @@ type WebFrontEndImpl struct {
// match the ones used by the RA.
authorizationLifetime time.Duration
pendingAuthorizationLifetime time.Duration
limiter *ratelimits.Limiter
}
// NewWebFrontEndImpl constructs a web service for Boulder
@ -187,6 +189,7 @@ func NewWebFrontEndImpl(
rnc nonce.Redeemer,
rncKey string,
accountGetter AccountGetter,
limiter *ratelimits.Limiter,
) (WebFrontEndImpl, error) {
if len(issuerCertificates) == 0 {
return WebFrontEndImpl{}, errors.New("must provide at least one issuer certificate")
@ -223,6 +226,7 @@ func NewWebFrontEndImpl(
rnc: rnc,
rncKey: rncKey,
accountGetter: accountGetter,
limiter: limiter,
}
return wfe, nil
@ -611,6 +615,89 @@ func link(url, relation string) string {
return fmt.Sprintf("<%s>;rel=\"%s\"", url, relation)
}
// checkNewAccountLimits checks whether sufficient limit quota exists for the
// creation of a new account from the given IP address. If so, that quota is
// spent. If an error is encountered during the check, it is logged but not
// returned.
//
// TODO(#5545): For now we're simply exercising the new rate limiter codepath.
// This should eventually return a berrors.RateLimit error containing the retry
// after duration among other information available in the ratelimits.Decision.
func (wfe *WebFrontEndImpl) checkNewAccountLimits(ctx context.Context, ip net.IP) {
if wfe.limiter == nil {
// Limiter is disabled.
return
}
warn := func(err error, limit ratelimits.Name) {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
// TODO(#5545): Once key-value rate limits are authoritative this log
// line should be removed in favor of returning the error.
wfe.log.Warningf("checking %s rate limit: %s", limit, err)
}
decision, err := wfe.limiter.Spend(ctx, ratelimits.NewRegistrationsPerIPAddress, ip.String(), 1)
if err != nil {
warn(err, ratelimits.NewRegistrationsPerIPAddress)
return
}
if !decision.Allowed || ip.To4() != nil {
// This requester is being limited or the request was made from an IPv4
// address.
return
}
// See docs for ratelimits.NewRegistrationsPerIPv6Range for more information
// on the selection of a /48 block size for IPv6 ranges.
ipMask := net.CIDRMask(48, 128)
ipNet := &net.IPNet{IP: ip.Mask(ipMask), Mask: ipMask}
_, err = wfe.limiter.Spend(ctx, ratelimits.NewRegistrationsPerIPv6Range, ipNet.String(), 1)
if err != nil {
warn(err, ratelimits.NewRegistrationsPerIPv6Range)
}
}
// refundNewAccountLimits is typically called when a new account creation fails.
// It refunds the limit quota consumed by the request, allowing the caller to
// retry immediately. If an error is encountered during the refund, it is logged
// but not returned.
func (wfe *WebFrontEndImpl) refundNewAccountLimits(ctx context.Context, ip net.IP) {
if wfe.limiter == nil {
// Limiter is disabled.
return
}
warn := func(err error, limit ratelimits.Name) {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
// TODO(#5545): Once key-value rate limits are authoritative this log
// line should be removed in favor of returning the error.
wfe.log.Warningf("refunding %s rate limit: %s", limit, err)
}
_, err := wfe.limiter.Refund(ctx, ratelimits.NewRegistrationsPerIPAddress, ip.String(), 1)
if err != nil {
warn(err, ratelimits.NewRegistrationsPerIPAddress)
return
}
if ip.To4() != nil {
// Request was made from an IPv4 address.
return
}
// See docs for ratelimits.NewRegistrationsPerIPv6Range for more information
// on the selection of a /48 block size for IPv6 ranges.
ipMask := net.CIDRMask(48, 128)
ipNet := &net.IPNet{IP: ip.Mask(ipMask), Mask: ipMask}
_, err = wfe.limiter.Refund(ctx, ratelimits.NewRegistrationsPerIPv6Range, ipNet.String(), 1)
if err != nil {
warn(err, ratelimits.NewRegistrationsPerIPv6Range)
}
}
// NewAccount is used by clients to submit a new account
func (wfe *WebFrontEndImpl) NewAccount(
ctx context.Context,
@ -733,6 +820,16 @@ func (wfe *WebFrontEndImpl) NewAccount(
InitialIP: ipBytes,
}
// TODO(#5545): Spending and Refunding can be async until these rate limits
// are authoritative. This saves us from adding latency to each request.
go wfe.checkNewAccountLimits(ctx, ip)
var newRegistrationSuccessful bool
defer func() {
if !newRegistrationSuccessful {
go wfe.refundNewAccountLimits(ctx, ip)
}
}()
// Send the registration to the RA via grpc
acctPB, err := wfe.ra.NewRegistration(ctx, &reg)
if err != nil {
@ -786,6 +883,7 @@ func (wfe *WebFrontEndImpl) NewAccount(
wfe.sendError(response, logEvent, probs.ServerInternal("Error marshaling account"), err)
return
}
newRegistrationSuccessful = true
}
// parseRevocation accepts the payload for a revocation request and parses it
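
To make the IPv6 bucketing in checkNewAccountLimits and refundNewAccountLimits concrete: the requester's address is masked to its /48 and the resulting CIDR string becomes the bucket id. A short illustration with a hypothetical address:

ip := net.ParseIP("2001:db8:cafe:42::1")
ipMask := net.CIDRMask(48, 128)
ipNet := &net.IPNet{IP: ip.Mask(ipMask), Mask: ipMask}
fmt.Println(ipNet.String()) // "2001:db8:cafe::/48", the id spent against NewRegistrationsPerIPv6Range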


@ -36,6 +36,7 @@ import (
"gopkg.in/go-jose/go-jose.v2"
capb "github.com/letsencrypt/boulder/ca/proto"
"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/core"
corepb "github.com/letsencrypt/boulder/core/proto"
berrors "github.com/letsencrypt/boulder/errors"
@ -52,6 +53,8 @@ import (
noncepb "github.com/letsencrypt/boulder/nonce/proto"
"github.com/letsencrypt/boulder/probs"
rapb "github.com/letsencrypt/boulder/ra/proto"
"github.com/letsencrypt/boulder/ratelimits"
bredis "github.com/letsencrypt/boulder/redis"
"github.com/letsencrypt/boulder/revocation"
sapb "github.com/letsencrypt/boulder/sa/proto"
"github.com/letsencrypt/boulder/test"
@ -340,11 +343,14 @@ func setupWFE(t *testing.T) (WebFrontEndImpl, clock.FakeClock, requestSigner) {
mockSA := mocks.NewStorageAuthorityReadOnly(fc)
log := blog.NewMock()
var gnc nonce.Getter
var noncePrefixMap map[string]nonce.Redeemer
var rnc nonce.Redeemer
var rncKey string
var inmemNonceService *inmemnonce.Service
var limiter *ratelimits.Limiter
if strings.Contains(os.Getenv("BOULDER_CONFIG_DIR"), "test/config-next") {
// Use derived nonces.
noncePrefix := nonce.DerivePrefix("192.168.1.1:8080", "b8c758dd85e113ea340ce0b3a99f389d40a308548af94d1730a7692c1874f1f")
@ -354,6 +360,32 @@ func setupWFE(t *testing.T) (WebFrontEndImpl, clock.FakeClock, requestSigner) {
inmemNonceService = &inmemnonce.Service{NonceService: nonceService}
gnc = inmemNonceService
rnc = inmemNonceService
// Setup rate limiting.
rc := bredis.Config{
Username: "unittest-rw",
TLS: cmd.TLSConfig{
CACertFile: "../test/redis-tls/minica.pem",
CertFile: "../test/redis-tls/boulder/cert.pem",
KeyFile: "../test/redis-tls/boulder/key.pem",
},
Lookups: []cmd.ServiceDomain{
{
Service: "redisratelimits",
Domain: "service.consul",
},
},
LookupDNSAuthority: "consul.service.consul",
}
rc.PasswordConfig = cmd.PasswordConfig{
PasswordFile: "../test/secrets/ratelimits_redis_password",
}
ring, err := bredis.NewRingFromConfig(rc, stats, log)
test.AssertNotError(t, err, "making redis ring client")
source := ratelimits.NewRedisSource(ring.Ring, fc, stats)
test.AssertNotNil(t, source, "source should not be nil")
limiter, err = ratelimits.NewLimiter(fc, source, "../test/config-next/wfe2-ratelimit-defaults.yml", "", stats)
test.AssertNotError(t, err, "making limiter")
} else {
// TODO(#6610): Remove this once we've moved to derived to prefixes.
noncePrefix := "mlem"
@ -383,7 +415,8 @@ func setupWFE(t *testing.T) (WebFrontEndImpl, clock.FakeClock, requestSigner) {
noncePrefixMap,
rnc,
rncKey,
mockSA)
mockSA,
limiter)
test.AssertNotError(t, err, "Unable to create WFE")
wfe.SubscriberAgreementURL = agreementURL