redis: Add support for *redis.Ring shard configuration using SRV records (#7042)

Part of #5545
Samantha 2023-09-11 15:05:55 -04:00 committed by GitHub
parent aae7fb3551
commit 7068db96fe
4 changed files with 486 additions and 1 deletion


@@ -230,8 +230,13 @@ type SyslogConfig struct {
// ServiceDomain contains the service and domain name the gRPC or bdns provider
// will use to construct a SRV DNS query to lookup backends.
type ServiceDomain struct {
// Service is the service name to be used for SRV lookups. For example: if the
// record is 'foo.service.consul', then the Service is 'foo'.
Service string `validate:"required"`
// Domain is the domain name to be used for SRV lookups. For example: if the
// record is 'foo.service.consul', then the Domain is 'service.consul'.
Domain string `validate:"required"`
}
// GRPCClientConfig contains the information necessary to setup a gRPC client
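
As a concrete illustration (my own sketch, not part of the diff), the 'redisratelimits' entries added to the Consul config below map onto these fields as shown here; redis/lookup.go builds SRV queries of the form _<Service>._tcp.<Domain>:

package main

import (
	"fmt"

	"github.com/letsencrypt/boulder/cmd"
)

func main() {
	// The SRV name _redisratelimits._tcp.service.consul splits into:
	srv := cmd.ServiceDomain{
		Service: "redisratelimits", // the 'foo' in 'foo.service.consul'
		Domain:  "service.consul",  // everything after the service label
	}
	fmt.Printf("_%s._tcp.%s\n", srv.Service, srv.Domain)
}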

redis/lookup.go

@@ -0,0 +1,216 @@
package redis
import (
"context"
"errors"
"fmt"
"net"
"strings"
"time"
"github.com/letsencrypt/boulder/cmd"
blog "github.com/letsencrypt/boulder/log"
"github.com/redis/go-redis/v9"
)
var ErrNoShardsResolved = errors.New("0 shards were resolved")
// Lookup wraps a Redis ring client by reference and keeps the Redis ring shards
// up to date via periodic SRV lookups.
type Lookup struct {
// srvLookups is a list of SRV records to be looked up.
srvLookups []cmd.ServiceDomain
// updateFrequency is the frequency of periodic SRV lookups. Defaults to 30
// seconds.
updateFrequency time.Duration
// updateTimeout is the timeout for each SRV lookup. Defaults to 90% of the
// update frequency.
updateTimeout time.Duration
// dnsAuthority is the single <hostname|IPv4|[IPv6]>:<port> of the DNS
// server to be used for SRV lookups. If the address contains a hostname it
// will be resolved via the system DNS. If the port is left unspecified it
// will default to '53'. If this field is left unspecified the system DNS
// will be used for resolution.
dnsAuthority string
resolver *net.Resolver
ring *redis.Ring
logger blog.Logger
}
// NewLookup constructs and returns a new Lookup instance. An initial SRV lookup
// is performed to populate the Redis ring shards. If this lookup fails or
// otherwise results in an empty set of resolved shards, an error is returned.
func NewLookup(srvLookups []cmd.ServiceDomain, dnsAuthority string, frequency time.Duration, ring *redis.Ring, logger blog.Logger) (*Lookup, error) {
updateFrequency := frequency
if updateFrequency <= 0 {
// Set default frequency.
updateFrequency = 30 * time.Second
}
// Set default timeout to 90% of the update frequency.
updateTimeout := updateFrequency - updateFrequency/10
var lookup *Lookup
if dnsAuthority == "" {
// Use the system DNS resolver.
lookup = &Lookup{
srvLookups: srvLookups,
ring: ring,
logger: logger,
updateFrequency: updateFrequency,
updateTimeout: updateTimeout,
resolver: net.DefaultResolver,
dnsAuthority: dnsAuthority,
}
} else {
// Setup a custom DNS resolver.
lookup = &Lookup{
srvLookups: srvLookups,
ring: ring,
logger: logger,
updateFrequency: updateFrequency,
updateTimeout: updateTimeout,
dnsAuthority: dnsAuthority,
}
host, port, err := net.SplitHostPort(dnsAuthority)
if err != nil {
// Assume only hostname or IPv4 address was specified.
host = dnsAuthority
port = "53"
}
lookup.dnsAuthority = net.JoinHostPort(host, port)
lookup.resolver = &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
// The custom resolver closes over the lookup.dnsAuthority field
// so it can be swapped out in testing.
return net.Dial(network, lookup.dnsAuthority)
},
}
}
ctx, cancel := context.WithTimeout(context.Background(), updateTimeout)
defer cancel()
tempErr, nonTempErr := lookup.updateNow(ctx)
if tempErr != nil {
// Log and discard temporary errors, as they're likely to be transient
// (e.g. network connectivity issues).
logger.Warningf("resolving ring shards: %s", tempErr)
}
if nonTempErr != nil && errors.Is(nonTempErr, ErrNoShardsResolved) {
// Non-temporary errors are always logged inside of updateNow(), so we
// only need to return the error here if it's ErrNoShardsResolved.
return nil, nonTempErr
}
return lookup, nil
}
// updateNow resolves and updates the Redis ring shards accordingly. If all
// lookups fail or otherwise result in an empty set of resolved shards, the
// Redis ring is left unmodified and any errors are returned. If at least one
// lookup succeeds, the Redis ring is updated, and all errors are discarded.
// Non-temporary DNS errors are always logged as they occur, as they're likely
// to be indicative of a misconfiguration.
func (look *Lookup) updateNow(ctx context.Context) (tempError, nonTempError error) {
var tempErrs []error
handleDNSError := func(err error, srv cmd.ServiceDomain) {
var dnsErr *net.DNSError
if errors.As(err, &dnsErr) && (dnsErr.IsTimeout || dnsErr.IsTemporary) {
tempErrs = append(tempErrs, err)
return
}
// Log non-temporary DNS errors as they occur, as they're likely to be
// indicative of misconfiguration.
look.logger.Errf("resolving service _%s._tcp.%s: %s", srv.Service, srv.Domain, err)
}
nextAddrs := make(map[string]string)
for _, srv := range look.srvLookups {
_, targets, err := look.resolver.LookupSRV(ctx, srv.Service, "tcp", srv.Domain)
if err != nil {
handleDNSError(err, srv)
// Skip to the next SRV lookup.
continue
}
if len(targets) <= 0 {
tempErrs = append(tempErrs, fmt.Errorf("0 targets resolved for service \"_%s._tcp.%s\"", srv.Service, srv.Domain))
// Skip to the next SRV lookup.
continue
}
for _, target := range targets {
host := strings.TrimRight(target.Target, ".")
if look.dnsAuthority != "" {
// Lookup A/AAAA records for the SRV target using the custom DNS
// authority.
hostAddrs, err := look.resolver.LookupHost(ctx, host)
if err != nil {
handleDNSError(err, srv)
// Skip to the next A/AAAA lookup.
continue
}
if len(hostAddrs) <= 0 {
tempErrs = append(tempErrs, fmt.Errorf("0 addrs resolved for target %q of service \"_%s._tcp.%s\"", host, srv.Service, srv.Domain))
// Skip to the next A/AAAA lookup.
continue
}
// Use the first resolved IP address.
host = hostAddrs[0]
}
addr := fmt.Sprintf("%s:%d", host, target.Port)
nextAddrs[addr] = addr
}
}
// Only return errors if we failed to resolve any shards.
if len(nextAddrs) <= 0 {
return errors.Join(tempErrs...), ErrNoShardsResolved
}
// Some shards were resolved, update the Redis ring and discard all errors.
look.ring.SetAddrs(nextAddrs)
return nil, nil
}
// Start starts a goroutine that keeps the Redis ring shards up to date via
// periodic SRV lookups. The goroutine will exit when the provided context is
// cancelled.
func (look *Lookup) Start(ctx context.Context) {
go func() {
ticker := time.NewTicker(look.updateFrequency)
defer ticker.Stop()
for {
// Check for context cancellation before we do any work.
if ctx.Err() != nil {
return
}
timeoutCtx, cancel := context.WithTimeout(ctx, look.updateTimeout)
tempErrs, nonTempErrs := look.updateNow(timeoutCtx)
cancel()
if tempErrs != nil {
look.logger.Warningf("resolving ring shards, temporary errors: %s", tempErrs)
continue
}
if nonTempErrs != nil {
look.logger.Errf("resolving ring shards, non-temporary errors: %s", nonTempErrs)
continue
}
select {
case <-ticker.C:
continue
case <-ctx.Done():
return
}
}
}()
}
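
To show how a caller might wire this up, here is a minimal usage sketch; the helper name, credentials, and config values are hypothetical and not part of this commit, but the NewLookup and Start calls follow the signatures above:

package main

import (
	"context"
	"time"

	"github.com/letsencrypt/boulder/cmd"
	blog "github.com/letsencrypt/boulder/log"
	bredis "github.com/letsencrypt/boulder/redis"
	"github.com/redis/go-redis/v9"
)

// setupRateLimitRing is a hypothetical helper: it builds a ring with no static
// shards and relies on the SRV-backed Lookup to populate and refresh them.
func setupRateLimitRing(ctx context.Context, logger blog.Logger) (*redis.Ring, error) {
	ring := redis.NewRing(&redis.RingOptions{
		// TLS and credentials omitted for brevity.
	})

	lookup, err := bredis.NewLookup(
		[]cmd.ServiceDomain{{Service: "redisratelimits", Domain: "service.consul"}},
		"consul.service.consul", // no port given, so NewLookup appends the default ":53"
		30*time.Second,          // frequency of periodic SRV re-resolution
		ring,
		logger,
	)
	if err != nil {
		// NewLookup only fails when zero shards could be resolved at startup.
		return nil, err
	}

	// Keep the shard set refreshed until ctx is cancelled.
	lookup.Start(ctx)
	return ring, nil
}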

redis/lookup_test.go

@@ -0,0 +1,248 @@
package redis
import (
"context"
"testing"
"time"
"github.com/letsencrypt/boulder/cmd"
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/test"
"github.com/redis/go-redis/v9"
)
func newTestRedisRing() *redis.Ring {
CACertFile := "../test/redis-tls/minica.pem"
CertFile := "../test/redis-tls/boulder/cert.pem"
KeyFile := "../test/redis-tls/boulder/key.pem"
tlsConfig := cmd.TLSConfig{
CACertFile: CACertFile,
CertFile: CertFile,
KeyFile: KeyFile,
}
tlsConfig2, err := tlsConfig.Load(metrics.NoopRegisterer)
if err != nil {
panic(err)
}
client := redis.NewRing(&redis.RingOptions{
Username: "unittest-rw",
Password: "824968fa490f4ecec1e52d5e34916bdb60d45f8d",
TLSConfig: tlsConfig2,
})
return client
}
func TestNewLookup(t *testing.T) {
t.Parallel()
logger := blog.NewMock()
ring := newTestRedisRing()
_, err := NewLookup([]cmd.ServiceDomain{
{
Service: "redisratelimits",
Domain: "service.consul",
},
},
"consul.service.consul",
250*time.Millisecond,
ring,
logger,
)
test.AssertNotError(t, err, "Expected NewLookup construction to succeed")
}
func TestStart(t *testing.T) {
t.Parallel()
logger := blog.NewMock()
ring := newTestRedisRing()
lookup, err := NewLookup([]cmd.ServiceDomain{
{
Service: "redisratelimits",
Domain: "service.consul",
},
},
"consul.service.consul",
250*time.Millisecond,
ring,
logger,
)
test.AssertNotError(t, err, "Expected NewLookup construction to succeed")
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
lookup.Start(testCtx)
}
func TestNewLookupWithOneFailingSRV(t *testing.T) {
t.Parallel()
logger := blog.NewMock()
ring := newTestRedisRing()
_, err := NewLookup([]cmd.ServiceDomain{
{
Service: "doesnotexist",
Domain: "service.consuls",
},
{
Service: "redisratelimits",
Domain: "service.consul",
},
},
"consul.service.consul",
250*time.Millisecond,
ring,
logger,
)
test.AssertNotError(t, err, "Expected NewLookup construction to succeed")
}
func TestNewLookupWithAllFailingSRV(t *testing.T) {
t.Parallel()
logger := blog.NewMock()
ring := newTestRedisRing()
_, err := NewLookup([]cmd.ServiceDomain{
{
Service: "doesnotexist",
Domain: "service.consuls",
},
{
Service: "doesnotexist2",
Domain: "service.consuls",
},
},
"consul.service.consul",
250*time.Millisecond,
ring,
logger,
)
test.AssertError(t, err, "Expected NewLookup construction to fail")
}
func TestUpdateNowWithAllFailingSRV(t *testing.T) {
t.Parallel()
logger := blog.NewMock()
ring := newTestRedisRing()
lookup, err := NewLookup([]cmd.ServiceDomain{
{
Service: "redisratelimits",
Domain: "service.consul",
},
},
"consul.service.consul",
250*time.Millisecond,
ring,
logger,
)
test.AssertNotError(t, err, "Expected NewLookup construction to succeed")
lookup.srvLookups = []cmd.ServiceDomain{
{
Service: "doesnotexist1",
Domain: "service.consul",
},
{
Service: "doesnotexist2",
Domain: "service.consul",
},
}
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
tempErr, nonTempErr := lookup.updateNow(testCtx)
test.AssertNotError(t, tempErr, "Expected no temporary errors")
test.AssertError(t, nonTempErr, "Expected non-temporary errors to have occurred")
}
func TestUpdateNowWithAllFailingSRVs(t *testing.T) {
t.Parallel()
logger := blog.NewMock()
ring := newTestRedisRing()
lookup, err := NewLookup([]cmd.ServiceDomain{
{
Service: "redisratelimits",
Domain: "service.consul",
},
},
"consul.service.consul",
250*time.Millisecond,
ring,
logger,
)
test.AssertNotError(t, err, "Expected NewLookup construction to succeed")
// Replace the dnsAuthority with a non-existent DNS server. This will cause
// a timeout error, which is technically a temporary error, but it will
// eventually result in a non-temporary error when no shards are resolved.
lookup.dnsAuthority = "consuls.services.consuls:53"
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
tempErr, nonTempErr := lookup.updateNow(testCtx)
test.AssertError(t, tempErr, "Expected temporary errors")
test.AssertError(t, nonTempErr, "Expected a non-temporary error")
test.AssertErrorIs(t, nonTempErr, ErrNoShardsResolved)
}
func TestUpdateNowWithOneFailingSRV(t *testing.T) {
t.Parallel()
logger := blog.NewMock()
ring := newTestRedisRing()
lookup, err := NewLookup([]cmd.ServiceDomain{
{
Service: "doesnotexist",
Domain: "service.consuls",
},
{
Service: "redisratelimits",
Domain: "service.consul",
},
},
"consul.service.consul",
250*time.Millisecond,
ring,
logger,
)
test.AssertNotError(t, err, "Expected NewLookup construction to succeed")
// The Consul service entry for 'redisratelimits' is configured to return
// two SRV targets. We should only have two shards in the ring.
test.Assert(t, ring.Len() == 2, "Expected 2 shards in the ring")
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
// Ensure we can reach both shards using the PING command.
err = ring.ForEachShard(testCtx, func(ctx context.Context, shard *redis.Client) error {
return shard.Ping(ctx).Err()
})
test.AssertNotError(t, err, "Expected PING to succeed for both shards")
// Drop both Shards from the ring.
ring.SetAddrs(map[string]string{})
test.Assert(t, ring.Len() == 0, "Expected 0 shards in the ring")
// Force a lookup to occur.
tempErr, nonTempErr := lookup.updateNow(testCtx)
test.AssertNotError(t, tempErr, "Expected no temporary errors")
test.AssertNotError(t, nonTempErr, "Expected no non-temporary errors")
// The ring should now have two shards again.
test.Assert(t, ring.Len() == 2, "Expected 2 shards in the ring")
}
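
The temporary/non-temporary split these tests exercise mirrors the handleDNSError classification in updateNow: DNS timeouts and temporary resolver failures are retried on the next tick, while anything else is logged immediately as a likely misconfiguration. A standalone sketch of that classification (the classify helper and the example errors are hypothetical):

package main

import (
	"errors"
	"fmt"
	"net"
)

// classify reports whether err would be treated as temporary by updateNow's
// handleDNSError: only DNS timeouts and temporary resolver failures qualify.
func classify(err error) (temporary bool) {
	var dnsErr *net.DNSError
	return errors.As(err, &dnsErr) && (dnsErr.IsTimeout || dnsErr.IsTemporary)
}

func main() {
	timeout := &net.DNSError{Err: "i/o timeout", Name: "redisratelimits.service.consul", IsTimeout: true}
	nxdomain := &net.DNSError{Err: "no such host", Name: "doesnotexist.service.consuls", IsNotFound: true}

	fmt.Println(classify(timeout))  // true: retried silently on the next tick
	fmt.Println(classify(nxdomain)) // false: logged as a likely misconfiguration
}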


@@ -349,6 +349,22 @@ services {
tags = ["tcp"] // Required for SRV RR support in gRPC DNS resolution.
}
services {
id = "bredis3"
name = "redisratelimits"
address = "10.33.33.4"
port = 4218
tags = ["tcp"] // Required for SRV RR support in DNS resolution.
}
services {
id = "bredis4"
name = "redisratelimits"
address = "10.33.33.5"
port = 4218
tags = ["tcp"] // Required for SRV RR support in DNS resolution.
}
//
// The following services are used for testing the gRPC DNS resolver.
//
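
For reference, resolving the two 'redisratelimits' entries above yields two SRV targets, which updateNow turns into the ring's shard addresses. A hedged sketch of the resulting SetAddrs call (the host:port pairs come directly from the Consul config above; the map shape matches the one built in updateNow):

package main

import "github.com/redis/go-redis/v9"

func main() {
	ring := redis.NewRing(&redis.RingOptions{})

	// Expected shard set once updateNow resolves the two Consul entries above.
	// updateNow keys the map by the same "host:port" string it stores as the value.
	ring.SetAddrs(map[string]string{
		"10.33.33.4:4218": "10.33.33.4:4218",
		"10.33.33.5:4218": "10.33.33.5:4218",
	})
}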